This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 002c91eb07f fix no such element error when transfer tsfile (#12889)
002c91eb07f is described below
commit 002c91eb07fcd367611ffa124e45dcecff4134d7
Author: Peng Junzhi <[email protected]>
AuthorDate: Tue Jul 9 23:40:46 2024 -0500
fix no such element error when transfer tsfile (#12889)
---
.../protocol/pipeconsensus/PipeConsensusReceiver.java | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 0f5af27e0a7..832f3389030 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -1041,12 +1041,19 @@ public class PipeConsensusReceiver {
// We should synchronously find the idle writer to avoid concurrency
issues.
try {
lock.lock();
- diskBuffer =
- pipeConsensusTsFileWriterPool.stream().filter(item ->
!item.isUsed()).findFirst();
- // We don't need to check diskBuffer.isPresent() here. Since
diskBuffers' length is equals
- // to ReqExecutor's buffer, so the diskBuffer is always present.
+ // We need to check diskBuffer.isPresent() here. Since there may be
both retry-sent tsfile
+ // events and real-time-sent tsfile events, causing the receiver's
tsFileWriter load to
+ // exceed IOTDB_CONFIG.getPipeConsensusPipelineSize().
+ while (!diskBuffer.isPresent()) {
+ diskBuffer =
+ pipeConsensusTsFileWriterPool.stream().filter(item ->
!item.isUsed()).findFirst();
+ Thread.sleep(RETRY_WAIT_TIME);
+ }
diskBuffer.get().setUsed(true);
diskBuffer.get().setCommitIdOfCorrespondingHolderEvent(commitId);
+ } catch (InterruptedException e) {
+ LOGGER.warn(
+ "PipeConsensus: receiver thread get interrupted when waiting for
borrowing tsFileWriter.");
} finally {
lock.unlock();
}