This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 002c91eb07f fix no such element error when transfer tsfile (#12889)
002c91eb07f is described below

commit 002c91eb07fcd367611ffa124e45dcecff4134d7
Author: Peng Junzhi <[email protected]>
AuthorDate: Tue Jul 9 23:40:46 2024 -0500

    fix no such element error when transfer tsfile (#12889)
---
 .../protocol/pipeconsensus/PipeConsensusReceiver.java     | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 0f5af27e0a7..832f3389030 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -1041,12 +1041,19 @@ public class PipeConsensusReceiver {
         // We should synchronously find the idle writer to avoid concurrency 
issues.
         try {
           lock.lock();
-          diskBuffer =
-              pipeConsensusTsFileWriterPool.stream().filter(item -> 
!item.isUsed()).findFirst();
-          // We don't need to check diskBuffer.isPresent() here. Since 
diskBuffers' length is equals
-          // to ReqExecutor's buffer, so the diskBuffer is always present.
+          // We need to check diskBuffer.isPresent() here. Since there may be 
both retry-sent tsfile
+          // events and real-time-sent tsfile events, causing the receiver's 
tsFileWriter load to
+          // exceed IOTDB_CONFIG.getPipeConsensusPipelineSize().
+          while (!diskBuffer.isPresent()) {
+            diskBuffer =
+                pipeConsensusTsFileWriterPool.stream().filter(item -> 
!item.isUsed()).findFirst();
+            Thread.sleep(RETRY_WAIT_TIME);
+          }
           diskBuffer.get().setUsed(true);
           diskBuffer.get().setCommitIdOfCorrespondingHolderEvent(commitId);
+        } catch (InterruptedException e) {
+          LOGGER.warn(
+              "PipeConsensus: receiver thread get interrupted when waiting for 
borrowing tsFileWriter.");
         } finally {
           lock.unlock();
         }

Reply via email to