This is an automated email from the ASF dual-hosted git repository. tkhurana pushed a commit to branch PHOENIX-7562-feature-new in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit fc83f5e5a2436ec3837077021ec7ed0557015440 Author: tkhurana <[email protected]> AuthorDate: Sun Jun 7 09:31:08 2026 -0700 PHOENIX-7877 Addendum ReplicationLogWriter metrics: rename, units, decomposition support Make the replication metrics source produce a clean, decomposable sync-time signal for production analysis. Naming and units: - Rename JMX metric strings with phoenixWAL prefix to avoid collision with HBase MetricsWALSource (appendTime/syncTime) on flattened dashboards. - Encode the stored unit in the metric-name suffix (Ms or Ns) so JMX consumers cannot misinterpret values. - Convert ms-unit metrics (sync, fsSync, rotation) from ns input to ms storage at the impl boundary so HBase WAL comparisons are unit-aligned. - Keep ns precision for sub-millisecond metrics (append, ringBuffer, pendingSyncWait) where ms truncation would lose all signal. Histogram types: - Switch ms-unit time histograms to MutableTimeHistogram so JMX exports TimeRangeCount_* bucket counts (correct distribution merging is then possible). - Switch batchSize and pendingSyncCount to MutableSizeHistogram for the same reason on count-shaped metrics. - Keep ns-unit histograms as plain MutableHistogram (range buckets are ms-keyed and don't apply to ns values). Decomposition support: - Gate updateRingBufferTime to SYNC events only. ringBufferTime now measures producer-side queue + drain-ahead specifically, which is what the syncTime decomposition equation needs. - Add p50/p99 fields to ReplicationLogMetricValues for the four time components used in the decomposition (sync, fsSync, ringBuffer, pendingSyncWait). The DTO is consumed only from tests; populated via HistogramImpl.snapshot() reached through reflection. - Rename DTO getters/builder methods with Max suffix where they expose histogram max, leaving counter and percentile getters as-is. ReplicationSyncTime placement: - Move the timing block in IndexRegionObserver from the call site into replicateMutations, after the early-return guards. 
The metric is now recorded only when replication actually does work (replication enabled, HA group present, not in test-skip mode), so percentiles reflect real replication latency rather than being diluted by zero-cost no-op samples. Test: - Add MetricsReplicationLogGroupSourceImplTest covering verbatim ns recording, ns->ms conversion, sub-ms truncation, max-across-samples, counters, and size histograms. - Update simulator and IT callers to use the renamed getters and to compare the decomposition equation in nanoseconds (with a 1ms tolerance for fsSync ms-floor truncation). - Gate testReplicationSyncPathSimulator behind -Dtest.runSimulator=true so it doesn't run in default CI. --- .../phoenix/hbase/index/IndexRegionObserver.java | 71 ++++---- .../phoenix/replication/ReplicationLogGroup.java | 7 +- .../metrics/MetricsReplicationLogGroupSource.java | 46 ++--- .../MetricsReplicationLogGroupSourceImpl.java | 93 +++++++--- .../metrics/ReplicationLogMetricValues.java | 195 ++++++++++++++++----- .../phoenix/replication/ReplicationLogGroupIT.java | 26 +-- .../replication/ReplicationLogGroupTest.java | 152 ++++++++++++++++ .../MetricsReplicationLogGroupSourceImplTest.java | 105 +++++++++++ 8 files changed, 550 insertions(+), 145 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 2547719695..c7825ce96c 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -2319,13 +2319,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { // updates CompletableFuture<Void> postIndexFuture = CompletableFuture.runAsync(() -> doPost(c, context)); - long start = EnvironmentEdgeManager.currentTimeMillis(); - try { - replicateMutations(c.getEnvironment(), 
miniBatchOp, context); - } finally { - long duration = EnvironmentEdgeManager.currentTimeMillis() - start; - metricSource.updateReplicationSyncTime(dataTableName, duration); - } + replicateMutations(c.getEnvironment(), miniBatchOp, context); FutureUtils.get(postIndexFuture); } } finally { @@ -2942,40 +2936,47 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } ReplicationLogGroup group = logGroup.get(); - for (int i = 0; i < miniBatchOp.size(); i++) { - Mutation m = miniBatchOp.getOperation(i); - if (this.ignoreReplicationFilter.test(m)) { - continue; - } - // When coprocessors add cells (local index, conditional TTL, ON DUPLICATE KEY UPDATE), - // HBase merges them into the data mutation which can mix row keys and cell types. - // Split those back into individual Put/Delete mutations for correct serialization. - if (miniBatchOp.getOperationsFromCoprocessors(i) == null) { - group.append(this.dataTableName, -1, m); - } else { - for (Mutation split : splitCellsIntoMutations(m)) { - group.append(this.dataTableName, -1, split); + // Record ReplicationSyncTime only when we are actually doing work (not on early-return paths). + long start = EnvironmentEdgeManager.currentTimeMillis(); + try { + for (int i = 0; i < miniBatchOp.size(); i++) { + Mutation m = miniBatchOp.getOperation(i); + if (this.ignoreReplicationFilter.test(m)) { + continue; + } + // When coprocessors add cells (local index, conditional TTL, ON DUPLICATE KEY UPDATE), + // HBase merges them into the data mutation which can mix row keys and cell types. + // Split those back into individual Put/Delete mutations for correct serialization. 
+ if (miniBatchOp.getOperationsFromCoprocessors(i) == null) { + group.append(this.dataTableName, -1, m); + } else { + for (Mutation split : splitCellsIntoMutations(m)) { + group.append(this.dataTableName, -1, split); + } } } - } - if (context.preIndexUpdates != null) { - for (Map.Entry<HTableInterfaceReference, Mutation> entry : context.preIndexUpdates - .entries()) { - if (this.ignoreReplicationFilter.test(entry.getValue())) { - continue; + if (context.preIndexUpdates != null) { + for (Map.Entry<HTableInterfaceReference, Mutation> entry : context.preIndexUpdates + .entries()) { + if (this.ignoreReplicationFilter.test(entry.getValue())) { + continue; + } + group.append(entry.getKey().getTableName(), -1, entry.getValue()); } - group.append(entry.getKey().getTableName(), -1, entry.getValue()); } - } - if (context.postIndexUpdates != null) { - for (Map.Entry<HTableInterfaceReference, Mutation> entry : context.postIndexUpdates - .entries()) { - if (this.ignoreReplicationFilter.test(entry.getValue())) { - continue; + if (context.postIndexUpdates != null) { + for (Map.Entry<HTableInterfaceReference, Mutation> entry : context.postIndexUpdates + .entries()) { + if (this.ignoreReplicationFilter.test(entry.getValue())) { + continue; + } + group.append(entry.getKey().getTableName(), -1, entry.getValue()); } - group.append(entry.getKey().getTableName(), -1, entry.getValue()); } + group.sync(); + } finally { + long duration = EnvironmentEdgeManager.currentTimeMillis() - start; + metricSource.updateReplicationSyncTime(this.dataTableName, duration); } - group.sync(); } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java index 0d7a9f42e4..0a862d640a 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java +++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java @@ -1169,8 +1169,11 @@ public class ReplicationLogGroup { @Override public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception { long currentTimeNs = System.nanoTime(); - long ringBufferTimeNs = currentTimeNs - event.timestampNs; - metrics.updateRingBufferTime(ringBufferTimeNs); + // Record ring-buffer wait only for SYNC events (queue + drain ahead). Producers are blocked + // on sync; data-event waits are not directly observable to a caller. + if (event.type == EVENT_TYPE_SYNC) { + metrics.updateRingBufferTime(currentTimeNs - event.timestampNs); + } batchEventCount++; if (fatalException != null) { // Append events are ignored; sync futures are failed immediately diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java index 6c0392f2b7..13dd680998 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java @@ -27,41 +27,41 @@ public interface MetricsReplicationLogGroupSource extends BaseSource { String METRICS_DESCRIPTION = "Metrics about Replication Log Operations for an HA Group"; String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME; - String ROTATION_COUNT = "rotationCount"; + String ROTATION_COUNT = "phoenixWALRotationCount"; String ROTATION_COUNT_DESC = "Total number of times rotateLog was called"; - String ROTATION_FAILURES = "rotationFailures"; - + String ROTATION_FAILURES = "phoenixWALRotationFailures"; String ROTATION_FAILURES_DESC = "Number of times log rotation has failed"; - // All time histograms in this source are nanoseconds. 
- String APPEND_TIME = "appendTime"; + // Time histograms encode the unit in the name suffix (Ms or Ns) so consumers cannot misinterpret. + String APPEND_TIME = "phoenixWALAppendTimeNs"; String APPEND_TIME_DESC = "Histogram of time taken for append operations in nanoseconds"; - String SYNC_TIME = "syncTime"; - String SYNC_TIME_DESC = "Histogram of time taken for sync operations in nanoseconds"; + String SYNC_TIME = "phoenixWALSyncTimeMs"; + String SYNC_TIME_DESC = "Histogram of time taken for sync operations in milliseconds"; - String ROTATION_TIME = "rotationTime"; - String ROTATION_TIME_DESC = "Histogram of time taken for log rotations in nanoseconds"; + String ROTATION_TIME = "phoenixWALRotationTimeMs"; + String ROTATION_TIME_DESC = "Histogram of time taken for log rotations in milliseconds"; - String RING_BUFFER_TIME = "ringBufferTime"; - String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer in nanoseconds"; + String RING_BUFFER_TIME = "phoenixWALSyncRingBufferTimeNs"; + String RING_BUFFER_TIME_DESC = + "Time SYNC events spend in the ring buffer (queue + drain ahead) in nanoseconds"; - String FS_SYNC_TIME = "fsSyncTime"; + String FS_SYNC_TIME = "phoenixWALFsSyncTimeMs"; String FS_SYNC_TIME_DESC = - "Histogram of time taken for the underlying filesystem sync (fsync) in nanoseconds"; + "Histogram of time taken for the underlying filesystem sync (fsync) in milliseconds"; - String BATCH_SIZE = "batchSize"; + String BATCH_SIZE = "phoenixWALBatchSize"; String BATCH_SIZE_DESC = "Histogram of number of events drained per Disruptor batch"; - String PENDING_SYNC_COUNT = "pendingSyncCount"; + String PENDING_SYNC_COUNT = "phoenixWALPendingSyncCount"; String PENDING_SYNC_COUNT_DESC = "Histogram of pending sync futures coalesced into one fsync"; - String PENDING_SYNC_WAIT_TIME = "pendingSyncWaitTime"; + String PENDING_SYNC_WAIT_TIME = "phoenixWALPendingSyncWaitTimeNs"; String PENDING_SYNC_WAIT_TIME_DESC = "Time a SYNC event waits between consumer pickup and 
fsync start, in nanoseconds"; - String SYNC_TO_SAF_TRANSITIONS = "syncToSafTransitions"; + String SYNC_TO_SAF_TRANSITIONS = "phoenixWALSyncToSafTransitions"; String SYNC_TO_SAF_TRANSITIONS_DESC = "Number of SYNC to STORE_AND_FORWARD mode transitions"; /** @@ -71,31 +71,33 @@ public interface MetricsReplicationLogGroupSource extends BaseSource { void incrementRotationCount(); /** - * Update the time taken for an append operation in nanoseconds. + * Update the time taken for an append operation. Recorded into histogram in nanoseconds. * @param timeNs Time taken in nanoseconds */ void updateAppendTime(long timeNs); /** - * Update the time taken for a sync operation in nanoseconds. + * Update the time taken for a sync operation. Recorded into histogram in milliseconds. * @param timeNs Time taken in nanoseconds */ void updateSyncTime(long timeNs); /** - * Update the time taken for a rotation operation in nanoseconds. + * Update the time taken for a rotation operation. Recorded into histogram in milliseconds. * @param timeNs Time taken in nanoseconds */ void updateRotationTime(long timeNs); /** - * Update the time an event spent in the ring buffer in nanoseconds. + * Update the time a SYNC event spent in the ring buffer (queue + drain ahead). Recorded into + * histogram in nanoseconds. * @param timeNs Time spent in nanoseconds */ void updateRingBufferTime(long timeNs); /** - * Update the time taken for the underlying filesystem sync (fsync) in nanoseconds. + * Update the time taken for the underlying filesystem sync (fsync). Recorded into histogram in + * milliseconds.
* @param timeNs Time taken in nanoseconds */ void updateFsSyncTime(long timeNs); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java index 7fe679075d..4b0ab4fd84 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java @@ -17,10 +17,16 @@ */ package org.apache.phoenix.replication.metrics; +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.metrics.BaseSourceImpl; +import org.apache.hadoop.hbase.metrics.Snapshot; +import org.apache.hadoop.hbase.metrics.impl.HistogramImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MutableFastCounter; import org.apache.hadoop.metrics2.lib.MutableHistogram; +import org.apache.hadoop.metrics2.lib.MutableSizeHistogram; +import org.apache.hadoop.metrics2.lib.MutableTimeHistogram; /** Implementation of metrics source for ReplicationLog operations. 
*/ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl @@ -29,14 +35,14 @@ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl private final MutableFastCounter rotationCount; private final MutableFastCounter rotationFailuresCount; private final MutableFastCounter syncToSafTransitions; - private final MutableHistogram appendTime; - private final MutableHistogram syncTime; - private final MutableHistogram rotationTime; - private final MutableHistogram ringBufferTime; - private final MutableHistogram fsSyncTime; - private final MutableHistogram batchSize; - private final MutableHistogram pendingSyncCount; - private final MutableHistogram pendingSyncWaitTime; + private final MutableHistogram appendTimeNs; + private final MutableTimeHistogram syncTimeMs; + private final MutableTimeHistogram rotationTimeMs; + private final MutableHistogram ringBufferTimeNs; + private final MutableTimeHistogram fsSyncTimeMs; + private final MutableSizeHistogram batchSize; + private final MutableSizeHistogram pendingSyncCount; + private final MutableHistogram pendingSyncWaitTimeNs; public MetricsReplicationLogGroupSourceImpl(String haGroupName) { this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, haGroupName); @@ -51,15 +57,15 @@ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl getMetricsRegistry().newCounter(ROTATION_FAILURES, ROTATION_FAILURES_DESC, 0L); syncToSafTransitions = getMetricsRegistry().newCounter(SYNC_TO_SAF_TRANSITIONS, SYNC_TO_SAF_TRANSITIONS_DESC, 0L); - appendTime = getMetricsRegistry().newHistogram(APPEND_TIME, APPEND_TIME_DESC); - syncTime = getMetricsRegistry().newHistogram(SYNC_TIME, SYNC_TIME_DESC); - rotationTime = getMetricsRegistry().newHistogram(ROTATION_TIME, ROTATION_TIME_DESC); - ringBufferTime = getMetricsRegistry().newHistogram(RING_BUFFER_TIME, RING_BUFFER_TIME_DESC); - fsSyncTime = getMetricsRegistry().newHistogram(FS_SYNC_TIME, FS_SYNC_TIME_DESC); - batchSize = 
getMetricsRegistry().newHistogram(BATCH_SIZE, BATCH_SIZE_DESC); + appendTimeNs = getMetricsRegistry().newHistogram(APPEND_TIME, APPEND_TIME_DESC); + syncTimeMs = getMetricsRegistry().newTimeHistogram(SYNC_TIME, SYNC_TIME_DESC); + rotationTimeMs = getMetricsRegistry().newTimeHistogram(ROTATION_TIME, ROTATION_TIME_DESC); + ringBufferTimeNs = getMetricsRegistry().newHistogram(RING_BUFFER_TIME, RING_BUFFER_TIME_DESC); + fsSyncTimeMs = getMetricsRegistry().newTimeHistogram(FS_SYNC_TIME, FS_SYNC_TIME_DESC); + batchSize = getMetricsRegistry().newSizeHistogram(BATCH_SIZE, BATCH_SIZE_DESC); pendingSyncCount = - getMetricsRegistry().newHistogram(PENDING_SYNC_COUNT, PENDING_SYNC_COUNT_DESC); - pendingSyncWaitTime = + getMetricsRegistry().newSizeHistogram(PENDING_SYNC_COUNT, PENDING_SYNC_COUNT_DESC); + pendingSyncWaitTimeNs = getMetricsRegistry().newHistogram(PENDING_SYNC_WAIT_TIME, PENDING_SYNC_WAIT_TIME_DESC); } @@ -85,27 +91,27 @@ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl @Override public void updateAppendTime(long timeNs) { - appendTime.add(timeNs); + appendTimeNs.add(timeNs); } @Override public void updateSyncTime(long timeNs) { - syncTime.add(timeNs); + syncTimeMs.add(TimeUnit.NANOSECONDS.toMillis(timeNs)); } @Override public void updateRotationTime(long timeNs) { - rotationTime.add(timeNs); + rotationTimeMs.add(TimeUnit.NANOSECONDS.toMillis(timeNs)); } @Override public void updateRingBufferTime(long timeNs) { - ringBufferTime.add(timeNs); + ringBufferTimeNs.add(timeNs); } @Override public void updateFsSyncTime(long timeNs) { - fsSyncTime.add(timeNs); + fsSyncTimeMs.add(TimeUnit.NANOSECONDS.toMillis(timeNs)); } @Override @@ -120,18 +126,51 @@ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl @Override public void updatePendingSyncWaitTime(long timeNs) { - pendingSyncWaitTime.add(timeNs); + pendingSyncWaitTimeNs.add(timeNs); } + /** + * Test-facing accessor that snapshots the four time histograms used in the 
producer-side sync + * decomposition (sync, ringBuffer, fsSync, pendingSyncWait) and returns max + p50 + p99 for each. + * <p> + * <b>Side effect:</b> the snapshot resets the underlying FastLongHistogram bins, so this method + * is destructive and intended for end-of-test inspection only. Subsequent histogram reads will + * see only data added after this call. + */ @Override public ReplicationLogMetricValues getCurrentMetricValues() { + Snapshot syncSnap = snapshot(syncTimeMs); + Snapshot ringSnap = snapshot(ringBufferTimeNs); + Snapshot fsSnap = snapshot(fsSyncTimeMs); + Snapshot pendSnap = snapshot(pendingSyncWaitTimeNs); return ReplicationLogMetricValues.builder().rotationCount(rotationCount.value()) .rotationFailuresCount(rotationFailuresCount.value()) - .syncToSafTransitions(syncToSafTransitions.value()).appendTime(appendTime.getMax()) - .syncTime(syncTime.getMax()).rotationTime(rotationTime.getMax()) - .ringBufferTime(ringBufferTime.getMax()).fsSyncTime(fsSyncTime.getMax()) - .batchSize(batchSize.getMax()).pendingSyncCount(pendingSyncCount.getMax()) - .pendingSyncWaitTime(pendingSyncWaitTime.getMax()).build(); + .syncToSafTransitions(syncToSafTransitions.value()).appendTimeMax(appendTimeNs.getMax()) + .syncTimeMax(syncSnap.getMax()).syncTimeP50(syncSnap.getMedian()) + .syncTimeP99(syncSnap.get99thPercentile()).rotationTimeMax(rotationTimeMs.getMax()) + .ringBufferTimeMax(ringSnap.getMax()).ringBufferTimeP50(ringSnap.getMedian()) + .ringBufferTimeP99(ringSnap.get99thPercentile()).fsSyncTimeMax(fsSnap.getMax()) + .fsSyncTimeP50(fsSnap.getMedian()).fsSyncTimeP99(fsSnap.get99thPercentile()) + .batchSizeMax(batchSize.getMax()).pendingSyncCountMax(pendingSyncCount.getMax()) + .pendingSyncWaitTimeMax(pendSnap.getMax()).pendingSyncWaitTimeP50(pendSnap.getMedian()) + .pendingSyncWaitTimeP99(pendSnap.get99thPercentile()).build(); + } + + /** + * Reach into MutableHistogram via reflection to call HistogramImpl.snapshot(), which exposes the + * full percentile 
distribution. The protected {@code MutableHistogram.histogram} field is the + * only path to these percentiles short of subclassing every histogram type. + */ + private static Snapshot snapshot(MutableHistogram histogram) { + try { + Field field = MutableHistogram.class.getDeclaredField("histogram"); + field.setAccessible(true); + HistogramImpl impl = (HistogramImpl) field.get(histogram); + return impl.snapshot(); + } catch (ReflectiveOperationException e) { + throw new IllegalStateException("Failed to access MutableHistogram.histogram via reflection", + e); + } } @Override diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java index 3fa79d1d02..c177743d6b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java @@ -17,33 +17,54 @@ */ package org.apache.phoenix.replication.metrics; -/** Class to hold the values of all metrics tracked by the ReplicationLog metrics source. */ +/** + * Class to hold the values of all metrics tracked by the ReplicationLog metrics source. The + * Max-suffixed getters (e.g. {@link #getSyncTimeMax()}) return the max observed value. Percentile + * accessors (e.g. {@link #getSyncTimeP50()}, {@link #getSyncTimeP99()}) are populated when the + * source produces this DTO via a destructive snapshot path; otherwise they are zero.
+ */ public class ReplicationLogMetricValues { private final long rotationCount; private final long rotationFailuresCount; private final long syncToSafTransitions; - private final long appendTime; - private final long syncTime; - private final long rotationTime; - private final long ringBufferTime; - private final long fsSyncTime; + private final long appendTimeNs; + private final long syncTimeMs; + private final long syncTimeP50Ms; + private final long syncTimeP99Ms; + private final long rotationTimeMs; + private final long ringBufferTimeNs; + private final long ringBufferTimeP50Ns; + private final long ringBufferTimeP99Ns; + private final long fsSyncTimeMs; + private final long fsSyncTimeP50Ms; + private final long fsSyncTimeP99Ms; private final long batchSize; private final long pendingSyncCount; - private final long pendingSyncWaitTime; + private final long pendingSyncWaitTimeNs; + private final long pendingSyncWaitTimeP50Ns; + private final long pendingSyncWaitTimeP99Ns; private ReplicationLogMetricValues(Builder b) { this.rotationCount = b.rotationCount; this.rotationFailuresCount = b.rotationFailuresCount; this.syncToSafTransitions = b.syncToSafTransitions; - this.appendTime = b.appendTime; - this.syncTime = b.syncTime; - this.rotationTime = b.rotationTime; - this.ringBufferTime = b.ringBufferTime; - this.fsSyncTime = b.fsSyncTime; + this.appendTimeNs = b.appendTimeNs; + this.syncTimeMs = b.syncTimeMs; + this.syncTimeP50Ms = b.syncTimeP50Ms; + this.syncTimeP99Ms = b.syncTimeP99Ms; + this.rotationTimeMs = b.rotationTimeMs; + this.ringBufferTimeNs = b.ringBufferTimeNs; + this.ringBufferTimeP50Ns = b.ringBufferTimeP50Ns; + this.ringBufferTimeP99Ns = b.ringBufferTimeP99Ns; + this.fsSyncTimeMs = b.fsSyncTimeMs; + this.fsSyncTimeP50Ms = b.fsSyncTimeP50Ms; + this.fsSyncTimeP99Ms = b.fsSyncTimeP99Ms; this.batchSize = b.batchSize; this.pendingSyncCount = b.pendingSyncCount; - this.pendingSyncWaitTime = b.pendingSyncWaitTime; + this.pendingSyncWaitTimeNs = 
b.pendingSyncWaitTimeNs; + this.pendingSyncWaitTimeP50Ns = b.pendingSyncWaitTimeP50Ns; + this.pendingSyncWaitTimeP99Ns = b.pendingSyncWaitTimeP99Ns; } public static Builder builder() { @@ -62,50 +83,90 @@ public class ReplicationLogMetricValues { return syncToSafTransitions; } - public long getAppendTime() { - return appendTime; + public long getAppendTimeMax() { + return appendTimeNs; + } + + public long getSyncTimeMax() { + return syncTimeMs; + } + + public long getSyncTimeP50() { + return syncTimeP50Ms; + } + + public long getSyncTimeP99() { + return syncTimeP99Ms; + } + + public long getRotationTimeMax() { + return rotationTimeMs; + } + + public long getRingBufferTimeMax() { + return ringBufferTimeNs; } - public long getSyncTime() { - return syncTime; + public long getRingBufferTimeP50() { + return ringBufferTimeP50Ns; } - public long getRotationTime() { - return rotationTime; + public long getRingBufferTimeP99() { + return ringBufferTimeP99Ns; } - public long getRingBufferTime() { - return ringBufferTime; + public long getFsSyncTimeMax() { + return fsSyncTimeMs; } - public long getFsSyncTime() { - return fsSyncTime; + public long getFsSyncTimeP50() { + return fsSyncTimeP50Ms; } - public long getBatchSize() { + public long getFsSyncTimeP99() { + return fsSyncTimeP99Ms; + } + + public long getBatchSizeMax() { return batchSize; } - public long getPendingSyncCount() { + public long getPendingSyncCountMax() { return pendingSyncCount; } - public long getPendingSyncWaitTime() { - return pendingSyncWaitTime; + public long getPendingSyncWaitTimeMax() { + return pendingSyncWaitTimeNs; + } + + public long getPendingSyncWaitTimeP50() { + return pendingSyncWaitTimeP50Ns; + } + + public long getPendingSyncWaitTimeP99() { + return pendingSyncWaitTimeP99Ns; } public static class Builder { private long rotationCount; private long rotationFailuresCount; private long syncToSafTransitions; - private long appendTime; - private long syncTime; - private long rotationTime; - private 
long ringBufferTime; - private long fsSyncTime; + private long appendTimeNs; + private long syncTimeMs; + private long syncTimeP50Ms; + private long syncTimeP99Ms; + private long rotationTimeMs; + private long ringBufferTimeNs; + private long ringBufferTimeP50Ns; + private long ringBufferTimeP99Ns; + private long fsSyncTimeMs; + private long fsSyncTimeP50Ms; + private long fsSyncTimeP99Ms; private long batchSize; private long pendingSyncCount; - private long pendingSyncWaitTime; + private long pendingSyncWaitTimeNs; + private long pendingSyncWaitTimeP50Ns; + private long pendingSyncWaitTimeP99Ns; public Builder rotationCount(long v) { this.rotationCount = v; @@ -122,43 +183,83 @@ public class ReplicationLogMetricValues { return this; } - public Builder appendTime(long v) { - this.appendTime = v; + public Builder appendTimeMax(long v) { + this.appendTimeNs = v; return this; } - public Builder syncTime(long v) { - this.syncTime = v; + public Builder syncTimeMax(long v) { + this.syncTimeMs = v; return this; } - public Builder rotationTime(long v) { - this.rotationTime = v; + public Builder syncTimeP50(long v) { + this.syncTimeP50Ms = v; return this; } - public Builder ringBufferTime(long v) { - this.ringBufferTime = v; + public Builder syncTimeP99(long v) { + this.syncTimeP99Ms = v; return this; } - public Builder fsSyncTime(long v) { - this.fsSyncTime = v; + public Builder rotationTimeMax(long v) { + this.rotationTimeMs = v; return this; } - public Builder batchSize(long v) { + public Builder ringBufferTimeMax(long v) { + this.ringBufferTimeNs = v; + return this; + } + + public Builder ringBufferTimeP50(long v) { + this.ringBufferTimeP50Ns = v; + return this; + } + + public Builder ringBufferTimeP99(long v) { + this.ringBufferTimeP99Ns = v; + return this; + } + + public Builder fsSyncTimeMax(long v) { + this.fsSyncTimeMs = v; + return this; + } + + public Builder fsSyncTimeP50(long v) { + this.fsSyncTimeP50Ms = v; + return this; + } + + public Builder 
fsSyncTimeP99(long v) { + this.fsSyncTimeP99Ms = v; + return this; + } + + public Builder batchSizeMax(long v) { this.batchSize = v; return this; } - public Builder pendingSyncCount(long v) { + public Builder pendingSyncCountMax(long v) { this.pendingSyncCount = v; return this; } - public Builder pendingSyncWaitTime(long v) { - this.pendingSyncWaitTime = v; + public Builder pendingSyncWaitTimeMax(long v) { + this.pendingSyncWaitTimeNs = v; + return this; + } + + public Builder pendingSyncWaitTimeP50(long v) { + this.pendingSyncWaitTimeP50Ns = v; + return this; + } + + public Builder pendingSyncWaitTimeP99(long v) { + this.pendingSyncWaitTimeP99Ns = v; return this; } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java index d70dabeb47..776fbcb509 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java @@ -181,18 +181,20 @@ public class ReplicationLogGroupIT extends HABaseIT { private void assertMetricsEmitted() { ReplicationLogMetricValues values = logGroup.getMetrics().getCurrentMetricValues(); - assertTrue("appendTime should be > 0, got " + values.getAppendTime(), - values.getAppendTime() > 0); - assertTrue("syncTime should be > 0, got " + values.getSyncTime(), values.getSyncTime() > 0); - assertTrue("ringBufferTime should be > 0, got " + values.getRingBufferTime(), - values.getRingBufferTime() > 0); - assertTrue("fsSyncTime should be > 0, got " + values.getFsSyncTime(), - values.getFsSyncTime() > 0); - assertTrue("batchSize should be > 0, got " + values.getBatchSize(), values.getBatchSize() > 0); - assertTrue("pendingSyncCount should be > 0, got " + values.getPendingSyncCount(), - values.getPendingSyncCount() > 0); - assertTrue("pendingSyncWaitTime should be > 0, got " + values.getPendingSyncWaitTime(), - 
values.getPendingSyncWaitTime() > 0); + assertTrue("appendTime should be > 0, got " + values.getAppendTimeMax(), + values.getAppendTimeMax() > 0); + assertTrue("syncTime should be > 0, got " + values.getSyncTimeMax(), + values.getSyncTimeMax() > 0); + assertTrue("ringBufferTime should be > 0, got " + values.getRingBufferTimeMax(), + values.getRingBufferTimeMax() > 0); + assertTrue("fsSyncTime should be > 0, got " + values.getFsSyncTimeMax(), + values.getFsSyncTimeMax() > 0); + assertTrue("batchSize should be > 0, got " + values.getBatchSizeMax(), + values.getBatchSizeMax() > 0); + assertTrue("pendingSyncCount should be > 0, got " + values.getPendingSyncCountMax(), + values.getPendingSyncCountMax() > 0); + assertTrue("pendingSyncWaitTime should be > 0, got " + values.getPendingSyncWaitTimeMax(), + values.getPendingSyncWaitTimeMax() > 0); } private void dumpTableLogCount(Map<String, List<Mutation>> mutationsByTable) { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java index 349fdb8620..2f81671835 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java @@ -45,8 +45,12 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Mutation; import org.apache.phoenix.jdbc.HAGroupStoreRecord; @@ -58,6 +62,8 @@ import org.apache.phoenix.replication.log.LogFileReaderContext; import 
org.apache.phoenix.replication.log.LogFileTestUtil; import org.apache.phoenix.replication.log.LogFileWriter; import org.apache.phoenix.replication.log.LogFileWriterContext; +import org.apache.phoenix.replication.metrics.ReplicationLogMetricValues; +import org.junit.Assume; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; @@ -2075,4 +2081,150 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest { holdConsumer.countDown(); filler.join(5000); } + + /** + * Measures sync coalescing effectiveness when many producer threads append+sync concurrently. + * Adds a small delay inside the inner writer's sync() so the consumer holds long enough for + * additional SYNC events to queue behind it on the ring buffer; under contention the + * LogEventHandler should consolidate them into a single inner sync per Disruptor batch. Logs the + * producer-sync count, inner-sync count, coalescing ratio, and metric histograms; no assertions — + * purpose is observation. + */ + @Test + public void testReplicationSyncPathSimulator() throws Exception { + Assume.assumeTrue("Simulator test, opt in with -Dtest.runSimulator=true", + Boolean.getBoolean("test.runSimulator")); + final String tableName = "TBLSCM"; + final int producerCount = Integer.getInteger("test.producerCount", 64); + final int syncsPerProducer = Integer.getInteger("test.syncsPerProducer", 20); + final int appendsPerSync = Integer.getInteger("test.appendsPerSync", 5); + final int cellsPerMutation = Integer.getInteger("test.cellsPerMutation", 1); + final long innerSyncDelayMs = Long.getLong("test.innerSyncDelayMs", 2); + + // Use the production-default ring buffer size so producers are not artificially blocked on + // ringBuffer.next() — the default test fixture uses a 32-slot buffer which fills under + // contention and inflates the producer-perceived sync latency. 
+ conf.setInt(ReplicationLogGroup.REPLICATION_LOG_RINGBUFFER_SIZE_KEY, + ReplicationLogGroup.DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE); + // Disable size-based rotation for the duration of the run so coalescing measurements are not + // contaminated by rotation overhead. The fixture's 10 KB threshold otherwise causes 5-20 + // rotations per run at high producerCount. + conf.setLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY, Long.MAX_VALUE); + recreateLogGroup(); + + final List<LogFileWriter> allWriters = + java.util.Collections.synchronizedList(new ArrayList<>()); + + // Apply the sleep stub to a writer so concurrent producers' SYNC events queue behind it, + // and register it for later invocation counting. + java.util.function.Consumer<LogFileWriter> instrumentWriter = w -> { + try { + doAnswer(invocation -> { + sleep(innerSyncDelayMs); + return invocation.callRealMethod(); + }).when(w).sync(); + } catch (IOException e) { + throw new RuntimeException(e); + } + allWriters.add(w); + }; + + // Instrument the initial writer. + LogFileWriter initialWriter = logGroup.getActiveLog().getWriter(); + assertNotNull("Inner writer should not be null", initialWriter); + instrumentWriter.accept(initialWriter); + + // Instrument every writer the rotation path creates. 
+ ReplicationLog activeLog = logGroup.getActiveLog(); + doAnswer(invocation -> { + LogFileWriter w = (LogFileWriter) invocation.callRealMethod(); + instrumentWriter.accept(w); + return w; + }).when(activeLog).createNewWriter(); + + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(producerCount); + final AtomicLong totalProducerSyncs = new AtomicLong(0); + final AtomicLong totalProducerAppends = new AtomicLong(0); + final AtomicLong commitIdSeq = new AtomicLong(0); + ExecutorService pool = Executors.newFixedThreadPool(producerCount); + + try { + for (int p = 0; p < producerCount; p++) { + pool.submit(() -> { + try { + startLatch.await(); + for (int i = 0; i < syncsPerProducer; i++) { + for (int j = 0; j < appendsPerSync; j++) { + long commitId = commitIdSeq.getAndIncrement(); + Mutation put = LogFileTestUtil.newPut("row" + commitId, commitId, cellsPerMutation); + logGroup.append(tableName, commitId, put); + totalProducerAppends.incrementAndGet(); + } + logGroup.sync(); + totalProducerSyncs.incrementAndGet(); + } + } catch (Exception e) { + LOG.error("Producer failed", e); + } finally { + doneLatch.countDown(); + } + }); + } + + long startNs = System.nanoTime(); + startLatch.countDown(); + doneLatch.await(); + long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); + + // Count actual fsyncs invoked across all writers (rotation creates new ones mid-test). + int innerSyncCount = 0; + synchronized (allWriters) { + for (LogFileWriter w : allWriters) { + innerSyncCount += (int) Mockito.mockingDetails(w).getInvocations().stream() + .filter(inv -> "sync".equals(inv.getMethod().getName())).count(); + } + } + int writerCount = allWriters.size(); + long producerSyncs = totalProducerSyncs.get(); + double coalescingRatio = innerSyncCount == 0 ? 
0.0 : (double) producerSyncs / innerSyncCount; + + LOG.info( + "Sync coalescing: producers={} syncsPerProducer={} appendsPerSync={} cellsPerMutation={} " + + "totalProducerAppends={} totalProducerSyncs={} innerSyncCalls={} writerCount={} " + + "ratio={} elapsedMs={} innerSyncDelayMs={}", + producerCount, syncsPerProducer, appendsPerSync, cellsPerMutation, + totalProducerAppends.get(), producerSyncs, innerSyncCount, writerCount, + String.format("%.2f", coalescingRatio), elapsedMs, innerSyncDelayMs); + ReplicationLogMetricValues metricValues = logGroup.getMetrics().getCurrentMetricValues(); + // syncTime, fsSyncTime histograms record in milliseconds; ringBufferTime and + // pendingSyncWaitTime record in nanoseconds. Compare the decomposition in nanoseconds to + // avoid ms-truncation losses. Each ms-stored value loses up to ~1ms of precision via floor + // truncation. fsSyncTime appears on the component side (its floor truncation makes + // componentSumNs undershoot), so a ~1ms tolerance is needed for the comparison to hold + // reliably at low absolute values. 
+ long ringBufferTimeNs = metricValues.getRingBufferTimeMax(); + long pendingSyncWaitTimeNs = metricValues.getPendingSyncWaitTimeMax(); + long fsSyncTimeNs = TimeUnit.MILLISECONDS.toNanos(metricValues.getFsSyncTimeMax()); + long syncTimeNs = TimeUnit.MILLISECONDS.toNanos(metricValues.getSyncTimeMax()); + long componentSumNs = ringBufferTimeNs + pendingSyncWaitTimeNs + fsSyncTimeNs; + long truncationToleranceNs = TimeUnit.MILLISECONDS.toNanos(1); + LOG.info( + "Metrics snapshot: maxBatchSize={} maxPendingSyncCount={}" + + " syncTime[p50={}ms p99={}ms max={}ms]" + " ringBuffer[p50={}ns p99={}ns max={}ns]" + + " fsSync[p50={}ms p99={}ms max={}ms]" + " pendingSyncWait[p50={}ns p99={}ns max={}ns]" + + " maxComponentSumNs={} syncTimeWithinBound={}", + metricValues.getBatchSizeMax(), metricValues.getPendingSyncCountMax(), + metricValues.getSyncTimeP50(), metricValues.getSyncTimeP99(), metricValues.getSyncTimeMax(), + metricValues.getRingBufferTimeP50(), metricValues.getRingBufferTimeP99(), + metricValues.getRingBufferTimeMax(), metricValues.getFsSyncTimeP50(), + metricValues.getFsSyncTimeP99(), metricValues.getFsSyncTimeMax(), + metricValues.getPendingSyncWaitTimeP50(), metricValues.getPendingSyncWaitTimeP99(), + metricValues.getPendingSyncWaitTimeMax(), componentSumNs, + syncTimeNs <= componentSumNs + truncationToleranceNs); + } finally { + pool.shutdownNow(); + pool.awaitTermination(5, TimeUnit.SECONDS); + } + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImplTest.java new file mode 100644 index 0000000000..a53c54280e --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImplTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.metrics; + +import static org.junit.Assert.assertEquals; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for {@link MetricsReplicationLogGroupSourceImpl}, focused on verifying that the + * histogram unit conventions documented in {@link MetricsReplicationLogGroupSource} are honored: + * appendTime, ringBufferTime, and pendingSyncWaitTime record nanoseconds verbatim; syncTime, + * fsSyncTime, and rotationTime convert input nanoseconds to milliseconds before recording.
+ */ +public class MetricsReplicationLogGroupSourceImplTest { + + private MetricsReplicationLogGroupSourceImpl source; + + @Before + public void setUp() { + source = new MetricsReplicationLogGroupSourceImpl("testHaGroup"); + } + + @After + public void tearDown() { + source.close(); + } + + @Test + public void testNanosecondHistogramsRecordInputVerbatim() { + source.updateAppendTime(1500L); + assertEquals(1500L, source.getCurrentMetricValues().getAppendTimeMax()); + + source.updatePendingSyncWaitTime(2_500_000L); + assertEquals(2_500_000L, source.getCurrentMetricValues().getPendingSyncWaitTimeMax()); + + source.updateRingBufferTime(7_500_000L); + assertEquals(7_500_000L, source.getCurrentMetricValues().getRingBufferTimeMax()); + } + + @Test + public void testMillisecondHistogramsConvertNsToMs() { + source.updateSyncTime(5_000_000L); + assertEquals(5L, source.getCurrentMetricValues().getSyncTimeMax()); + + source.updateFsSyncTime(1_000_000L); + assertEquals(1L, source.getCurrentMetricValues().getFsSyncTimeMax()); + + source.updateRotationTime(35_000_000L); + assertEquals(35L, source.getCurrentMetricValues().getRotationTimeMax()); + } + + @Test + public void testSubMillisecondInputTruncatesToZero() { + source.updateSyncTime(500_000L); + assertEquals(0L, source.getCurrentMetricValues().getSyncTimeMax()); + } + + @Test + public void testHistogramReportsMaxAcrossSamples() { + source.updateSyncTime(2_000_000L); + source.updateSyncTime(10_000_000L); + source.updateSyncTime(5_000_000L); + assertEquals(10L, source.getCurrentMetricValues().getSyncTimeMax()); + } + + @Test + public void testCounters() { + source.incrementRotationCount(); + source.incrementRotationCount(); + source.incrementRotationFailureCount(); + source.incrementSyncToSafTransitions(); + + ReplicationLogMetricValues v = source.getCurrentMetricValues(); + assertEquals(2L, v.getRotationCount()); + assertEquals(1L, v.getRotationFailuresCount()); + assertEquals(1L, v.getSyncToSafTransitions()); + } + + @Test + 
public void testBatchSizeAndPendingSyncCount() { + source.updateBatchSize(100L); + source.updatePendingSyncCount(50L); + ReplicationLogMetricValues v = source.getCurrentMetricValues(); + assertEquals(100L, v.getBatchSizeMax()); + assertEquals(50L, v.getPendingSyncCountMax()); + } +}
