This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.4
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f9922cad13a5b28e9b92e6d80ff2e9a89bb63503
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 7 17:59:10 2025 +0800

    Pipe: Fixed the bug that disruptor queue close may block forever by not 
waiting real-time tsFile close (#15464)
    
    (cherry picked from commit 01a74bddf8be0fdb776efae7f69e46d9273fddd5)
---
 .../dataregion/realtime/assigner/PipeDataRegionAssigner.java          | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index ce679516dcf..4e97e4eac42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -230,7 +230,7 @@ public class PipeDataRegionAssigner implements Closeable {
       final PipeTsFileInsertionEvent event) {
     if (PipeTimePartitionProgressIndexKeeper.getInstance()
         .isProgressIndexAfterOrEquals(
-            dataRegionId, event.getTimePartitionId(), 
event.getProgressIndex())) {
+            dataRegionId, event.getTimePartitionId(), 
event.forceGetProgressIndex())) {
       event.bindProgressIndex(maxProgressIndexForTsFileInsertionEvent.get());
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
@@ -241,7 +241,7 @@ public class PipeDataRegionAssigner implements Closeable {
       }
     } else {
       maxProgressIndexForTsFileInsertionEvent.updateAndGet(
-          index -> 
index.updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+          index -> 
index.updateToMinimumEqualOrIsAfterProgressIndex(event.forceGetProgressIndex()));
     }
   }
 

Reply via email to