This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 8179015e8c1 [To dev/1.3] Pipe: Add InsertNode & tsFile transmission time metrics (#15668) (#15670)
8179015e8c1 is described below

commit 8179015e8c1f012e0a63357a2dc58f72ec86ced4
Author: Caideyipi <[email protected]>
AuthorDate: Sat Jun 7 17:02:51 2025 +0800

    [To dev/1.3] Pipe: Add InsertNode & tsFile transmission time metrics (#15668) (#15670)
    
    * Pipe: Add InsertNode transmission time
    
    * Pipe: Add InsertNode transmission time
    
    * update
    
    * update
    
    * fix
    
    * fix
    
    * fix
    
    Co-authored-by: Zhenyu Luo <[email protected]>
---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  5 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  4 +-
 .../PipeDataNodeRemainingEventAndTimeMetrics.java  | 83 +++++++++++++++++++---
 .../PipeDataNodeRemainingEventAndTimeOperator.java | 21 ++++++
 .../iotdb/commons/service/metric/enums/Metric.java |  3 +
 5 files changed, 104 insertions(+), 12 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 5ead2164dbf..e7814916e5d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -87,6 +87,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   private ProgressIndex progressIndex;
 
+  private long extractTime = 0;
+
   public PipeInsertNodeTabletInsertionEvent(
       final WALEntryHandler walEntryHandler,
       final PartialPath devicePath,
@@ -154,6 +156,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   @Override
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
+    extractTime = System.nanoTime();
     try {
       PipeDataNodeResourceManager.wal().pin(walEntryHandler);
       if (Objects.nonNull(pipeName)) {
@@ -194,7 +197,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       if (Objects.nonNull(pipeName)) {
         PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName, 
ramBytesUsed());
         PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-            .decreaseInsertNodeEventCount(pipeName, creationTime);
+            .decreaseInsertNodeEventCount(pipeName, creationTime, 
System.nanoTime() - extractTime);
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index a2843392636..3aa9b4a6b13 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -67,6 +67,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
 
   private final TsFileResource resource;
   private File tsFile;
+  private long extractTime = 0;
 
   // This is true iff the modFile exists and should be transferred
   private boolean isWithMod;
@@ -249,6 +250,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
 
   @Override
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
+    extractTime = System.nanoTime();
     try {
       tsFile = 
PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, 
resource);
       if (isWithMod) {
@@ -289,7 +291,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
     } finally {
       if (Objects.nonNull(pipeName)) {
         PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-            .decreaseTsFileEventCount(pipeName, creationTime);
+            .decreaseTsFileEventCount(pipeName, creationTime, 
System.nanoTime() - extractTime);
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
index 045f0201fcc..354a980edfd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
 import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Histogram;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
 
@@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 public class PipeDataNodeRemainingEventAndTimeMetrics implements IMetricSet {
 
@@ -50,11 +53,28 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
   private final Map<String, PipeDataNodeRemainingEventAndTimeOperator>
       remainingEventAndTimeOperatorMap = new ConcurrentHashMap<>();
 
+  private static Histogram PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM =
+      DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+  private static Histogram PIPE_DATANODE_EVENT_TRANSFER_TIME_HISTOGRAM =
+      DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
   @Override
   public void bindTo(final AbstractMetricService metricService) {
     this.metricService = metricService;
+    PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM =
+        metricService.getOrCreateHistogram(
+            Metric.PIPE_DATANODE_EVENT_TRANSFER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            "insert_node");
+    PIPE_DATANODE_EVENT_TRANSFER_TIME_HISTOGRAM =
+        metricService.getOrCreateHistogram(
+            Metric.PIPE_DATANODE_EVENT_TRANSFER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            "tsfile");
     
ImmutableSet.copyOf(remainingEventAndTimeOperatorMap.keySet()).forEach(this::createMetrics);
   }
 
@@ -83,6 +103,20 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
         operator.getPipeName(),
         Tag.CREATION_TIME.toString(),
         String.valueOf(operator.getCreationTime()));
+
+    operator.setInsertNodeTransferTimer(
+        metricService.getOrCreateTimer(
+            Metric.PIPE_INSERT_NODE_EVENT_TRANSFER_TIME.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            operator.getPipeName()));
+
+    operator.setTsFileTransferTimer(
+        metricService.getOrCreateTimer(
+            Metric.PIPE_TSFILE_EVENT_TRANSFER_TIME.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            operator.getPipeName()));
   }
 
   public boolean mayRemainingInsertEventExceedLimit(final String pipeID) {
@@ -117,6 +151,17 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
       LOGGER.warn(
           "Failed to unbind from pipe remaining event and time metrics, 
RemainingEventAndTimeOperator map not empty");
     }
+    metricService.remove(
+        MetricType.HISTOGRAM,
+        Metric.PIPE_DATANODE_EVENT_TRANSFER.toString(),
+        Tag.NAME.toString(),
+        "insert_node");
+
+    metricService.remove(
+        MetricType.HISTOGRAM,
+        Metric.PIPE_DATANODE_EVENT_TRANSFER.toString(),
+        Tag.NAME.toString(),
+        "tsfile");
   }
 
   private void removeMetrics(final String pipeID) {
@@ -140,6 +185,16 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
         operator.getPipeName(),
         Tag.CREATION_TIME.toString(),
         String.valueOf(operator.getCreationTime()));
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_INSERT_NODE_EVENT_TRANSFER_TIME.toString(),
+        Tag.NAME.toString(),
+        operator.getPipeName());
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_TSFILE_EVENT_TRANSFER_TIME.toString(),
+        Tag.NAME.toString(),
+        operator.getPipeName());
     remainingEventAndTimeOperatorMap.remove(pipeID);
   }
 
@@ -181,12 +236,16 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
         .increaseInsertNodeEventCount();
   }
 
-  public void decreaseInsertNodeEventCount(final String pipeName, final long 
creationTime) {
-    remainingEventAndTimeOperatorMap
-        .computeIfAbsent(
+  public void decreaseInsertNodeEventCount(
+      final String pipeName, final long creationTime, final long transferTime) 
{
+    PipeDataNodeRemainingEventAndTimeOperator operator =
+        remainingEventAndTimeOperatorMap.computeIfAbsent(
             pipeName + "_" + creationTime,
-            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
-        .decreaseInsertNodeEventCount();
+            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime));
+    operator.decreaseInsertNodeEventCount();
+
+    operator.getInsertNodeTransferTimer().update(transferTime, 
TimeUnit.NANOSECONDS);
+    PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
   }
 
   public void increaseRawTabletEventCount(final String pipeName, final long 
creationTime) {
@@ -213,12 +272,16 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
         .increaseTsFileEventCount();
   }
 
-  public void decreaseTsFileEventCount(final String pipeName, final long 
creationTime) {
-    remainingEventAndTimeOperatorMap
-        .computeIfAbsent(
+  public void decreaseTsFileEventCount(
+      final String pipeName, final long creationTime, final long transferTime) 
{
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        remainingEventAndTimeOperatorMap.computeIfAbsent(
             pipeName + "_" + creationTime,
-            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
-        .decreaseTsFileEventCount();
+            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime));
+
+    operator.decreaseTsFileEventCount();
+    operator.getTsFileTransferTimer().update(transferTime, 
TimeUnit.NANOSECONDS);
+    PIPE_DATANODE_EVENT_TRANSFER_TIME_HISTOGRAM.update(transferTime);
   }
 
   public void increaseHeartbeatEventCount(final String pipeName, final long 
creationTime) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 7aee55df429..2de7e0053f8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -25,6 +25,8 @@ import 
org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
 import org.apache.iotdb.metrics.core.IoTDBMetricManager;
 import org.apache.iotdb.metrics.core.type.IoTDBHistogram;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.type.Timer;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import com.codahale.metrics.Clock;
@@ -54,6 +56,9 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
   private final IoTDBHistogram collectInvocationHistogram =
       (IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram();
 
+  private Timer insertNodeTransferTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer tsfileTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
   private volatile long lastInsertNodeEventCountSmoothingTime = Long.MIN_VALUE;
   private final Meter insertNodeEventCountMeter =
       new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
@@ -230,6 +235,22 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
     collectInvocationHistogram.update(Math.max(collectInvocationCount, 1));
   }
 
+  public void setInsertNodeTransferTimer(Timer insertNodeTransferTimer) {
+    this.insertNodeTransferTimer = insertNodeTransferTimer;
+  }
+
+  public Timer getInsertNodeTransferTimer() {
+    return insertNodeTransferTimer;
+  }
+
+  public void setTsFileTransferTimer(Timer tsFileTransferTimer) {
+    this.tsfileTransferTimer = tsFileTransferTimer;
+  }
+
+  public Timer getTsFileTransferTimer() {
+    return tsfileTransferTimer;
+  }
+
   //////////////////////////// Switch ////////////////////////////
 
   // Thread-safe & Idempotent
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index a0f131eefec..145b4adaf94 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -174,6 +174,9 @@ public enum Metric {
   PIPE_CONNECTOR_SCHEMA_TRANSFER("pipe_connector_schema_transfer"),
   PIPE_DATANODE_REMAINING_EVENT_COUNT("pipe_datanode_remaining_event_count"),
   PIPE_DATANODE_REMAINING_TIME("pipe_datanode_remaining_time"),
+  PIPE_INSERT_NODE_EVENT_TRANSFER_TIME("pipe_insert_node_event_transfer_time"),
+  PIPE_TSFILE_EVENT_TRANSFER_TIME("pipe_tsfile_event_transfer_time"),
+  PIPE_DATANODE_EVENT_TRANSFER("pipe_datanode_event_transfer"),
   PIPE_CONFIG_LINKED_QUEUE_SIZE("pipe_config_linked_queue_size"),
   UNTRANSFERRED_CONFIG_COUNT("untransferred_config_count"),
   PIPE_CONNECTOR_CONFIG_TRANSFER("pipe_connector_config_transfer"),

Reply via email to