This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.4 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f9922cad13a5b28e9b92e6d80ff2e9a89bb63503
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 7 17:59:10 2025 +0800

    Pipe: Fixed the bug that disruptor queue close may block forever by not waiting real-time tsFile close (#15464)

    (cherry picked from commit 01a74bddf8be0fdb776efae7f69e46d9273fddd5)
---
 .../dataregion/realtime/assigner/PipeDataRegionAssigner.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index ce679516dcf..4e97e4eac42 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -230,7 +230,7 @@ public class PipeDataRegionAssigner implements Closeable {
       final PipeTsFileInsertionEvent event) {
     if (PipeTimePartitionProgressIndexKeeper.getInstance()
         .isProgressIndexAfterOrEquals(
-            dataRegionId, event.getTimePartitionId(), event.getProgressIndex())) {
+            dataRegionId, event.getTimePartitionId(), event.forceGetProgressIndex())) {
       event.bindProgressIndex(maxProgressIndexForTsFileInsertionEvent.get());
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
@@ -241,7 +241,7 @@ public class PipeDataRegionAssigner implements Closeable {
       }
     } else {
       maxProgressIndexForTsFileInsertionEvent.updateAndGet(
-          index -> index.updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+          index -> index.updateToMinimumEqualOrIsAfterProgressIndex(event.forceGetProgressIndex()));
     }
   }
