This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 21269a03863 Pipe: Fixed the bug that disruptor queue close may block 
forever by not waiting real-time tsFile close (#15464) (#15470)
21269a03863 is described below

commit 21269a03863fcb0734ad2c56003ecd61ac44ab5a
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 8 12:20:33 2025 +0800

    Pipe: Fixed the bug that disruptor queue close may block forever by not 
waiting real-time tsFile close (#15464) (#15470)
    
    * Pipe: Fixed the bug that disruptor queue close may block forever by not 
waiting real-time tsFile close (#15464)
    
    * Update PipeTsFileInsertionEvent.java
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../event/common/tsfile/PipeTsFileInsertionEvent.java    | 16 ++++++++++++++++
 .../realtime/assigner/PipeDataRegionAssigner.java        |  4 ++--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index edbad61b631..20dee1bf567 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -318,6 +318,22 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
     }
   }
 
+  /**
+   * Get ProgressIndex without waiting for tsfile close. Can be used in 
getting progressIndex when
+   * memTable becomes immutable.
+   */
+  public ProgressIndex forceGetProgressIndex() {
+    if (resource.isEmpty()) {
+      LOGGER.warn(
+          "Skipping temporary TsFile {}'s progressIndex, will report 
MinimumProgressIndex", tsFile);
+      return MinimumProgressIndex.INSTANCE;
+    }
+    if (Objects.nonNull(overridingProgressIndex)) {
+      return overridingProgressIndex;
+    }
+    return resource.getMaxProgressIndex();
+  }
+
   @Override
   protected void reportProgress() {
     super.reportProgress();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 7f46a3c1de7..401cf9d4eb6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -187,7 +187,7 @@ public class PipeDataRegionAssigner implements Closeable {
       final PipeTsFileInsertionEvent event) {
     if (PipeTimePartitionProgressIndexKeeper.getInstance()
         .isProgressIndexAfterOrEquals(
-            dataRegionId, event.getTimePartitionId(), 
event.getProgressIndex())) {
+            dataRegionId, event.getTimePartitionId(), 
event.forceGetProgressIndex())) {
       event.bindProgressIndex(maxProgressIndexForTsFileInsertionEvent.get());
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
@@ -198,7 +198,7 @@ public class PipeDataRegionAssigner implements Closeable {
       }
     } else {
       maxProgressIndexForTsFileInsertionEvent.updateAndGet(
-          index -> 
index.updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+          index -> 
index.updateToMinimumEqualOrIsAfterProgressIndex(event.forceGetProgressIndex()));
     }
   }
 

Reply via email to