This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 2910e107969 Pipe: Fix the problem that the Pipe indicator is
inaccurate and negative (#14627) (#14633)
2910e107969 is described below
commit 2910e1079692d8a54fe803921ec384960489bdd4
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jan 6 12:15:21 2025 +0800
Pipe: Fix the problem that the Pipe indicator is inaccurate and negative
(#14627) (#14633)
(cherry picked from commit d87f9342fabf34c0452ad58fbf2aeff75accbbdb)
---
.../PipeDataNodeRemainingEventAndTimeOperator.java | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index 10e605bba08..43bf9bc0932 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -87,13 +87,19 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
}
long getRemainingEvents() {
- return tsfileEventCount.get()
- + tabletEventCount.get()
- + heartbeatEventCount.get()
- + schemaRegionExtractors.stream()
- .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
- .reduce(Long::sum)
- .orElse(0L);
+ final long remainingEvents =
+ tsfileEventCount.get()
+ + tabletEventCount.get()
+ + heartbeatEventCount.get()
+ + schemaRegionExtractors.stream()
+ .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
+ .reduce(Long::sum)
+ .orElse(0L);
+
+ // There are cases where the indicator is negative. For example, after the Pipe is restarted,
+ // the Processor SubTask is still collecting Events, resulting in a negative count. This
+ // situation cannot be avoided because the Pipe may be restarted internally.
+ return remainingEvents >= 0 ? remainingEvents : 0;
}
/**