This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8ba8b9cb41c77c05c4f680fd7bc64a50cdabc49a
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Jul 31 18:17:55 2025 +0800

    Pipe: Modify Sink batch event length related metrics (#16066)
    
    (cherry picked from commit f4813087f1ea949f1a0c8710f5b3f6d437534a4b)
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  6 ++++
 .../metric/sink/PipeDataRegionSinkMetrics.java     | 34 +++++++++-------------
 .../evolvable/batch/PipeTabletEventBatch.java      | 14 +++++----
 .../evolvable/batch/PipeTabletEventPlainBatch.java |  3 +-
 .../batch/PipeTabletEventTsFileBatch.java          |  3 +-
 .../batch/PipeTransferBatchReqBuilder.java         | 14 +++++++--
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |  7 +++++
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |  7 +++++
 .../commons/pipe/sink/protocol/IoTDBSink.java      |  4 +++
 9 files changed, 61 insertions(+), 31 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index ffd484b1306..9d01abf24c1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -364,6 +364,12 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
     }
   }
 
+  public void setEventSizeHistogram(Histogram eventSizeHistogram) {
+    if (outputPipeConnector instanceof IoTDBSink) {
+      ((IoTDBSink) outputPipeConnector).setBatchEventSizeHistogram(eventSizeHistogram);
+    }
+  }
+
   //////////////////////////// Error report ////////////////////////////
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index e1ddc368a5f..23024424b92 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -135,17 +135,6 @@ public class PipeDataRegionSinkMetrics implements IMetricSet {
         Tag.CREATION_TIME.toString(),
         String.valueOf(connector.getCreationTime()));
     // Metrics related to IoTDB connector
-    metricService.createAutoGauge(
-        Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
-        MetricLevel.IMPORTANT,
-        connector,
-        PipeSinkSubtask::getBatchSize,
-        Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
-        Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
-        Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
     metricService.createAutoGauge(
         Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
         MetricLevel.IMPORTANT,
@@ -263,6 +252,14 @@ public class PipeDataRegionSinkMetrics implements IMetricSet {
             Tag.CREATION_TIME.toString(),
             String.valueOf(connector.getCreationTime()));
     connector.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+
+    Histogram eventSizeHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            connector.getAttributeSortedString());
+    connector.setEventSizeHistogram(eventSizeHistogram);
   }
 
   @Override
@@ -334,15 +331,6 @@ public class PipeDataRegionSinkMetrics implements IMetricSet {
         Tag.CREATION_TIME.toString(),
         String.valueOf(connector.getCreationTime()));
     // Metrics related to IoTDB connector
-    metricService.remove(
-        MetricType.AUTO_GAUGE,
-        Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
-        Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
-        Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
-        Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
@@ -440,6 +428,12 @@ public class PipeDataRegionSinkMetrics implements IMetricSet {
         connector.getAttributeSortedString(),
         Tag.CREATION_TIME.toString(),
         String.valueOf(connector.getCreationTime()));
+
+    metricService.remove(
+        MetricType.HISTOGRAM,
+        Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString());
   }
 
   //////////////////////////// register & deregister (pipe integration) ////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index 58316f4c816..f66fb32cf5e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -36,7 +36,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import java.util.function.BiConsumer;
 
 public abstract class PipeTabletEventBatch implements AutoCloseable {
 
@@ -44,7 +43,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
   private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null;
 
   protected final List<EnrichedEvent> events = new ArrayList<>();
-  protected final BiConsumer<Long, Long> recordMetric;
+  protected final TriLongConsumer recordMetric;
 
   private final int maxDelayInMs;
   private long firstEventProcessingTime = Long.MIN_VALUE;
@@ -57,7 +56,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
   protected PipeTabletEventBatch(
       final int maxDelayInMs,
       final long requestMaxBatchSizeInBytes,
-      final BiConsumer<Long, Long> recordMetric) {
+      final TriLongConsumer recordMetric) {
     if (pipeModelFixedMemoryBlock == null) {
       init();
     }
@@ -67,7 +66,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
       this.recordMetric = recordMetric;
     } else {
       this.recordMetric =
-          (timeInterval, bufferSize) -> {
+          (timeInterval, bufferSize, events) -> {
             // do nothing
           };
     }
@@ -142,7 +141,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
     final long diff = System.currentTimeMillis() - firstEventProcessingTime;
     if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
       allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) diff / maxDelayInMs);
