This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 54f46a06ece [To rc/1.3.3]PipeConsensus: build separate dir for each
receiver thread. (#13272)
54f46a06ece is described below
commit 54f46a06eced106cdee95f3cd3ae3040e48f4832
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Aug 23 12:27:31 2024 +0800
[To rc/1.3.3]PipeConsensus: build separate dir for each receiver thread.
(#13272)
---
.../pipeconsensus/PipeConsensusReceiver.java | 59 ++++++++++++++++++----
1 file changed, 50 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index f5a3c004f7f..e514a913aaa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -101,8 +101,7 @@ public class PipeConsensusReceiver {
private final ConsensusPipeName consensusPipeName;
private final List<String> receiverBaseDirsName;
// Used to buffer TsFile when transfer TsFile asynchronously.
- private final PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool =
- new PipeConsensusTsFileWriterPool();
+ private final PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool;
private final AtomicReference<File> receiverFileDirWithIdSuffix = new
AtomicReference<>();
private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics;
private final FolderManager folderManager;
@@ -114,8 +113,6 @@ public class PipeConsensusReceiver {
this.pipeConsensus = pipeConsensus;
this.consensusGroupId = consensusGroupId;
this.pipeConsensusReceiverMetrics = new PipeConsensusReceiverMetrics(this);
- this.requestExecutor =
- new RequestExecutor(pipeConsensusReceiverMetrics,
pipeConsensusTsFileWriterPool);
this.consensusPipeName = consensusPipeName;
MetricService.getInstance().addMetricSet(pipeConsensusReceiverMetrics);
@@ -136,10 +133,15 @@ public class PipeConsensusReceiver {
try {
initiateTsFileBufferFolder();
+ this.pipeConsensusTsFileWriterPool =
+ new PipeConsensusTsFileWriterPool(
+ consensusPipeName, receiverFileDirWithIdSuffix.get().getPath());
} catch (Exception e) {
LOGGER.error("Fail to initiate file buffer folder, Error msg: {}",
e.getMessage());
throw new RuntimeException(e);
}
+ this.requestExecutor =
+ new RequestExecutor(pipeConsensusReceiverMetrics,
pipeConsensusTsFileWriterPool);
}
/**
@@ -882,8 +884,9 @@ public class PipeConsensusReceiver {
receiverFileDirWithIdSuffix.get().getPath());
}
}
-
- tsFileWriter.setWritingFile(new File(receiverFileDirWithIdSuffix.get(),
fileName));
+ // Every tsFileWriter has its own writing path.
+ // 1 Thread --> 1 connection --> 1 tsFileWriter --> 1 path
+ tsFileWriter.setWritingFile(new
File(tsFileWriter.getLocalWritingDirPath(), fileName));
tsFileWriter.setWritingFileWriter(new
RandomAccessFile(tsFileWriter.getWritingFile(), "rw"));
LOGGER.info(
"PipeConsensus-PipeName-{}: Writing file {} was created. Ready to
write file pieces.",
@@ -1033,10 +1036,16 @@ public class PipeConsensusReceiver {
private static class PipeConsensusTsFileWriterPool {
private final Lock lock = new ReentrantLock();
private final List<PipeConsensusTsFileWriter>
pipeConsensusTsFileWriterPool = new ArrayList<>();
+ private final ConsensusPipeName consensusPipeName;
- public PipeConsensusTsFileWriterPool() {
+ public PipeConsensusTsFileWriterPool(
+ ConsensusPipeName consensusPipeName, String receiverBasePath) throws
IOException {
+ this.consensusPipeName = consensusPipeName;
for (int i = 0; i < IOTDB_CONFIG.getPipeConsensusPipelineSize(); i++) {
- pipeConsensusTsFileWriterPool.add(new PipeConsensusTsFileWriter(i));
+ PipeConsensusTsFileWriter tsFileWriter =
+ new PipeConsensusTsFileWriter(i, consensusPipeName);
+ tsFileWriter.setFilePath(receiverBasePath);
+ pipeConsensusTsFileWriterPool.add(tsFileWriter);
}
}
@@ -1103,7 +1112,9 @@ public class PipeConsensusReceiver {
}
private static class PipeConsensusTsFileWriter {
+ private final ConsensusPipeName consensusPipeName;
private final int index;
+ private String localWritingDirPath;
// whether this buffer is used. this will be updated when first transfer
tsFile piece or
// when transfer seal.
private boolean isUsed = false;
@@ -1112,8 +1123,38 @@ public class PipeConsensusReceiver {
private File writingFile;
private RandomAccessFile writingFileWriter;
- public PipeConsensusTsFileWriter(int index) {
+ public PipeConsensusTsFileWriter(int index, ConsensusPipeName
consensusPipeName) {
this.index = index;
+ this.consensusPipeName = consensusPipeName;
+ }
+
+ public void setFilePath(String receiverBasePath) throws IOException {
+ this.localWritingDirPath = receiverBasePath + File.separator + index;
+ File tsFileWriterDirectory = new File(this.localWritingDirPath);
+ // Remove exists dir
+ if (tsFileWriterDirectory.exists()) {
+ FileUtils.deleteDirectory(tsFileWriterDirectory);
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: Origin receiver tsFileWriter-{} file
dir {} was deleted.",
+ consensusPipeName,
+ index,
+ tsFileWriterDirectory.getPath());
+ }
+ if (!tsFileWriterDirectory.mkdirs()) {
+ LOGGER.warn(
+ "PipeConsensus-PipeName-{}: Failed to create receiver
tsFileWriter-{} file dir {}. May because authority or dir already exists etc.",
+ consensusPipeName,
+ index,
+ tsFileWriterDirectory.getPath());
+ throw new IOException(
+ String.format(
+ "PipeConsensus-PipeName-%s: Failed to create tsFileWriter-%d
receiver file dir %s. May because authority or dir already exists etc.",
+ consensusPipeName, index, tsFileWriterDirectory.getPath()));
+ }
+ }
+
+ public String getLocalWritingDirPath() {
+ return localWritingDirPath;
}
public File getWritingFile() {