This is an automated email from the ASF dual-hosted git repository.
yschengzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b3aae802ac8 PipeConsensus: Delete WAL after dropping database & fix receiver directory recovery (#12738)
b3aae802ac8 is described below
commit b3aae802ac8c6dd43c32cbbc3c9e14622dee5461
Author: yschengzi <[email protected]>
AuthorDate: Fri Jun 14 09:22:27 2024 +0800
PipeConsensus: Delete WAL after dropping database & fix receiver directory
recovery (#12738)
* fix wal deletion & create receiver dir
* update consensus protocol class
* merge judge
* modify directory path
* modify directory path
---
.../protocol/pipeconsensus/PipeConsensusReceiver.java | 19 +++++++++++++------
.../apache/iotdb/db/storageengine/StorageEngine.java | 8 +++++++-
2 files changed, 20 insertions(+), 7 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 60a0b88beb9..5d9eb0947c1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -93,6 +93,7 @@ public class PipeConsensusReceiver {
private final PipeConsensus pipeConsensus;
private final ConsensusGroupId consensusGroupId;
// Used to buffer TsFile when transfer TsFile asynchronously.
+ private final ConsensusPipeName consensusPipeName;
private final List<String> receiverBaseDirsName;
private final PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool =
new PipeConsensusTsFileWriterPool();
@@ -105,13 +106,12 @@ public class PipeConsensusReceiver {
ConsensusPipeName consensusPipeName) {
this.pipeConsensus = pipeConsensus;
this.consensusGroupId = consensusGroupId;
+ this.consensusPipeName = consensusPipeName;
// Each pipeConsensusReceiver has its own base directories. for example, a
default dir path is
- //
data/datanode/system/pipe/consensus/receiver/__consensus{consensusGroupId}_{leaderDataNodeId}_{followerDataNodeId}
+ //
data/datanode/system/pipe/consensus/receiver/__consensus.{consensusGroupId}_{leaderDataNodeId}_{followerDataNodeId}
receiverBaseDirsName =
-
Arrays.stream(IoTDBDescriptor.getInstance().getConfig().getPipeConsensusReceiverFileDirs())
- .map(s -> s + File.separator + consensusPipeName)
- .collect(Collectors.toList());
+
Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getPipeConsensusReceiverFileDirs());
try {
this.folderManager =
@@ -837,8 +837,15 @@ public class PipeConsensusReceiver {
throw new DiskSpaceInsufficientException(receiverBaseDirsName);
}
// Create a new receiver file dir
- final File newReceiverDir = new File(receiverFileBaseDir,
consensusGroupId.toString());
- if (!newReceiverDir.exists() && !newReceiverDir.mkdirs()) {
+ final File newReceiverDir = new File(receiverFileBaseDir,
consensusPipeName.toString());
+ if (newReceiverDir.exists()) {
+ FileUtils.deleteDirectory(newReceiverDir);
+ LOGGER.info(
+ "PipeConsensus-ConsensusGroupId-{}: Origin receiver file dir {}
was deleted.",
+ consensusGroupId,
+ newReceiverDir.getPath());
+ }
+ if (!newReceiverDir.mkdirs()) {
LOGGER.warn(
"PipeConsensus-ConsensusGroupId-{}: Failed to create receiver file
dir {}.",
newReceiverDir.getPath(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index d26304e113c..e30069f70f0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -713,7 +713,13 @@ public class StorageEngine implements IService {
region.abortCompaction();
region.syncDeleteDataFiles();
region.deleteFolder(systemDir);
- if
(CONFIG.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
{
+ if
(CONFIG.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+ || CONFIG
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.FAST_IOT_CONSENSUS)
+ || CONFIG
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.IOT_CONSENSUS_V2)) {
// delete wal
WALManager.getInstance()
.deleteWALNode(