This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 01a74bddf8b Pipe: Fixed the bug that disruptor queue close may block
forever by not waiting real-time tsFile close (#15464)
01a74bddf8b is described below
commit 01a74bddf8be0fdb776efae7f69e46d9273fddd5
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 7 17:59:10 2025 +0800
Pipe: Fixed the bug that disruptor queue close may block forever by not
waiting real-time tsFile close (#15464)
---
.../dataregion/realtime/assigner/PipeDataRegionAssigner.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index ce679516dcf..4e97e4eac42 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -230,7 +230,7 @@ public class PipeDataRegionAssigner implements Closeable {
final PipeTsFileInsertionEvent event) {
if (PipeTimePartitionProgressIndexKeeper.getInstance()
.isProgressIndexAfterOrEquals(
- dataRegionId, event.getTimePartitionId(),
event.getProgressIndex())) {
+ dataRegionId, event.getTimePartitionId(),
event.forceGetProgressIndex())) {
event.bindProgressIndex(maxProgressIndexForTsFileInsertionEvent.get());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
@@ -241,7 +241,7 @@ public class PipeDataRegionAssigner implements Closeable {
}
} else {
maxProgressIndexForTsFileInsertionEvent.updateAndGet(
- index ->
index.updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+ index ->
index.updateToMinimumEqualOrIsAfterProgressIndex(event.forceGetProgressIndex()));
}
}