This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d92f475296 [region migration] Make peer list correct when IoTConsensus & IoTConsensusV2 & Ratis starting (#14505)
7d92f475296 is described below

commit 7d92f47529645f3399727a5dbc02db2de7052021
Author: Li Yu Heng <[email protected]>
AuthorDate: Mon Dec 23 20:03:56 2024 +0800

    [region migration] Make peer list correct when IoTConsensus & IoTConsensusV2 & Ratis starting (#14505)
    
    * done
    
    * for IoTV2 ?
    
    * test conf
    
    * fix iotv2
    
    * ˜ø
    
    * still to handle ratis
    
    * all done
    
    * rename start to starting
    
    * use string var
    
    * spotless
    
    * tan review
    
    * tan review
    
    * add UT for iotv1
    
    * fix review
    
    * fix RatisConsensus
    
    * RatisConsensus only sendConfiguration if necessary
    
    * done
    
    * add peer list
    
    ---------
    
    Co-authored-by: Peng Junzhi <[email protected]>
    Co-authored-by: Peng Junzhi <[email protected]>
---
 .../iotdb/confignode/manager/node/NodeManager.java |   8 +-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  12 +-
 .../org/apache/iotdb/consensus/IConsensus.java     |  21 ++--
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  93 ++++++++------
 .../consensus/iot/IoTConsensusServerImpl.java      |  11 +-
 .../service/IoTConsensusRPCServiceProcessor.java   |   9 +-
 .../apache/iotdb/consensus/pipe/PipeConsensus.java | 139 ++++++++++++++-------
 .../iotdb/consensus/ratis/RatisConsensus.java      |  78 +++++++-----
 .../iotdb/consensus/simple/SimpleConsensus.java    |  12 +-
 .../apache/iotdb/consensus/iot/ReplicateTest.java  |   4 -
 .../apache/iotdb/consensus/iot/StabilityTest.java  |  82 +++++++++---
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |   4 -
 .../consensus/ConsensusPipeDataNodeDispatcher.java |  11 ++
 .../java/org/apache/iotdb/db/service/DataNode.java |  91 +++++++-------
 .../config/constant/PipeRPCMessageConstant.java    |  31 +++++
 .../src/main/thrift/confignode.thrift              |   2 +-
 16 files changed, 375 insertions(+), 233 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 3bba8ff8da5..ecdb8e4ba45 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.manager.node;
 
 import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -362,11 +361,8 @@ public class NodeManager {
 
     resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
     resp.setRuntimeConfiguration(getRuntimeConfiguration());
-    List<TConsensusGroupId> consensusGroupIds =
-        getPartitionManager().getAllReplicaSets(nodeId).stream()
-            .map(TRegionReplicaSet::getRegionId)
-            .collect(Collectors.toList());
-    resp.setConsensusGroupIds(consensusGroupIds);
+
+    
resp.setCorrectConsensusGroups(getPartitionManager().getAllReplicaSets(nodeId));
     return resp;
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 70d23a47cc7..372209bfd74 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -78,6 +78,8 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_ALREADY_EXIST_MSG;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_NOT_EXIST_MSG;
 
 public class PipeTaskInfo implements SnapshotProcessor {
 
@@ -179,8 +181,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
     final String exceptionMessage =
         String.format(
-            "Failed to create pipe %s, the pipe with the same name has been 
created",
-            createPipeRequest.getPipeName());
+            "Failed to create pipe %s, %s",
+            createPipeRequest.getPipeName(), PIPE_ALREADY_EXIST_MSG);
     LOGGER.warn(exceptionMessage);
     throw new PipeException(exceptionMessage);
   }
@@ -204,7 +206,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
       final String exceptionMessage =
           String.format(
-              "Failed to alter pipe %s, the pipe does not exist", 
alterPipeRequest.getPipeName());
+              "Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(), 
PIPE_NOT_EXIST_MSG);
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
@@ -281,7 +283,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
   private void checkBeforeStartPipeInternal(final String pipeName) throws 
PipeException {
     if (!isPipeExisted(pipeName)) {
       final String exceptionMessage =
-          String.format("Failed to start pipe %s, the pipe does not exist", 
pipeName);
+          String.format("Failed to start pipe %s, %s", pipeName, 
PIPE_NOT_EXIST_MSG);
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
@@ -307,7 +309,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
   private void checkBeforeStopPipeInternal(final String pipeName) throws 
PipeException {
     if (!isPipeExisted(pipeName)) {
       final String exceptionMessage =
-          String.format("Failed to stop pipe %s, the pipe does not exist", 
pipeName);
+          String.format("Failed to stop pipe %s, %s", pipeName, 
PIPE_NOT_EXIST_MSG);
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 0e3af42d802..8f49af524bc 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -37,6 +37,7 @@ import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /** Consensus module base interface. */
 @ThreadSafe
@@ -145,6 +146,15 @@ public interface IConsensus {
    */
   void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException;
 
+  /**
+   * Record the correct peer list (likely got from the ConfigNode) for future 
use in resetPeerList.
+   * Only use this method if necessary. If it is called, it should be called 
before {@link
+   * #start()}.
+   *
+   * @param correctPeerList The correct consensus group member list
+   */
+  void recordCorrectPeerListBeforeStarting(Map<ConsensusGroupId, List<Peer>> 
correctPeerList);
+
   /**
    * Reset the peer list of the corresponding consensus group. Currently only 
used in the automatic
    * cleanup of region migration as a rollback for {@link 
#addRemotePeer(ConsensusGroupId, Peer)},
@@ -226,17 +236,6 @@ public interface IConsensus {
    */
   List<ConsensusGroupId> getAllConsensusGroupIds();
 
-  /**
-   * Return all consensus group ids from disk.
-   *
-   * <p>We need to parse all the RegionGroupIds from the disk directory before 
starting the
-   * consensus layer, and {@link #getAllConsensusGroupIds()} returns an empty 
list, so we need to
-   * add a new interface.
-   *
-   * @return consensusGroupId list
-   */
-  List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting();
-
   /**
    * Return the region directory of the corresponding consensus group.
    *
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 7f1222e5c67..987ba658538 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -80,6 +80,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 public class IoTConsensus implements IConsensus {
 
@@ -99,6 +101,7 @@ public class IoTConsensus implements IConsensus {
   private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> 
syncClientManager;
   private final ScheduledExecutorService backgroundTaskService;
   private Future<?> updateReaderFuture;
+  private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
 
   public IoTConsensus(ConsensusConfig config, Registry registry) {
     this.thisNode = config.getThisNodeEndPoint();
@@ -178,10 +181,32 @@ public class IoTConsensus implements IConsensus {
                   syncClientManager,
                   config);
           stateMachineMap.put(consensusGroupId, consensus);
-          consensus.start();
         }
       }
     }
+    if (correctPeerListBeforeStart != null) {
+      BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
+          (consensusGroupId, peers) -> {
+            try {
+              resetPeerList(consensusGroupId, peers);
+            } catch (ConsensusGroupNotExistException ignore) {
+
+            } catch (Exception e) {
+              logger.warn("Failed to reset peer list while start", e);
+            }
+          };
+      // make peers which are in list correct
+      correctPeerListBeforeStart.forEach(resetPeerListWithoutThrow);
+      // clear peers which are not in the list
+      stateMachineMap.keySet().stream()
+          .filter(consensusGroupId -> 
!correctPeerListBeforeStart.containsKey(consensusGroupId))
+          // copy to a new list to avoid concurrent modification
+          .collect(Collectors.toList())
+          .forEach(
+              consensusGroupId ->
+                  resetPeerListWithoutThrow.accept(consensusGroupId, 
Collections.emptyList()));
+    }
+    stateMachineMap.values().forEach(IoTConsensusServerImpl::start);
   }
 
   @Override
@@ -435,36 +460,6 @@ public class IoTConsensus implements IConsensus {
     return new ArrayList<>(stateMachineMap.keySet());
   }
 
-  @Override
-  public List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting() {
-    return getConsensusGroupIdsFromDir(storageDir, logger);
-  }
-
-  public static List<ConsensusGroupId> getConsensusGroupIdsFromDir(File 
storageDir, Logger logger) {
-    if (!storageDir.exists()) {
-      return Collections.emptyList();
-    }
-    List<ConsensusGroupId> consensusGroupIds = new ArrayList<>();
-    try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(storageDir.toPath())) {
-      for (Path path : stream) {
-        try {
-          String[] items = path.getFileName().toString().split("_");
-          ConsensusGroupId consensusGroupId =
-              ConsensusGroupId.Factory.create(
-                  Integer.parseInt(items[0]), Integer.parseInt(items[1]));
-          consensusGroupIds.add(consensusGroupId);
-        } catch (Exception e) {
-          logger.info(
-              "The directory {} is not a group directory;" + " ignoring it. ",
-              path.getFileName().toString());
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Failed to get all consensus group ids from disk", e);
-    }
-    return consensusGroupIds;
-  }
-
   @Override
   public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
     return buildPeerDir(storageDir, groupId);
@@ -483,10 +478,16 @@ public class IoTConsensus implements IConsensus {
         
.init(config.getReplication().getRegionMigrationSpeedLimitBytesPerSecond());
   }
 
+  @Override
+  public void recordCorrectPeerListBeforeStarting(
+      Map<ConsensusGroupId, List<Peer>> correctPeerList) {
+    logger.info("Record correct peer list: {}", correctPeerList);
+    this.correctPeerListBeforeStart = correctPeerList;
+  }
+
   @Override
   public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
       throws ConsensusException {
-    logger.info("[RESET PEER LIST] Start to reset peer list to {}", 
correctPeers);
     IoTConsensusServerImpl impl =
         Optional.ofNullable(stateMachineMap.get(groupId))
             .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
@@ -501,27 +502,37 @@ public class IoTConsensus implements IConsensus {
     }
 
     synchronized (impl) {
+      // remove invalid peer
       ImmutableList<Peer> currentMembers = 
ImmutableList.copyOf(impl.getConfiguration());
       String previousPeerListStr = currentMembers.toString();
       for (Peer peer : currentMembers) {
         if (!correctPeers.contains(peer)) {
           if (!impl.removeSyncLogChannel(peer)) {
-            logger.error(
-                "[RESET PEER LIST] Failed to remove peer {}'s sync log channel 
from group {}",
-                peer,
-                groupId);
+            logger.error("[RESET PEER LIST] Failed to remove sync channel 
with: {}", peer);
+          } else {
+            logger.info("[RESET PEER LIST] Remove sync channel with: {}", 
peer);
           }
         }
       }
-      logger.info(
-          "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
-          previousPeerListStr,
-          impl.getConfiguration());
+      // add correct peer
       for (Peer peer : correctPeers) {
         if (!impl.getConfiguration().contains(peer)) {
-          logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local 
peer list", peer);
+          impl.buildSyncLogChannel(peer);
+          logger.info("[RESET PEER LIST] Build sync channel with: {}", peer);
         }
       }
+      // show result
+      String newPeerListStr = impl.getConfiguration().toString();
+      if (!previousPeerListStr.equals(newPeerListStr)) {
+        logger.info(
+            "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
+            previousPeerListStr,
+            newPeerListStr);
+      } else {
+        logger.info(
+            "[RESET PEER LIST] The current peer list is correct, nothing need 
to be reset: {}",
+            previousPeerListStr);
+      }
     }
   }
 
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 8ea522aea27..1a32248853a 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -645,17 +645,12 @@ public class IoTConsensusServerImpl {
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
   }
 
-  /**
-   * build SyncLog channel with safeIndex as the default initial sync index.
-   *
-   * @throws ConsensusGroupModifyPeerException
-   */
-  public void buildSyncLogChannel(Peer targetPeer) throws 
ConsensusGroupModifyPeerException {
+  /** build SyncLog channel with safeIndex as the default initial sync index. 
*/
+  public void buildSyncLogChannel(Peer targetPeer) {
     buildSyncLogChannel(targetPeer, getMinSyncIndex());
   }
 
-  public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
-      throws ConsensusGroupModifyPeerException {
+  public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex) {
     KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
     // step 1, build sync channel in LogDispatcher
     logger.info(
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index a76b0e97b06..dc88d8eb98b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -185,13 +185,8 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Ifa
       return new TBuildSyncLogChannelRes(status);
     }
     TSStatus responseStatus;
-    try {
-      impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
-      responseStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (ConsensusGroupModifyPeerException e) {
-      responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-      responseStatus.setMessage(e.getMessage());
-    }
+    impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
+    responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     return new TBuildSyncLogChannelRes(responseStatus);
   }
 
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index ce5acf73cc3..5328a25f9dc 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -69,6 +69,7 @@ import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -77,10 +78,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
-import static 
org.apache.iotdb.consensus.iot.IoTConsensus.getConsensusGroupIdsFromDir;
-
 public class PipeConsensus implements IConsensus {
   private static final String CONSENSUS_PIPE_GUARDIAN_TASK_ID = 
"consensus_pipe_guardian";
   private static final String CLASS_NAME = PipeConsensus.class.getSimpleName();
@@ -102,6 +102,7 @@ public class PipeConsensus implements IConsensus {
   private final ConsensusPipeGuardian consensusPipeGuardian;
   private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
asyncClientManager;
   private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
syncClientManager;
+  private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
 
   public PipeConsensus(ConsensusConfig config, IStateMachine.Registry 
registry) {
     this.thisNode = config.getThisNodeEndPoint();
@@ -146,33 +147,64 @@ public class PipeConsensus implements IConsensus {
       }
     } else {
       // asynchronously recover, retry logic is implemented at 
PipeConsensusImpl
-      CompletableFuture.runAsync(
-              () -> {
-                try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(storageDir.toPath())) {
-                  for (Path path : stream) {
-                    ConsensusGroupId consensusGroupId =
-                        parsePeerFileName(path.getFileName().toString());
-                    PipeConsensusServerImpl consensus =
-                        new PipeConsensusServerImpl(
-                            new Peer(consensusGroupId, thisNodeId, thisNode),
-                            registry.apply(consensusGroupId),
-                            path.toString(),
-                            new ArrayList<>(),
-                            config,
-                            consensusPipeManager,
-                            syncClientManager);
-                    stateMachineMap.put(consensusGroupId, consensus);
-                    consensus.start(true);
-                  }
-                } catch (Exception e) {
-                  LOGGER.error("Failed to recover consensus from {}", 
storageDir, e);
-                }
-              })
-          .exceptionally(
-              e -> {
-                LOGGER.error("Failed to recover consensus from {}", 
storageDir, e);
-                return null;
-              });
+      CompletableFuture<Void> future =
+          CompletableFuture.runAsync(
+                  () -> {
+                    try (DirectoryStream<Path> stream =
+                        Files.newDirectoryStream(storageDir.toPath())) {
+                      for (Path path : stream) {
+                        ConsensusGroupId consensusGroupId =
+                            parsePeerFileName(path.getFileName().toString());
+                        PipeConsensusServerImpl consensus =
+                            new PipeConsensusServerImpl(
+                                new Peer(consensusGroupId, thisNodeId, 
thisNode),
+                                registry.apply(consensusGroupId),
+                                path.toString(),
+                                new ArrayList<>(),
+                                config,
+                                consensusPipeManager,
+                                syncClientManager);
+                        stateMachineMap.put(consensusGroupId, consensus);
+                        checkPeerListAndStartIfEligible(consensusGroupId, 
consensus);
+                      }
+                    } catch (Exception e) {
+                      LOGGER.error("Failed to recover consensus from {}", 
storageDir, e);
+                    }
+                  })
+              .exceptionally(
+                  e -> {
+                    LOGGER.error("Failed to recover consensus from {}", 
storageDir, e);
+                    return null;
+                  });
+    }
+  }
+
+  private void checkPeerListAndStartIfEligible(
+      ConsensusGroupId consensusGroupId, PipeConsensusServerImpl consensus) 
throws IOException {
+    BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
+        (dataRegionId, peers) -> {
+          try {
+            resetPeerList(dataRegionId, peers);
+          } catch (ConsensusGroupNotExistException ignore) {
+
+          } catch (Exception e) {
+            LOGGER.warn("Failed to reset peer list while start", e);
+          }
+        };
+
+    if (correctPeerListBeforeStart != null) {
+      if (correctPeerListBeforeStart.containsKey(consensusGroupId)) {
+        // make peers which are in list correct
+        resetPeerListWithoutThrow.accept(
+            consensusGroupId, 
correctPeerListBeforeStart.get(consensusGroupId));
+        consensus.start(true);
+      } else {
+        // clear peers which are not in the list
+        resetPeerListWithoutThrow.accept(consensusGroupId, 
Collections.emptyList());
+      }
+
+    } else {
+      consensus.start(true);
     }
   }
 
@@ -412,13 +444,20 @@ public class PipeConsensus implements IConsensus {
     KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
   }
 
+  @Override
+  public void recordCorrectPeerListBeforeStarting(
+      Map<ConsensusGroupId, List<Peer>> correctPeerList) {
+    LOGGER.info("Record correct peer list: {}", correctPeerList);
+    this.correctPeerListBeforeStart = correctPeerList;
+  }
+
   @Override
   public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
       throws ConsensusException {
-    LOGGER.info("[RESET PEER LIST] Start to reset peer list to {}", 
correctPeers);
     PipeConsensusServerImpl impl =
         Optional.ofNullable(stateMachineMap.get(groupId))
             .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+
     if (!correctPeers.contains(new Peer(groupId, thisNodeId, thisNode))) {
       LOGGER.warn(
           "[RESET PEER LIST] Local peer is not in the correct configuration, 
delete local peer {}",
@@ -426,30 +465,43 @@ public class PipeConsensus implements IConsensus {
       deleteLocalPeer(groupId);
       return;
     }
+
     ImmutableList<Peer> currentPeers = ImmutableList.copyOf(impl.getPeers());
     String previousPeerListStr = impl.getPeers().toString();
+    // remove invalid peer
     for (Peer peer : currentPeers) {
       if (!correctPeers.contains(peer)) {
         try {
           impl.dropConsensusPipeToTargetPeer(peer);
+          LOGGER.info("[RESET PEER LIST] Remove sync channel with: {}", peer);
         } catch (ConsensusGroupModifyPeerException e) {
-          LOGGER.error(
-              "[RESET PEER LIST] Failed to remove peer {}'s consensus pipe 
from group {}",
-              peer,
-              groupId,
-              e);
+          LOGGER.error("[RESET PEER LIST] Failed to remove sync channel with: 
{}", peer, e);
         }
       }
     }
-    LOGGER.info(
-        "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
-        previousPeerListStr,
-        impl.getPeers());
+    // add correct peer
     for (Peer peer : correctPeers) {
-      if (!impl.containsPeer(peer)) {
-        LOGGER.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local 
peer list", peer);
+      if (!impl.containsPeer(peer) && peer.getNodeId() != this.thisNodeId) {
+        try {
+          impl.createConsensusPipeToTargetPeer(peer, false);
+          LOGGER.info("[RESET PEER LIST] Build sync channel with: {}", peer);
+        } catch (ConsensusGroupModifyPeerException e) {
+          LOGGER.warn("[RESET PEER LIST] Failed to build sync channel with: 
{}", peer, e);
+        }
       }
     }
+    // show result
+    String currentPeerListStr = impl.getPeers().toString();
+    if (!previousPeerListStr.equals(currentPeerListStr)) {
+      LOGGER.info(
+          "[RESET PEER LIST] Local peer list has been reset: {} -> {}",
+          previousPeerListStr,
+          impl.getPeers());
+    } else {
+      LOGGER.info(
+          "[RESET PEER LIST] The current peer list is correct, nothing need to 
be reset: {}",
+          previousPeerListStr);
+    }
   }
 
   @Override
@@ -500,11 +552,6 @@ public class PipeConsensus implements IConsensus {
     return new ArrayList<>(stateMachineMap.keySet());
   }
 
-  @Override
-  public List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting() {
-    return getConsensusGroupIdsFromDir(storageDir, LOGGER);
-  }
-
   @Override
   public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
     return getPeerDir(groupId);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 9b4c3274e6a..9ae8dcd6a26 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -94,19 +94,18 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
 import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 
@@ -146,6 +145,8 @@ class RatisConsensus implements IConsensus {
   private final RatisMetricSet ratisMetricSet;
   private final TConsensusGroupType consensusGroupType;
 
+  private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
+
   private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean> 
canServeStaleRead;
 
   public RatisConsensus(ConsensusConfig config, IStateMachine.Registry 
registry) {
@@ -235,6 +236,27 @@ class RatisConsensus implements IConsensus {
     MetricService.getInstance().addMetricSet(this.ratisMetricSet);
     server.get().start();
     registerAndStartDiskGuardian();
+
+    if (correctPeerListBeforeStart != null) {
+      BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
+          (consensusGroupId, peers) -> {
+            try {
+              resetPeerList(consensusGroupId, peers);
+            } catch (ConsensusGroupNotExistException ignore) {
+
+            } catch (Exception e) {
+              logger.warn("Failed to reset peer list while start", e);
+            }
+          };
+      // make peers which are in list correct
+      correctPeerListBeforeStart.forEach(resetPeerListWithoutThrow);
+      // clear peers which are not in the list
+      getAllConsensusGroupIds().stream()
+          .filter(consensusGroupId -> 
!correctPeerListBeforeStart.containsKey(consensusGroupId))
+          .forEach(
+              consensusGroupId ->
+                  resetPeerListWithoutThrow.accept(consensusGroupId, 
Collections.emptyList()));
+    }
   }
 
   @Override
@@ -591,10 +613,16 @@ class RatisConsensus implements IConsensus {
     sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
   }
 
+  @Override
+  public void recordCorrectPeerListBeforeStarting(
+      Map<ConsensusGroupId, List<Peer>> correctPeerList) {
+    logger.info("Record correct peer list: {}", correctPeerList);
+    this.correctPeerListBeforeStart = correctPeerList;
+  }
+
   @Override
   public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
       throws ConsensusException {
-    logger.info("[RESET PEER LIST] Start to reset peer list to {}", 
correctPeers);
     final RaftGroupId raftGroupId = 
Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     final RaftGroup group = getGroupInfo(raftGroupId);
 
@@ -610,7 +638,7 @@ class RatisConsensus implements IConsensus {
                         peer.getNodeId(), peer.getEndpoint(), 
DEFAULT_PRIORITY))
             .anyMatch(
                 raftPeer ->
-                    myself.getId() == raftPeer.getId()
+                    myself.getId().equals(raftPeer.getId())
                         && myself.getAddress().equals(raftPeer.getAddress()));
     if (!myselfInCorrectPeers) {
       logger.info(
@@ -624,6 +652,20 @@ class RatisConsensus implements IConsensus {
         Utils.fromPeersAndPriorityToRaftPeers(correctPeers, DEFAULT_PRIORITY);
     final RaftGroup newGroup = RaftGroup.valueOf(raftGroupId, newGroupPeers);
 
+    Set<RaftPeer> localRaftPeerSet = new HashSet<>(group.getPeers());
+    Set<RaftPeer> correctRaftPeerSet = new HashSet<>(newGroupPeers);
+    if (localRaftPeerSet.equals(correctRaftPeerSet)) {
+      // configurations are the same
+      logger.info(
+          "[RESET PEER LIST] The current peer list is correct, nothing need to 
be reset: {}",
+          localRaftPeerSet);
+      return;
+    }
+
+    logger.info(
+        "[RESET PEER LIST] Peer list will be reset from {} to {}",
+        localRaftPeerSet,
+        correctRaftPeerSet);
     RaftClientReply reply = sendReconfiguration(newGroup);
     if (reply.isSuccess()) {
       logger.info("[RESET PEER LIST] Peer list has been reset to {}", 
newGroupPeers);
@@ -791,30 +833,6 @@ class RatisConsensus implements IConsensus {
     }
   }
 
-  @Override
-  public List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting() {
-    if (!storageDir.exists()) {
-      return Collections.emptyList();
-    }
-    List<ConsensusGroupId> consensusGroupIds = new ArrayList<>();
-    try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(storageDir.toPath())) {
-      for (Path path : stream) {
-        try {
-          RaftGroupId raftGroupId =
-              
RaftGroupId.valueOf(UUID.fromString(path.getFileName().toString()));
-          
consensusGroupIds.add(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId));
-        } catch (Exception e) {
-          logger.info(
-              "The directory {} is not a group directory;" + " ignoring it. ",
-              path.getFileName().toString());
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Failed to get all consensus group ids from disk", e);
-    }
-    return consensusGroupIds;
-  }
-
   @Override
   public String getRegionDirFromConsensusGroupId(ConsensusGroupId 
consensusGroupId) {
     RaftGroupId raftGroupId = 
Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
index 19258309028..5800e76b008 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
@@ -38,7 +38,6 @@ import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
-import org.apache.iotdb.consensus.iot.IoTConsensus;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -211,6 +210,12 @@ class SimpleConsensus implements IConsensus {
     throw new ConsensusException("SimpleConsensus does not support membership 
changes");
   }
 
+  @Override
+  public void recordCorrectPeerListBeforeStarting(
+      Map<ConsensusGroupId, List<Peer>> correctPeerList) {
+    logger.info("SimpleConsensus will do nothing when calling 
recordCorrectPeerListBeforeStarting");
+  }
+
   @Override
   public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws 
ConsensusException {
     throw new ConsensusException("SimpleConsensus does not support leader 
transfer");
@@ -254,11 +259,6 @@ class SimpleConsensus implements IConsensus {
     return new ArrayList<>(stateMachineMap.keySet());
   }
 
-  @Override
-  public List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting() {
-    return IoTConsensus.getConsensusGroupIdsFromDir(storageDir, logger);
-  }
-
   @Override
   public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
     return buildPeerDir(groupId);
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index 8072ab10066..3ec7769f2ac 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -324,10 +324,6 @@ public class ReplicateTest {
     for (int i = 0; i < CHECK_POINT_GAP; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
     }
-    List<ConsensusGroupId> ids = 
servers.get(0).getAllConsensusGroupIdsWithoutStarting();
-
-    Assert.assertEquals(1, ids.size());
-    Assert.assertEquals(gid, ids.get(0));
 
     String regionDir = servers.get(0).getRegionDirFromConsensusGroupId(gid);
     try {
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
index d7675084680..9fcbf9e03f1 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
@@ -42,7 +41,11 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertTrue;
 
@@ -52,32 +55,34 @@ public class StabilityTest {
 
   private final File storageDir = new File("target" + java.io.File.separator + 
"stability");
 
-  private IConsensus consensusImpl;
+  private IoTConsensus consensusImpl;
 
   private final int basePort = 6667;
 
   public void constructConsensus() throws IOException {
     consensusImpl =
-        ConsensusFactory.getConsensusImpl(
-                ConsensusFactory.IOT_CONSENSUS,
-                ConsensusConfig.newBuilder()
-                    .setThisNodeId(1)
-                    .setThisNode(new TEndPoint("0.0.0.0", basePort))
-                    .setStorageDir(storageDir.getAbsolutePath())
-                    .setConsensusGroupType(TConsensusGroupType.DataRegion)
-                    .build(),
-                gid -> new TestStateMachine())
-            .orElseThrow(
-                () ->
-                    new IllegalArgumentException(
-                        String.format(
-                            ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                            ConsensusFactory.IOT_CONSENSUS)));
+        (IoTConsensus)
+            ConsensusFactory.getConsensusImpl(
+                    ConsensusFactory.IOT_CONSENSUS,
+                    ConsensusConfig.newBuilder()
+                        .setThisNodeId(1)
+                        .setThisNode(new TEndPoint("0.0.0.0", basePort))
+                        .setStorageDir(storageDir.getAbsolutePath())
+                        .setConsensusGroupType(TConsensusGroupType.DataRegion)
+                        .build(),
+                    gid -> new TestStateMachine())
+                .orElseThrow(
+                    () ->
+                        new IllegalArgumentException(
+                            String.format(
+                                ConsensusFactory.CONSTRUCT_FAILED_MSG,
+                                ConsensusFactory.IOT_CONSENSUS)));
     consensusImpl.start();
   }
 
   @Before
   public void setUp() throws Exception {
+    FileUtils.deleteFully(storageDir);
     constructConsensus();
   }
 
@@ -210,4 +215,47 @@ public class StabilityTest {
     Assert.assertNotEquals(versionFiles1[0].getName(), 
versionFiles2[0].getName());
     consensusImpl.deleteLocalPeer(dataRegionId);
   }
+
+  @Test
+  public void recordAndResetPeerListTest() throws Exception {
+    try {
+      Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
+      consensusImpl.createLocalPeer(
+          dataRegionId,
+          Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", basePort))));
+      Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
+      Assert.assertEquals(1, 
consensusImpl.getImpl(dataRegionId).getConfiguration().size());
+    } catch (ConsensusException e) {
+      Assert.fail();
+    }
+    consensusImpl.stop();
+
+    // test add sync channel
+    Map<ConsensusGroupId, List<Peer>> correctPeers = new HashMap<>();
+    List<Peer> peerList1And2 = new ArrayList<>();
+    peerList1And2.add(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 
basePort)));
+    peerList1And2.add(new Peer(dataRegionId, 2, new TEndPoint("0.0.0.0", 
basePort)));
+    correctPeers.put(dataRegionId, peerList1And2);
+    consensusImpl.recordCorrectPeerListBeforeStarting(correctPeers);
+    consensusImpl.start();
+    Assert.assertEquals(2, 
consensusImpl.getImpl(dataRegionId).getConfiguration().size());
+    consensusImpl.stop();
+
+    // test remove sync channel
+    List<Peer> peerList1 = new ArrayList<>();
+    peerList1.add(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 
basePort)));
+    correctPeers.put(dataRegionId, peerList1);
+    consensusImpl.recordCorrectPeerListBeforeStarting(correctPeers);
+    consensusImpl.start();
+    Assert.assertEquals(1, 
consensusImpl.getImpl(dataRegionId).getConfiguration().size());
+    consensusImpl.stop();
+
+    // test remove invalid peer
+    List<Peer> peerList2 = new ArrayList<>();
+    peerList2.add(new Peer(dataRegionId, 2, new TEndPoint("0.0.0.0", 
basePort)));
+    correctPeers.put(dataRegionId, peerList2);
+    consensusImpl.recordCorrectPeerListBeforeStarting(correctPeers);
+    consensusImpl.start();
+    Assert.assertNull(consensusImpl.getImpl(dataRegionId));
+  }
 }
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 685f580e4ba..842ea78af72 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -302,10 +302,6 @@ public class RatisConsensusTest {
     servers.get(0).createLocalPeer(gid, peers.subList(0, 1));
     doConsensus(0, 10, 10);
 
-    List<ConsensusGroupId> ids = 
servers.get(0).getAllConsensusGroupIdsWithoutStarting();
-    Assert.assertEquals(1, ids.size());
-    Assert.assertEquals(gid, ids.get(0));
-
     String regionDir = servers.get(0).getRegionDirFromConsensusGroupId(gid);
     try {
       File regionDirFile = new File(regionDir);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
index 23db3caa0d7..3e56a57e3ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java
@@ -37,6 +37,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_ALREADY_EXIST_MSG;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_NOT_EXIST_MSG;
+
 public class ConsensusPipeDataNodeDispatcher implements 
ConsensusPipeDispatcher {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(ConsensusPipeDataNodeDispatcher.class);
@@ -64,6 +67,10 @@ public class ConsensusPipeDataNodeDispatcher implements 
ConsensusPipeDispatcher
       TSStatus status = configNodeClient.createPipe(req);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
         LOGGER.warn("Failed to create consensus pipe-{}, status: {}", 
pipeName, status);
+        // creation is idempotent: an already-existing pipe is not an error
+        if (status.getMessage().contains(PIPE_ALREADY_EXIST_MSG)) {
+          return;
+        }
         throw new PipeException(status.getMessage());
       }
     } catch (Exception e) {
@@ -111,6 +118,10 @@ public class ConsensusPipeDataNodeDispatcher implements 
ConsensusPipeDispatcher
       final TSStatus status = configNodeClient.dropPipe(pipeName.toString());
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
         LOGGER.warn("Failed to drop consensus pipe-{}, status: {}", pipeName, 
status);
+        // dropping is idempotent: an already-absent pipe is not an error
+        if (status.getMessage().contains(PIPE_NOT_EXIST_MSG)) {
+          return;
+        }
         throw new PipeException(status.getMessage());
       }
     } catch (Exception e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index fc2100d45d1..9a2ff29502a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -21,12 +21,12 @@ package org.apache.iotdb.db.service;
 
 import org.apache.iotdb.common.rpc.thrift.Model;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TNodeResource;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.ServerCommandLine;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
@@ -65,6 +65,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
 import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.db.conf.DataNodeStartupCheck;
 import org.apache.iotdb.db.conf.DataNodeSystemPropertiesHandler;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -126,6 +127,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -560,11 +562,15 @@ public class DataNode extends ServerCommandLine 
implements DataNodeMBean {
     }
   }
 
-  private void removeInvalidRegions(List<ConsensusGroupId> 
dataNodeConsensusGroupIds) {
-    removeInvalidConsensusDataRegions(dataNodeConsensusGroupIds);
+  private void makeRegionsCorrect(List<TRegionReplicaSet> correctRegions) {
+    List<ConsensusGroupId> dataNodeConsensusGroupIds =
+        correctRegions.stream()
+            .map(TRegionReplicaSet::getRegionId)
+            .map(ConsensusGroupId.Factory::createFromTConsensusGroupId)
+            .collect(Collectors.toList());
     removeInvalidDataRegions(dataNodeConsensusGroupIds);
-    removeInvalidConsensusSchemaRegions(dataNodeConsensusGroupIds);
     removeInvalidSchemaRegions(dataNodeConsensusGroupIds);
+    prepareToResetDataRegionPeerList(correctRegions);
   }
 
   private void removeInvalidDataRegions(List<ConsensusGroupId> 
dataNodeConsensusGroupIds) {
@@ -581,24 +587,6 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
         });
   }
 
-  private void removeInvalidConsensusDataRegions(List<ConsensusGroupId> 
dataNodeConsensusGroupIds) {
-    List<ConsensusGroupId> invalidDataRegionConsensusGroupIds =
-        
DataRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
-            .filter(consensusGroupId -> 
!dataNodeConsensusGroupIds.contains(consensusGroupId))
-            .collect(Collectors.toList());
-    if (invalidDataRegionConsensusGroupIds.isEmpty()) {
-      return;
-    }
-    logger.info("Remove invalid dataRegion directories... {}", 
invalidDataRegionConsensusGroupIds);
-    for (ConsensusGroupId consensusGroupId : 
invalidDataRegionConsensusGroupIds) {
-      File oldDir =
-          new File(
-              DataRegionConsensusImpl.getInstance()
-                  .getRegionDirFromConsensusGroupId(consensusGroupId));
-      removeDir(oldDir);
-    }
-  }
-
   private void removeInvalidSchemaRegions(List<ConsensusGroupId> 
schemaConsensusGroupIds) {
     Map<String, List<SchemaRegionId>> localSchemaRegionInfo =
         SchemaEngine.getLocalSchemaRegionInfo();
@@ -622,26 +610,6 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
         });
   }
 
-  private void removeInvalidConsensusSchemaRegions(
-      List<ConsensusGroupId> dataNodeConsensusGroupIds) {
-    List<ConsensusGroupId> invalidSchemaRegionConsensusGroupIds =
-        
SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
-            .filter(consensusGroupId -> 
!dataNodeConsensusGroupIds.contains(consensusGroupId))
-            .collect(Collectors.toList());
-    if (invalidSchemaRegionConsensusGroupIds.isEmpty()) {
-      return;
-    }
-    logger.info(
-        "Remove invalid schemaRegion directories... {}", 
invalidSchemaRegionConsensusGroupIds);
-    for (ConsensusGroupId consensusGroupId : 
invalidSchemaRegionConsensusGroupIds) {
-      File oldDir =
-          new File(
-              SchemaRegionConsensusImpl.getInstance()
-                  .getRegionDirFromConsensusGroupId(consensusGroupId));
-      removeDir(oldDir);
-    }
-  }
-
   private void removeInvalidSchemaDir(String database, SchemaRegionId 
schemaRegionId) {
     String systemSchemaDir =
         config.getSystemDir() + File.separator + database + File.separator + 
schemaRegionId.getId();
@@ -657,6 +625,39 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
     }
   }
 
+  private void prepareToResetDataRegionPeerList(List<TRegionReplicaSet> 
correctedRegions) {
+    Map<ConsensusGroupId, List<Peer>> correctPeerListForDataRegion = new 
HashMap<>();
+    Map<ConsensusGroupId, List<Peer>> correctPeerListForSchemaRegion = new 
HashMap<>();
+    for (TRegionReplicaSet regionReplicaSet : correctedRegions) {
+      ConsensusGroupId consensusGroupId =
+          
ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.regionId);
+      List<Peer> peerList = new ArrayList<>();
+      if (consensusGroupId.getType() == TConsensusGroupType.DataRegion) {
+        for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
+          peerList.add(
+              new Peer(
+                  consensusGroupId,
+                  dataNodeLocation.getDataNodeId(),
+                  dataNodeLocation.getDataRegionConsensusEndPoint()));
+        }
+        correctPeerListForDataRegion.put(consensusGroupId, peerList);
+      } else if (consensusGroupId.getType() == 
TConsensusGroupType.SchemaRegion) {
+        for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
+          peerList.add(
+              new Peer(
+                  consensusGroupId,
+                  dataNodeLocation.getDataNodeId(),
+                  dataNodeLocation.getSchemaRegionConsensusEndPoint()));
+        }
+        correctPeerListForSchemaRegion.put(consensusGroupId, peerList);
+      }
+    }
+    DataRegionConsensusImpl.getInstance()
+        .recordCorrectPeerListBeforeStarting(correctPeerListForDataRegion);
+    SchemaRegionConsensusImpl.getInstance()
+        .recordCorrectPeerListBeforeStarting(correctPeerListForSchemaRegion);
+  }
+
   private void sendRestartRequestToConfigNode() throws StartupException {
     logger.info("Sending restart request to ConfigNode-leader...");
     long startTime = System.currentTimeMillis();
@@ -709,11 +710,7 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
           config.getClusterName(),
           (endTime - startTime));
 
-      List<TConsensusGroupId> consensusGroupIds = 
dataNodeRestartResp.getConsensusGroupIds();
-      removeInvalidRegions(
-          consensusGroupIds.stream()
-              .map(ConsensusGroupId.Factory::createFromTConsensusGroupId)
-              .collect(Collectors.toList()));
+      makeRegionsCorrect(dataNodeRestartResp.getCorrectConsensusGroups());
     } else {
       /* Throw exception when restart is rejected */
       throw new StartupException(dataNodeRestartResp.getStatus().getMessage());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeRPCMessageConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeRPCMessageConstant.java
new file mode 100644
index 00000000000..7e9646e5a86
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeRPCMessageConstant.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.config.constant;
+
+public class PipeRPCMessageConstant {
+  // These two messages are shared by multiple modules, such as pipe and IoTV2
+  public static final String PIPE_ALREADY_EXIST_MSG =
+      "the pipe with the same name has been created";
+  public static final String PIPE_NOT_EXIST_MSG = "the pipe does not exist";
+
+  private PipeRPCMessageConstant() {
+    throw new IllegalStateException("Utility class");
+  }
+}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 5f090003119..f2e6c140091 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -134,7 +134,7 @@ struct TDataNodeRestartResp {
   1: required common.TSStatus status
   2: required list<common.TConfigNodeLocation> configNodeList
   3: optional TRuntimeConfiguration runtimeConfiguration
-  4: optional list<common.TConsensusGroupId> consensusGroupIds
+  4: optional list<common.TRegionReplicaSet> correctConsensusGroups
 }
 
 struct TDataNodeRemoveReq {

Reply via email to