This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8ba8b9cb41c77c05c4f680fd7bc64a50cdabc49a Author: Zhenyu Luo <[email protected]> AuthorDate: Thu Jul 31 18:17:55 2025 +0800 Pipe: Modify Sink batch event length related metrics (#16066) (cherry picked from commit f4813087f1ea949f1a0c8710f5b3f6d437534a4b) --- .../agent/task/subtask/sink/PipeSinkSubtask.java | 6 ++++ .../metric/sink/PipeDataRegionSinkMetrics.java | 34 +++++++++------------- .../evolvable/batch/PipeTabletEventBatch.java | 14 +++++---- .../evolvable/batch/PipeTabletEventPlainBatch.java | 3 +- .../batch/PipeTabletEventTsFileBatch.java | 3 +- .../batch/PipeTransferBatchReqBuilder.java | 14 +++++++-- .../thrift/async/IoTDBDataRegionAsyncSink.java | 7 +++++ .../thrift/sync/IoTDBDataRegionSyncSink.java | 7 +++++ .../commons/pipe/sink/protocol/IoTDBSink.java | 4 +++ 9 files changed, 61 insertions(+), 31 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index ffd484b1306..9d01abf24c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -364,6 +364,12 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { } } + public void setEventSizeHistogram(Histogram eventSizeHistogram) { + if (outputPipeConnector instanceof IoTDBSink) { + ((IoTDBSink) outputPipeConnector).setBatchEventSizeHistogram(eventSizeHistogram); + } + } + //////////////////////////// Error report //////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java index e1ddc368a5f..23024424b92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java @@ -135,17 +135,6 @@ public class PipeDataRegionSinkMetrics implements IMetricSet { Tag.CREATION_TIME.toString(), String.valueOf(connector.getCreationTime())); // Metrics related to IoTDB connector - metricService.createAutoGauge( - Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(), - MetricLevel.IMPORTANT, - connector, - PipeSinkSubtask::getBatchSize, - Tag.NAME.toString(), - connector.getAttributeSortedString(), - Tag.INDEX.toString(), - String.valueOf(connector.getConnectorIndex()), - Tag.CREATION_TIME.toString(), - String.valueOf(connector.getCreationTime())); metricService.createAutoGauge( Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(), MetricLevel.IMPORTANT, @@ -263,6 +252,14 @@ public class PipeDataRegionSinkMetrics implements IMetricSet { Tag.CREATION_TIME.toString(), String.valueOf(connector.getCreationTime())); connector.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram); + + Histogram eventSizeHistogram = + metricService.getOrCreateHistogram( + Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + connector.getAttributeSortedString()); + connector.setEventSizeHistogram(eventSizeHistogram); } @Override @@ -334,15 +331,6 @@ public class PipeDataRegionSinkMetrics implements IMetricSet { Tag.CREATION_TIME.toString(), String.valueOf(connector.getCreationTime())); // Metrics related to IoTDB connector - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(), - Tag.NAME.toString(), - connector.getAttributeSortedString(), - Tag.INDEX.toString(), - String.valueOf(connector.getConnectorIndex()), - Tag.CREATION_TIME.toString(), - String.valueOf(connector.getCreationTime())); metricService.remove( MetricType.AUTO_GAUGE, Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(), @@ -440,6 +428,12 @@ public class PipeDataRegionSinkMetrics implements IMetricSet { connector.getAttributeSortedString(), Tag.CREATION_TIME.toString(), String.valueOf(connector.getCreationTime())); + + metricService.remove( + MetricType.HISTOGRAM, + Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(), + Tag.NAME.toString(), + connector.getAttributeSortedString()); } //////////////////////////// register & deregister (pipe integration) //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index 58316f4c816..f66fb32cf5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.function.BiConsumer; public abstract class PipeTabletEventBatch implements AutoCloseable { @@ -44,7 +43,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null; protected final List<EnrichedEvent> events = new ArrayList<>(); - protected final BiConsumer<Long, Long> recordMetric; + protected final TriLongConsumer recordMetric; private final int maxDelayInMs; private long firstEventProcessingTime = Long.MIN_VALUE; @@ -57,7 +56,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { protected PipeTabletEventBatch( final int maxDelayInMs, final long requestMaxBatchSizeInBytes, - final BiConsumer<Long, Long> recordMetric) { + final TriLongConsumer recordMetric) { if (pipeModelFixedMemoryBlock == null) { init(); } @@ -67,7 +66,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { this.recordMetric = recordMetric; } else { this.recordMetric = - (timeInterval, bufferSize) -> { + (timeInterval, bufferSize, events) -> { // do nothing }; } @@ -142,7 +141,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { final long diff = System.currentTimeMillis() - firstEventProcessingTime; if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) { allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) diff / maxDelayInMs); - recordMetric.accept(diff, totalBufferSize); + recordMetric.accept(diff, totalBufferSize, events.size()); return true; } return false; @@ -235,4 +234,9 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { .forceAllocateForModelFixedMemoryBlock(0, PipeMemoryBlockType.BATCH); } } + + @FunctionalInterface + public interface TriLongConsumer { + void accept(long l1, long l2, long l3); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java index 9e3155bb9e8..31b10736499 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java @@ -39,7 +39,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.BiConsumer; public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { @@ -62,7 +61,7 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { PipeTabletEventPlainBatch( final int maxDelayInMs, final long requestMaxBatchSizeInBytes, - final BiConsumer<Long, Long> recordMetric) { + final TriLongConsumer recordMetric) { super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java index cb8c25ef755..275bc694397 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java @@ -44,7 +44,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { @@ -69,7 +68,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { public PipeTabletEventTsFileBatch( final int maxDelayInMs, final long requestMaxBatchSizeInBytes, - final BiConsumer<Long, Long> recordMetric) { + final TriLongConsumer recordMetric) { super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric); final AtomicLong tsFileIdGenerator = new AtomicLong(0); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index 9c4b74a6800..dd4d4fe1ce6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -75,6 +75,8 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { private Histogram tabletBatchTimeIntervalHistogram = new DoNothingHistogram(); private Histogram tsFileBatchTimeIntervalHistogram = new DoNothingHistogram(); + private Histogram eventSizeHistogram = new DoNothingHistogram(); + // If the leader cache is disabled (or unable to find the endpoint of event in the leader cache), // the event will be stored in the default batch. private final PipeTabletEventBatch defaultBatch; @@ -220,14 +222,16 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { endPointToBatch.values().forEach(PipeTabletEventPlainBatch::close); } - public void recordTabletMetric(long timeInterval, long bufferSize) { + public void recordTabletMetric(long timeInterval, long bufferSize, long eventSize) { this.tabletBatchTimeIntervalHistogram.update(timeInterval); this.tabletBatchSizeHistogram.update(bufferSize); + this.eventSizeHistogram.update(eventSize); } - public void recordTsFileMetric(long timeInterval, long bufferSize) { + public void recordTsFileMetric(long timeInterval, long bufferSize, long eventSize) { this.tsFileBatchTimeIntervalHistogram.update(timeInterval); this.tsFileBatchSizeHistogram.update(bufferSize); + this.eventSizeHistogram.update(eventSize); } public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) { @@ -253,4 +257,10 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { this.tsFileBatchTimeIntervalHistogram = tsFileBatchTimeIntervalHistogram; } } + + public void setEventSizeHistogram(Histogram eventSizeHistogram) { + if (eventSizeHistogram != null) { + this.eventSizeHistogram = eventSizeHistogram; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 38ac7e0201a..21fb0c9a730 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -861,4 +861,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram); } } + + @Override + public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index c9185b44d44..b3bded4a2bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -655,4 +655,11 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink { tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram); } } + + @Override + public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index 38f3503b66f..efee7c9c311 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -648,4 +648,8 @@ public abstract class IoTDBSink implements PipeConnector { public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) { // do nothing by default } + + public void setBatchEventSizeHistogram(Histogram tsFileBatchTimeIntervalHistogram) { + // do nothing by default + } }
