This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 54f46a06ece [To rc/1.3.3]PipeConsensus: build separate dir for each receiver thread. (#13272)
54f46a06ece is described below

commit 54f46a06eced106cdee95f3cd3ae3040e48f4832
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Aug 23 12:27:31 2024 +0800

    [To rc/1.3.3]PipeConsensus: build separate dir for each receiver thread. (#13272)
---
 .../pipeconsensus/PipeConsensusReceiver.java       | 59 ++++++++++++++++++----
 1 file changed, 50 insertions(+), 9 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index f5a3c004f7f..e514a913aaa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -101,8 +101,7 @@ public class PipeConsensusReceiver {
   private final ConsensusPipeName consensusPipeName;
   private final List<String> receiverBaseDirsName;
   // Used to buffer TsFile when transfer TsFile asynchronously.
-  private final PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool =
-      new PipeConsensusTsFileWriterPool();
+  private final PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool;
   private final AtomicReference<File> receiverFileDirWithIdSuffix = new AtomicReference<>();
   private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics;
   private final FolderManager folderManager;
@@ -114,8 +113,6 @@ public class PipeConsensusReceiver {
     this.pipeConsensus = pipeConsensus;
     this.consensusGroupId = consensusGroupId;
     this.pipeConsensusReceiverMetrics = new PipeConsensusReceiverMetrics(this);
-    this.requestExecutor =
-        new RequestExecutor(pipeConsensusReceiverMetrics, pipeConsensusTsFileWriterPool);
     this.consensusPipeName = consensusPipeName;
     MetricService.getInstance().addMetricSet(pipeConsensusReceiverMetrics);
 
@@ -136,10 +133,15 @@ public class PipeConsensusReceiver {
 
     try {
       initiateTsFileBufferFolder();
+      this.pipeConsensusTsFileWriterPool =
+          new PipeConsensusTsFileWriterPool(
+              consensusPipeName, receiverFileDirWithIdSuffix.get().getPath());
     } catch (Exception e) {
       LOGGER.error("Fail to initiate file buffer folder, Error msg: {}", e.getMessage());
       throw new RuntimeException(e);
     }
+    this.requestExecutor =
+        new RequestExecutor(pipeConsensusReceiverMetrics, pipeConsensusTsFileWriterPool);
   }
 
   /**
@@ -882,8 +884,9 @@ public class PipeConsensusReceiver {
             receiverFileDirWithIdSuffix.get().getPath());
       }
     }
-
-    tsFileWriter.setWritingFile(new File(receiverFileDirWithIdSuffix.get(), fileName));
+    // Every tsFileWriter has its own writing path.
+    // 1 Thread --> 1 connection --> 1 tsFileWriter --> 1 path
+    tsFileWriter.setWritingFile(new File(tsFileWriter.getLocalWritingDirPath(), fileName));
     tsFileWriter.setWritingFileWriter(new RandomAccessFile(tsFileWriter.getWritingFile(), "rw"));
     LOGGER.info(
         "PipeConsensus-PipeName-{}: Writing file {} was created. Ready to write file pieces.",
@@ -1033,10 +1036,16 @@ public class PipeConsensusReceiver {
   private static class PipeConsensusTsFileWriterPool {
     private final Lock lock = new ReentrantLock();
     private final List<PipeConsensusTsFileWriter> pipeConsensusTsFileWriterPool = new ArrayList<>();
+    private final ConsensusPipeName consensusPipeName;
 
-    public PipeConsensusTsFileWriterPool() {
+    public PipeConsensusTsFileWriterPool(
+        ConsensusPipeName consensusPipeName, String receiverBasePath) throws IOException {
+      this.consensusPipeName = consensusPipeName;
       for (int i = 0; i < IOTDB_CONFIG.getPipeConsensusPipelineSize(); i++) {
-        pipeConsensusTsFileWriterPool.add(new PipeConsensusTsFileWriter(i));
+        PipeConsensusTsFileWriter tsFileWriter =
+            new PipeConsensusTsFileWriter(i, consensusPipeName);
+        tsFileWriter.setFilePath(receiverBasePath);
+        pipeConsensusTsFileWriterPool.add(tsFileWriter);
       }
     }
 
@@ -1103,7 +1112,9 @@ public class PipeConsensusReceiver {
   }
 
   private static class PipeConsensusTsFileWriter {
+    private final ConsensusPipeName consensusPipeName;
     private final int index;
+    private String localWritingDirPath;
     // whether this buffer is used. this will be updated when first transfer tsFile piece or
     // when transfer seal.
     private boolean isUsed = false;
@@ -1112,8 +1123,38 @@ public class PipeConsensusReceiver {
     private File writingFile;
     private RandomAccessFile writingFileWriter;
 
-    public PipeConsensusTsFileWriter(int index) {
+    public PipeConsensusTsFileWriter(int index, ConsensusPipeName consensusPipeName) {
       this.index = index;
+      this.consensusPipeName = consensusPipeName;
+    }
+
+    public void setFilePath(String receiverBasePath) throws IOException {
+      this.localWritingDirPath = receiverBasePath + File.separator + index;
+      File tsFileWriterDirectory = new File(this.localWritingDirPath);
+      // Remove exists dir
+      if (tsFileWriterDirectory.exists()) {
+        FileUtils.deleteDirectory(tsFileWriterDirectory);
+        LOGGER.info(
+            "PipeConsensus-PipeName-{}: Origin receiver tsFileWriter-{} file dir {} was deleted.",
+            consensusPipeName,
+            index,
+            tsFileWriterDirectory.getPath());
+      }
+      if (!tsFileWriterDirectory.mkdirs()) {
+        LOGGER.warn(
+            "PipeConsensus-PipeName-{}: Failed to create receiver tsFileWriter-{} file dir {}. May because authority or dir already exists etc.",
+            consensusPipeName,
+            index,
+            tsFileWriterDirectory.getPath());
+        throw new IOException(
+            String.format(
+                "PipeConsensus-PipeName-%s: Failed to create tsFileWriter-%d receiver file dir %s. May because authority or dir already exists etc.",
+                consensusPipeName, index, tsFileWriterDirectory.getPath()));
+      }
+    }
+
+    public String getLocalWritingDirPath() {
+      return localWritingDirPath;
     }
 
     public File getWritingFile() {

Reply via email to