This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 21269a03863 Pipe: Fixed the bug that disruptor queue close may block
forever by not waiting real-time tsFile close (#15464) (#15470)
21269a03863 is described below
commit 21269a03863fcb0734ad2c56003ecd61ac44ab5a
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 8 12:20:33 2025 +0800
Pipe: Fixed the bug that disruptor queue close may block forever by not
waiting real-time tsFile close (#15464) (#15470)
* Pipe: Fixed the bug that disruptor queue close may block forever by not
waiting real-time tsFile close (#15464)
* Update PipeTsFileInsertionEvent.java
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../event/common/tsfile/PipeTsFileInsertionEvent.java | 16 ++++++++++++++++
.../realtime/assigner/PipeDataRegionAssigner.java | 4 ++--
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index edbad61b631..20dee1bf567 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -318,6 +318,22 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
}
}
+ /**
+ * Get ProgressIndex without waiting for tsfile close. Can be used in getting progressIndex when
+ * memTable becomes immutable.
+ */
+ public ProgressIndex forceGetProgressIndex() {
+ if (resource.isEmpty()) {
+ LOGGER.warn(
+ "Skipping temporary TsFile {}'s progressIndex, will report MinimumProgressIndex", tsFile);
+ return MinimumProgressIndex.INSTANCE;
+ }
+ if (Objects.nonNull(overridingProgressIndex)) {
+ return overridingProgressIndex;
+ }
+ return resource.getMaxProgressIndex();
+ }
+
@Override
protected void reportProgress() {
super.reportProgress();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 7f46a3c1de7..401cf9d4eb6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -187,7 +187,7 @@ public class PipeDataRegionAssigner implements Closeable {
final PipeTsFileInsertionEvent event) {
if (PipeTimePartitionProgressIndexKeeper.getInstance()
.isProgressIndexAfterOrEquals(
- dataRegionId, event.getTimePartitionId(), event.getProgressIndex())) {
+ dataRegionId, event.getTimePartitionId(), event.forceGetProgressIndex())) {
event.bindProgressIndex(maxProgressIndexForTsFileInsertionEvent.get());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
@@ -198,7 +198,7 @@ public class PipeDataRegionAssigner implements Closeable {
}
} else {
maxProgressIndexForTsFileInsertionEvent.updateAndGet(
- index -> index.updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+ index -> index.updateToMinimumEqualOrIsAfterProgressIndex(event.forceGetProgressIndex()));
}
}