This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e1162bede4206c8d212344d93b83c033387cbf72
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Jul 29 15:51:57 2025 +0800

    Pipe: Modify Sink Batch Metrics (#16018)
    
    * Pipe: Modify Sink Batch Metrics
    
    * update
    
    * update
    
    (cherry picked from commit 226fc6fd684834dd1797fcd83fa732da1472fb79)
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  27 +++++
 .../metric/sink/PipeDataRegionSinkMetrics.java     | 111 +++++++++++++++------
 .../evolvable/batch/PipeTabletEventBatch.java      |  19 +++-
 .../evolvable/batch/PipeTabletEventPlainBatch.java |  17 ++--
 .../batch/PipeTabletEventTsFileBatch.java          |  21 ++--
 .../batch/PipeTransferBatchReqBuilder.java         |  51 +++++++++-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |  29 ++++++
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |  29 ++++++
 .../commons/pipe/sink/protocol/IoTDBSink.java      |  17 ++++
 9 files changed, 265 insertions(+), 56 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 4b98ae82d81..ffd484b1306 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncS
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.utils.ErrorHandlingUtils;
+import org.apache.iotdb.metrics.type.Histogram;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -337,6 +338,32 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
         : 0;
   }
 
+  public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
+    if (outputPipeConnector instanceof IoTDBSink) {
+      ((IoTDBSink) 
outputPipeConnector).setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+    }
+  }
+
+  public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
+    if (outputPipeConnector instanceof IoTDBSink) {
+      ((IoTDBSink) 
outputPipeConnector).setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+    }
+  }
+
+  public void setTabletBatchTimeIntervalHistogram(Histogram 
tabletBatchTimeIntervalHistogram) {
+    if (outputPipeConnector instanceof IoTDBSink) {
+      ((IoTDBSink) outputPipeConnector)
+          
.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
+    }
+  }
+
+  public void setTsFileBatchTimeIntervalHistogram(Histogram 
tsFileBatchTimeIntervalHistogram) {
+    if (outputPipeConnector instanceof IoTDBSink) {
+      ((IoTDBSink) outputPipeConnector)
+          
.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+    }
+  }
+
   //////////////////////////// Error report ////////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index 52a8828b2ee..e1ddc368a5f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask;
 import org.apache.iotdb.metrics.AbstractMetricService;
-import org.apache.iotdb.metrics.impl.DoNothingHistogram;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
 import org.apache.iotdb.metrics.type.Histogram;
 import org.apache.iotdb.metrics.type.Rate;
@@ -45,14 +44,6 @@ public class PipeDataRegionSinkMetrics implements IMetricSet 
{
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeDataRegionSinkMetrics.class);
 
-  public static Histogram tabletBatchSizeHistogram = new DoNothingHistogram();
-
-  public static Histogram tsFileBatchSizeHistogram = new DoNothingHistogram();
-
-  public static Histogram tabletBatchTimeIntervalHistogram = new 
DoNothingHistogram();
-
-  public static Histogram tsFileBatchTimeIntervalHistogram = new 
DoNothingHistogram();
-
   @SuppressWarnings("java:S3077")
   private volatile AbstractMetricService metricService;
 
@@ -75,28 +66,13 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
     for (String taskID : taskIDs) {
       createMetrics(taskID);
     }
-
-    tabletBatchSizeHistogram =
-        metricService.getOrCreateHistogram(
-            Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(), 
MetricLevel.IMPORTANT);
-
-    tsFileBatchSizeHistogram =
-        metricService.getOrCreateHistogram(
-            Metric.PIPE_TSFILE_BATCH_SIZE.toString(), MetricLevel.IMPORTANT);
-
-    tabletBatchTimeIntervalHistogram =
-        metricService.getOrCreateHistogram(
-            Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(), 
MetricLevel.IMPORTANT);
-
-    tsFileBatchTimeIntervalHistogram =
-        metricService.getOrCreateHistogram(
-            Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(), 
MetricLevel.IMPORTANT);
   }
 
   private void createMetrics(final String taskID) {
     createAutoGauge(taskID);
     createRate(taskID);
     createTimer(taskID);
+    createHistogram(taskID);
   }
 
   private void createAutoGauge(final String taskID) {
@@ -245,6 +221,50 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
             String.valueOf(connector.getCreationTime())));
   }
 
