This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 81397b2156c [To dev/1.3] Pipe: Fixed the on committed hook square bug & Trimmed the raw tablet hook & Fixed the premature report for source event & Skipped the parsing of time-covered tsFile (#17360) (#17362)
81397b2156c is described below

commit 81397b2156c4cad07fdecaea59e1888e6c9d1d79
Author: Caideyipi <[email protected]>
AuthorDate: Thu Mar 26 09:31:50 2026 +0800

    [To dev/1.3] Pipe: Fixed the on committed hook square bug & Trimmed the raw tablet hook & Fixed the premature report for source event & Skipped the parsing of time-covered tsFile (#17360) (#17362)
    
    * new
    
    * fix-data
    
    * fix
    
    * revert
    
    * fix
---
 .../common/tablet/PipeRawTabletInsertionEvent.java | 33 +++++++++++++++-------
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 11 ++++++++
 .../task/progress/interval/PipeCommitInterval.java |  3 +-
 3 files changed, 36 insertions(+), 11 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 1377c415319..d322291934f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -85,12 +85,14 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent
     this.allocatedMemoryBlock =
         PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
 
-    addOnCommittedHook(
-        () -> {
-          if (shouldReportOnCommit) {
-            eliminateProgressIndex();
-          }
-        });
+    if (needToReport) {
+      addOnCommittedHook(
+          () -> {
+            if (shouldReportOnCommit) {
+              eliminateProgressIndex();
+            }
+          });
+    }
   }
 
   public PipeRawTabletInsertionEvent(
@@ -182,10 +184,8 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent
   }
 
   protected void eliminateProgressIndex() {
-    if (needToReport) {
-      if (sourceEvent instanceof PipeTsFileInsertionEvent) {
-        ((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex();
-      }
+    if (sourceEvent instanceof PipeTsFileInsertionEvent) {
+      ((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex();
     }
   }
 
@@ -253,6 +253,14 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent
   }
 
   public void markAsNeedToReport() {
+    if (!needToReport) {
+      addOnCommittedHook(
+          () -> {
+            if (shouldReportOnCommit) {
+              eliminateProgressIndex();
+            }
+          });
+    }
     this.needToReport = true;
   }
 
@@ -270,6 +278,11 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent
     return sourceEvent;
   }
 
+  @Override
+  public boolean isShouldReportOnCommit() {
+    return shouldReportOnCommit && needToReport;
+  }
+
   /////////////////////////// TabletInsertionEvent ///////////////////////////
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 858c425c3e6..f099301a63c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -375,6 +375,17 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
         || startTime <= resource.getFileEndTime() && resource.getFileStartTime() <= endTime;
   }
 
+  @Override
+  public boolean shouldParseTime() {
+    if (!isTimeParsed
+        && Objects.nonNull(resource)
+        && startTime <= resource.getFileStartTime()
+        && resource.getFileEndTime() <= endTime) {
+      isTimeParsed = true;
+    }
+    return !isTimeParsed;
+  }
+
   @Override
   public boolean mayEventPathsOverlappedWithPattern() {
     if (Objects.isNull(resource) || !resource.isClosed()) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
index 885df4727da..456acd646dc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.interval.Interval;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -43,7 +44,7 @@ public class PipeCommitInterval extends Interval<PipeCommitInterval> {
     this.pipeTaskMeta = pipeTaskMeta;
     this.currentIndex =
         Objects.nonNull(currentIndex) ? currentIndex : MinimumProgressIndex.INSTANCE;
-    this.onCommittedHooks = onCommittedHooks;
+    this.onCommittedHooks = new ArrayList<>(onCommittedHooks);
   }
 
   @Override

Reply via email to