This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 98ca22d92f4 [To dev/1.3] Pipe: Fix lost transfer time metric & Support 
transfer time tracking for RawTablet events converted from InsertNode/TsFile 
events (#16364) (#16370)
98ca22d92f4 is described below

commit 98ca22d92f420f0a76781fb7a62315005ccc5f2c
Author: nanxiang xia <[email protected]>
AuthorDate: Tue Sep 9 09:42:42 2025 +0800

    [To dev/1.3] Pipe: Fix lost transfer time metric & Support transfer time 
tracking for RawTablet events converted from InsertNode/TsFile events (#16364) 
(#16370)
    
    * fix metric
    
    * fix
    
    * fix
---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  4 +++
 .../common/tablet/PipeRawTabletInsertionEvent.java | 19 ++++++++++++
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  4 +++
 .../overview/PipeDataNodeSinglePipeMetrics.java    | 34 ++++++++++++++++++++++
 4 files changed, 61 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 772d811292b..c8390a8baa1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -115,6 +115,10 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
         : null;
   }
 
+  public long getExtractTime() {
+    return extractTime;
+  }
+
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 5ae27d5eb3c..2443283c1c6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -158,6 +158,25 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent
     // Actually release the occupied memory.
     tablet = null;
     dataContainer = null;
+
+    // Update metrics of the source event
+    if (needToReport && shouldReportOnCommit && Objects.nonNull(pipeName)) {
+      if (sourceEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+        PipeDataNodeSinglePipeMetrics.getInstance()
+            .updateInsertNodeTransferTimer(
+                pipeName,
+                creationTime,
+                System.nanoTime()
+                    - ((PipeInsertNodeTabletInsertionEvent) 
sourceEvent).getExtractTime());
+      } else if (sourceEvent instanceof PipeTsFileInsertionEvent) {
+        PipeDataNodeSinglePipeMetrics.getInstance()
+            .updateTsFileTransferTimer(
+                pipeName,
+                creationTime,
+                System.nanoTime() - ((PipeTsFileInsertionEvent) 
sourceEvent).getExtractTime());
+      }
+    }
+
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index baa6ae9a3ba..311190f0181 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -247,6 +247,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
     return resource.getTimePartition();
   }
 
+  public long getExtractTime() {
+    return extractTime;
+  }
+
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 54705fc84cf..8c1ef90f97f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 public class PipeDataNodeSinglePipeMetrics implements IMetricSet {
 
@@ -236,7 +237,24 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
         remainingEventAndTimeOperatorMap.computeIfAbsent(
             pipeName + "_" + creationTime,
             k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime));
+
     operator.decreaseInsertNodeEventCount();
+
+    if (transferTime > 0) {
+      operator.getInsertNodeTransferTimer().update(transferTime, 
TimeUnit.NANOSECONDS);
+    }
+  }
+
+  public void updateInsertNodeTransferTimer(
+      final String pipeName, final long creationTime, final long transferTime) 
{
+    if (transferTime > 0) {
+      remainingEventAndTimeOperatorMap
+          .computeIfAbsent(
+              pipeName + "_" + creationTime,
+              k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
+          .getInsertNodeTransferTimer()
+          .update(transferTime, TimeUnit.NANOSECONDS);
+    }
   }
 
   public void increaseRawTabletEventCount(final String pipeName, final long 
creationTime) {
@@ -271,6 +289,22 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
             k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime));
 
     operator.decreaseTsFileEventCount();
+
+    if (transferTime > 0) {
+      operator.getTsFileTransferTimer().update(transferTime, 
TimeUnit.NANOSECONDS);
+    }
+  }
+
+  public void updateTsFileTransferTimer(
+      final String pipeName, final long creationTime, final long transferTime) 
{
+    if (transferTime > 0) {
+      remainingEventAndTimeOperatorMap
+          .computeIfAbsent(
+              pipeName + "_" + creationTime,
+              k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
+          .getTsFileTransferTimer()
+          .update(transferTime, TimeUnit.NANOSECONDS);
+    }
   }
 
   public void increaseHeartbeatEventCount(final String pipeName, final long 
creationTime) {

Reply via email to