+  private void createHistogram(final String taskID) {
+    final PipeSinkSubtask connector = connectorMap.get(taskID);
+
+    final Histogram tabletBatchSizeHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            connector.getAttributeSortedString(),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(connector.getCreationTime()));
+    connector.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+
+    final Histogram tsFileBatchSizeHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.PIPE_TSFILE_BATCH_SIZE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            connector.getAttributeSortedString(),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(connector.getCreationTime()));
+    connector.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+
+    final Histogram tabletBatchTimeIntervalHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            connector.getAttributeSortedString(),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(connector.getCreationTime()));
+    
connector.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
+
+    final Histogram tsFileBatchTimeIntervalHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            connector.getAttributeSortedString(),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(connector.getCreationTime()));
+    
connector.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+  }
+
   @Override
   public void unbindFrom(final AbstractMetricService metricService) {
     final ImmutableSet<String> taskIDs = 
ImmutableSet.copyOf(connectorMap.keySet());
@@ -255,20 +275,13 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
       LOGGER.warn(
           "Failed to unbind from pipe data region connector metrics, connector 
map not empty");
     }
-
-    metricService.remove(MetricType.HISTOGRAM, 
Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString());
-
-    metricService.remove(MetricType.HISTOGRAM, 
Metric.PIPE_TSFILE_BATCH_SIZE.toString());
-
-    metricService.remove(MetricType.HISTOGRAM, 
Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString());
-
-    metricService.remove(MetricType.HISTOGRAM, 
Metric.PIPE_TSFILE_BATCH_TIME_COST.toString());
   }
 
   private void removeMetrics(final String taskID) {
     removeAutoGauge(taskID);
     removeRate(taskID);
     removeTimer(taskID);
+    removeHistogram(taskID);
   }
 
   private void removeAutoGauge(final String taskID) {
@@ -397,6 +410,38 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
     compressionTimerMap.remove(connector.getAttributeSortedString());
   }
 