-      recordMetric.accept(diff, totalBufferSize);
+      recordMetric.accept(diff, totalBufferSize, events.size());
       return true;
     }
     return false;
@@ -235,4 +234,9 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
               .forceAllocateForModelFixedMemoryBlock(0, PipeMemoryBlockType.BATCH);
     }
   }
+
+  @FunctionalInterface
+  public interface TriLongConsumer {
+    void accept(long l1, long l2, long l3);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 9e3155bb9e8..31b10736499 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -39,7 +39,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.BiConsumer;
 
 public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
 
@@ -62,7 +61,7 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
   PipeTabletEventPlainBatch(
       final int maxDelayInMs,
       final long requestMaxBatchSizeInBytes,
-      final BiConsumer<Long, Long> recordMetric) {
+      final TriLongConsumer recordMetric) {
     super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index cb8c25ef755..275bc694397 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -44,7 +44,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
 
 public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
 
@@ -69,7 +68,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
   public PipeTabletEventTsFileBatch(
       final int maxDelayInMs,
       final long requestMaxBatchSizeInBytes,
-      final BiConsumer<Long, Long> recordMetric) {
+      final TriLongConsumer recordMetric) {
     super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
 
     final AtomicLong tsFileIdGenerator = new AtomicLong(0);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 9c4b74a6800..dd4d4fe1ce6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -75,6 +75,8 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
   private Histogram tabletBatchTimeIntervalHistogram = new DoNothingHistogram();
   private Histogram tsFileBatchTimeIntervalHistogram = new DoNothingHistogram();
 
+  private Histogram eventSizeHistogram = new DoNothingHistogram();
+
   // If the leader cache is disabled (or unable to find the endpoint of event in the leader cache),
   // the event will be stored in the default batch.
   private final PipeTabletEventBatch defaultBatch;
@@ -220,14 +222,16 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
     endPointToBatch.values().forEach(PipeTabletEventPlainBatch::close);
   }
 
-  public void recordTabletMetric(long timeInterval, long bufferSize) {
+  public void recordTabletMetric(long timeInterval, long bufferSize, long eventSize) {
     this.tabletBatchTimeIntervalHistogram.update(timeInterval);
     this.tabletBatchSizeHistogram.update(bufferSize);
+    this.eventSizeHistogram.update(eventSize);
   }
 
-  public void recordTsFileMetric(long timeInterval, long bufferSize) {
+  public void recordTsFileMetric(long timeInterval, long bufferSize, long eventSize) {
     this.tsFileBatchTimeIntervalHistogram.update(timeInterval);
     this.tsFileBatchSizeHistogram.update(bufferSize);
+    this.eventSizeHistogram.update(eventSize);
   }
 
   public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
@@ -253,4 +257,10 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
       this.tsFileBatchTimeIntervalHistogram = tsFileBatchTimeIntervalHistogram;
     }
   }
+
+  public void setEventSizeHistogram(Histogram eventSizeHistogram) {
+    if (eventSizeHistogram != null) {
+      this.eventSizeHistogram = eventSizeHistogram;
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 38ac7e0201a..21fb0c9a730 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -861,4 +861,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
     }
   }
+
+  @Override
+  public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) {
+    if (tabletBatchBuilder != null) {
+      tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index c9185b44d44..b3bded4a2bd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -655,4 +655,11 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink {
       tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
     }
   }
+
+  @Override
+  public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) {
+    if (tabletBatchBuilder != null) {
+      tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
+    }
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 38f3503b66f..efee7c9c311 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -648,4 +648,8 @@ public abstract class IoTDBSink implements PipeConnector {
   public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) {
     // do nothing by default
   }
+
+  public void setBatchEventSizeHistogram(Histogram tsFileBatchTimeIntervalHistogram) {
+    // do nothing by default
+  }
 }

Reply via email to