This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 2910e107969 Pipe: Fix the problem that the Pipe indicator is inaccurate and negative (#14627) (#14633)
2910e107969 is described below

commit 2910e1079692d8a54fe803921ec384960489bdd4
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jan 6 12:15:21 2025 +0800

    Pipe: Fix the problem that the Pipe indicator is inaccurate and negative (#14627) (#14633)
    
    (cherry picked from commit d87f9342fabf34c0452ad58fbf2aeff75accbbdb)
---
 .../PipeDataNodeRemainingEventAndTimeOperator.java   | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index 10e605bba08..43bf9bc0932 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -87,13 +87,19 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
   }
 
   long getRemainingEvents() {
-    return tsfileEventCount.get()
-        + tabletEventCount.get()
-        + heartbeatEventCount.get()
-        + schemaRegionExtractors.stream()
-            .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
-            .reduce(Long::sum)
-            .orElse(0L);
+    final long remainingEvents =
+        tsfileEventCount.get()
+            + tabletEventCount.get()
+            + heartbeatEventCount.get()
+            + schemaRegionExtractors.stream()
+                .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
+                .reduce(Long::sum)
+                .orElse(0L);
+
+    // There are cases where the indicator is negative. For example, after the Pipe is restarted,
+    // the Processor SubTask is still collecting Events, resulting in a negative count. This
+    // situation cannot be avoided because the Pipe may be restarted internally.
+    return remainingEvents >= 0 ? remainingEvents : 0;
   }
 
   /**

Reply via email to