+  private void removeHistogram(final String taskID) {
+    final PipeSinkSubtask connector = connectorMap.get(taskID);
+    metricService.remove(
+        MetricType.HISTOGRAM,
+        Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.remove(
+        MetricType.HISTOGRAM,
+        Metric.PIPE_TSFILE_BATCH_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.remove(
+        MetricType.HISTOGRAM,
+        Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.remove(
+        MetricType.HISTOGRAM,
+        Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+  }
+
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
 
   public void register(@NonNull final PipeSinkSubtask pipeSinkSubtask) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index c7622c4a95f..58316f4c816 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 
 public abstract class PipeTabletEventBatch implements AutoCloseable {
 
@@ -43,6 +44,7 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
   private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null;
 
   protected final List<EnrichedEvent> events = new ArrayList<>();
+  protected final BiConsumer<Long, Long> recordMetric;
 
   private final int maxDelayInMs;
   private long firstEventProcessingTime = Long.MIN_VALUE;
@@ -52,12 +54,23 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
 
   protected volatile boolean isClosed = false;
 
-  protected PipeTabletEventBatch(final int maxDelayInMs, final long 
requestMaxBatchSizeInBytes) {
+  protected PipeTabletEventBatch(
+      final int maxDelayInMs,
+      final long requestMaxBatchSizeInBytes,
+      final BiConsumer<Long, Long> recordMetric) {
     if (pipeModelFixedMemoryBlock == null) {
       init();
     }
 
     this.maxDelayInMs = maxDelayInMs;
+    if (recordMetric != null) {
+      this.recordMetric = recordMetric;
+    } else {
+      this.recordMetric =
+          (timeInterval, bufferSize) -> {
+            // do nothing
+          };
+    }
 
     // limit in buffer size
     this.allocatedMemoryBlock =
@@ -129,14 +142,12 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
     final long diff = System.currentTimeMillis() - firstEventProcessingTime;
     if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
       allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) 
diff / maxDelayInMs);
-      recordMetric(diff, totalBufferSize);
+      recordMetric.accept(diff, totalBufferSize);
       return true;
     }
     return false;
   }
 
-  protected abstract void recordMetric(final long timeInterval, final long 
bufferSize);
-
   private long getMaxBatchSizeInBytes() {
     return allocatedMemoryBlock.getMemoryUsageInBytes();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index c637b2bff42..9e3155bb9e8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReqV2;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -40,6 +39,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 
 public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
 
@@ -56,7 +56,14 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new 
HashMap<>();
 
   PipeTabletEventPlainBatch(final int maxDelayInMs, final long 
requestMaxBatchSizeInBytes) {
-    super(maxDelayInMs, requestMaxBatchSizeInBytes);
+    super(maxDelayInMs, requestMaxBatchSizeInBytes, null);
+  }
+
+  PipeTabletEventPlainBatch(
+      final int maxDelayInMs,
+      final long requestMaxBatchSizeInBytes,
+      final BiConsumer<Long, Long> recordMetric) {
+    super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
   }
 
   @Override
@@ -72,12 +79,6 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
     return true;
   }
 
-  @Override
-  protected void recordMetric(long timeInterval, long bufferSize) {
-    
PipeDataRegionSinkMetrics.tabletBatchTimeIntervalHistogram.update(timeInterval);
-    PipeDataRegionSinkMetrics.tabletBatchSizeHistogram.update(bufferSize);
-  }
-
   @Override
   public synchronized void onSuccess() {
     super.onSuccess();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 660706a4325..cb8c25ef755 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
 
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import 
org.apache.iotdb.db.pipe.sink.util.builder.PipeTableModelTsFileBuilderV2;
 import org.apache.iotdb.db.pipe.sink.util.builder.PipeTreeModelTsFileBuilderV2;
@@ -45,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
 
 public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
 
@@ -59,7 +59,18 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
   private final Map<Pair<String, Long>, Double> pipeName2WeightMap = new 
HashMap<>();
 
   public PipeTabletEventTsFileBatch(final int maxDelayInMs, final long 
requestMaxBatchSizeInBytes) {
-    super(maxDelayInMs, requestMaxBatchSizeInBytes);
+    super(maxDelayInMs, requestMaxBatchSizeInBytes, null);
+
+    final AtomicLong tsFileIdGenerator = new AtomicLong(0);
+    treeModeTsFileBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId, 
tsFileIdGenerator);
+    tableModeTsFileBuilder = new PipeTableModelTsFileBuilderV2(currentBatchId, 
tsFileIdGenerator);
+  }
+
+  public PipeTabletEventTsFileBatch(
+      final int maxDelayInMs,
+      final long requestMaxBatchSizeInBytes,
+      final BiConsumer<Long, Long> recordMetric) {
+    super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
 
     final AtomicLong tsFileIdGenerator = new AtomicLong(0);
     treeModeTsFileBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId, 
tsFileIdGenerator);
@@ -126,12 +137,6 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
     return true;
   }
 
-  @Override
-  protected void recordMetric(long timeInterval, long bufferSize) {
-    
PipeDataRegionSinkMetrics.tsFileBatchTimeIntervalHistogram.update(timeInterval);
-    PipeDataRegionSinkMetrics.tsFileBatchSizeHistogram.update(bufferSize);
-  }
-
   private void bufferTreeModelTablet(
       final String pipeName,
       final long creationTime,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index d30acaf146a..9c4b74a6800 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -25,6 +25,8 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeCacheLeaderClientManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.metrics.impl.DoNothingHistogram;
+import org.apache.iotdb.metrics.type.Histogram;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -68,6 +70,11 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
   private final int requestMaxDelayInMs;
   private final long requestMaxBatchSizeInBytes;
 
+  private Histogram tabletBatchSizeHistogram = new DoNothingHistogram();
+  private Histogram tsFileBatchSizeHistogram = new DoNothingHistogram();
+  private Histogram tabletBatchTimeIntervalHistogram = new 
DoNothingHistogram();
+  private Histogram tsFileBatchTimeIntervalHistogram = new 
DoNothingHistogram();
+
   // If the leader cache is disabled (or unable to find the endpoint of event 
in the leader cache),
   // the event will be stored in the default batch.
   private final PipeTabletEventBatch defaultBatch;
@@ -113,8 +120,10 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
                 : CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
     this.defaultBatch =
         usingTsFileBatch
-            ? new PipeTabletEventTsFileBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes)
-            : new PipeTabletEventPlainBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes);
+            ? new PipeTabletEventTsFileBatch(
+                requestMaxDelayInMs, requestMaxBatchSizeInBytes, 
this::recordTsFileMetric)
+            : new PipeTabletEventPlainBatch(
+                requestMaxDelayInMs, requestMaxBatchSizeInBytes, 
this::recordTabletMetric);
   }
 
   /**
@@ -157,7 +166,9 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
     endPointToBatch
         .computeIfAbsent(
             endPoint,
-            k -> new PipeTabletEventPlainBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes))
+            k ->
+                new PipeTabletEventPlainBatch(
+                    requestMaxDelayInMs, requestMaxBatchSizeInBytes, 
this::recordTabletMetric))
         .onEvent(event);
   }
 
@@ -208,4 +219,38 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
     defaultBatch.close();
     endPointToBatch.values().forEach(PipeTabletEventPlainBatch::close);
   }
+
+  public void recordTabletMetric(long timeInterval, long bufferSize) {
+    this.tabletBatchTimeIntervalHistogram.update(timeInterval);
+    this.tabletBatchSizeHistogram.update(bufferSize);
+  }
+
+  public void recordTsFileMetric(long timeInterval, long bufferSize) {
+    this.tsFileBatchTimeIntervalHistogram.update(timeInterval);
+    this.tsFileBatchSizeHistogram.update(bufferSize);
+  }
+
+  public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
+    if (tabletBatchSizeHistogram != null) {
+      this.tabletBatchSizeHistogram = tabletBatchSizeHistogram;
+    }
+  }
+
+  public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
+    if (tsFileBatchSizeHistogram != null) {
+      this.tsFileBatchSizeHistogram = tsFileBatchSizeHistogram;
+    }
+  }
+
+  public void setTabletBatchTimeIntervalHistogram(Histogram 
tabletBatchTimeIntervalHistogram) {
+    if (tabletBatchTimeIntervalHistogram != null) {
+      this.tabletBatchTimeIntervalHistogram = tabletBatchTimeIntervalHistogram;
+    }
+  }
+
+  public void setTsFileBatchTimeIntervalHistogram(Histogram 
tsFileBatchTimeIntervalHistogram) {
+    if (tsFileBatchTimeIntervalHistogram != null) {
+      this.tsFileBatchTimeIntervalHistogram = tsFileBatchTimeIntervalHistogram;
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 9fd63a054be..38ac7e0201a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -47,6 +47,7 @@ import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler.PipeTransferT
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler.PipeTransferTsFileHandler;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.metrics.type.Histogram;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
@@ -832,4 +833,32 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) {
     this.transferTsFileCounter = transferTsFileCounter;
   }
+
+  @Override
+  public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
+    if (tabletBatchBuilder != null) {
+      tabletBatchBuilder.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+    }
+  }
+
+  @Override
+  public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
+    if (tabletBatchBuilder != null) {
+      tabletBatchBuilder.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+    }
+  }
+
+  @Override
+  public void setTabletBatchTimeIntervalHistogram(Histogram 
tabletBatchTimeIntervalHistogram) {
+    if (tabletBatchBuilder != null) {
+      
tabletBatchBuilder.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
+    }
+  }
+
+  @Override
+  public void setTsFileBatchTimeIntervalHistogram(Histogram 
tsFileBatchTimeIntervalHistogram) {
+    if (tabletBatchBuilder != null) {
+      
tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index 6b8327170c4..c9185b44d44 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFil
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
 import org.apache.iotdb.db.pipe.sink.util.cacher.LeaderCacheUtils;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.metrics.type.Histogram;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -626,4 +627,32 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
   public IoTDBDataNodeSyncClientManager getClientManager() {
     return clientManager;
   }
+
+  @Override
+  public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
+    if (tabletBatchBuilder != null) {
+      tabletBatchBuilder.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+    }
+  }
+
+  @Override
+  public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
+    if (tabletBatchBuilder != null) {
+      tabletBatchBuilder.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+    }
+  }
+
+  @Override
+  public void setTabletBatchTimeIntervalHistogram(Histogram 
tabletBatchTimeIntervalHistogram) {
+    if (tabletBatchBuilder != null) {
+      
tabletBatchBuilder.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
+    }
+  }
+
+  @Override
+  public void setTsFileBatchTimeIntervalHistogram(Histogram 
tsFileBatchTimeIntervalHistogram) {
+    if (tabletBatchBuilder != null) {
+      
tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 471dc28bc06..38f3503b66f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.commons.pipe.sink.limiter.GlobalRPCRateLimiter;
 import org.apache.iotdb.commons.pipe.sink.limiter.PipeEndPointRateLimiter;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.metrics.type.Histogram;
 import org.apache.iotdb.metrics.type.Timer;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
@@ -631,4 +632,20 @@ public abstract class IoTDBSink implements PipeConnector {
   public PipeReceiverStatusHandler statusHandler() {
     return receiverStatusHandler;
   }
+
+  public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
+    // do nothing by default
+  }
+
+  public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
+    // do nothing by default
+  }
+
+  public void setTabletBatchTimeIntervalHistogram(Histogram 
tabletBatchTimeIntervalHistogram) {
+    // do nothing by default
+  }
+
+  public void setTsFileBatchTimeIntervalHistogram(Histogram 
tsFileBatchTimeIntervalHistogram) {
+    // do nothing by default
+  }
 }

Reply via email to