This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-stuck
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e9605b64abddc763cd73cf04a26b5dd4e77322fc
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jun 13 11:03:47 2024 +0800

    Pipe: fix threads of IoTDB-Pipe-Processor-Executor-Pool stuck in PipeTsFileInsertionEvent#waitForTsFileClose
---
 .../pipe/event/common/tsfile/PipeTsFileInsertionEvent.java | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index cdab8d42215..55739bf8f32 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -135,12 +135,22 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
    */
   public boolean waitForTsFileClose() throws InterruptedException {
     if (!isClosed.get()) {
+      isClosed.set(resource.isClosed());
+
       synchronized (isClosed) {
         while (!isClosed.get()) {
-          isClosed.wait();
+          isClosed.wait(100);
+
+          final boolean isClosedNow = resource.isClosed();
+          if (isClosedNow) {
+            isClosed.set(true);
+            isClosed.notifyAll();
+            break;
+          }
         }
       }
     }
+
     // From illustrations above we know If the status is "closed", then the tsFile is flushed
     // And here we guarantee that the isEmpty() is set before flushing if tsFile is empty
     // Then we know: "isClosed" --> tsFile flushed --> (isEmpty() <--> tsFile is empty)
