This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch hook-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 230034876f910199df8509ce9568cc01b0e98947
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 25 19:34:22 2026 +0800

    new
---
 .../common/tablet/PipeRawTabletInsertionEvent.java | 25 ++++++++++------------
 .../task/progress/interval/PipeCommitInterval.java |  3 ++-
 2 files changed, 13 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 1377c415319..b72815adf9a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -85,14 +85,18 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent
     this.allocatedMemoryBlock =
         PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
 
-    addOnCommittedHook(
-        () -> {
-          if (shouldReportOnCommit) {
-            eliminateProgressIndex();
-          }
-        });
+    triggerAddHook();
   }
 
+  private void triggerAddHook() {
+    if (shouldReportOnCommit && needToReport && sourceEvent instanceof PipeTsFileInsertionEvent) {
+      final PipeTsFileInsertionEvent event = ((PipeTsFileInsertionEvent) sourceEvent);
+      addOnCommittedHook(event::eliminateProgressIndex);
+    }
+  }
+
+
+
   public PipeRawTabletInsertionEvent(
       final Tablet tablet,
       final boolean isAligned,
@@ -181,14 +185,6 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent
     return true;
   }
 
-  protected void eliminateProgressIndex() {
-    if (needToReport) {
-      if (sourceEvent instanceof PipeTsFileInsertionEvent) {
-        ((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex();
-      }
-    }
-  }
-
   @Override
   public void bindProgressIndex(final ProgressIndex overridingProgressIndex) {
     // Normally not all events need to report progress, but if the overridingProgressIndex
@@ -254,6 +250,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent
 
   public void markAsNeedToReport() {
     this.needToReport = true;
+    triggerAddHook();
   }
 
   // This getter is reserved for user-defined plugins
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
index 885df4727da..456acd646dc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.interval.Interval;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -43,7 +44,7 @@ public class PipeCommitInterval extends Interval<PipeCommitInterval> {
     this.pipeTaskMeta = pipeTaskMeta;
     this.currentIndex =
         Objects.nonNull(currentIndex) ? currentIndex : MinimumProgressIndex.INSTANCE;
-    this.onCommittedHooks = onCommittedHooks;
+    this.onCommittedHooks = new ArrayList<>(onCommittedHooks);
   }
 
   @Override
