This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d46dd660a67 Region migration related work (#12246)
d46dd660a67 is described below

commit d46dd660a67086285f1095b0c53d4e021a4aa0c4
Author: Li Yu Heng <[email protected]>
AuthorDate: Sat Mar 30 10:58:47 2024 +0800

    Region migration related work (#12246)
---
 .../iotdb/it/env/cluster/env/AbstractEnv.java      |  15 +
 .../it/env/cluster/node/AbstractNodeWrapper.java   |  11 +
 .../iotdb/it/env/remote/env/RemoteServerEnv.java   |  10 +
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   4 +
 .../IoTDBRegionMigrateReliabilityITFramework.java} | 431 ++++++++++++---------
 .../CoordinatorRemoveRemotePeerCrashIT.java        |  51 +++
 .../IoTDBRegionMigrateDataNodeCrashIT.java         |  72 ++++
 .../pass/IoTDBRegionMigrateConfigNodeCrashIT.java  |  92 +++++
 .../pass/IoTDBRegionMigrateNormalIT.java           |  26 +-
 .../pass/IoTDBRegionMigrateOtherIT.java            |  45 +++
 .../iotdb/confignode/manager/node/NodeManager.java |   7 +
 .../impl/region/AddRegionPeerProcedure.java        |   4 +
 .../impl/region/RegionMigrateProcedure.java        |   2 +
 .../impl/region/RemoveRegionPeerProcedure.java     |   4 +
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  58 ++-
 .../consensus/iot/IoTConsensusServerImpl.java      |  69 ++--
 .../service/IoTConsensusRPCServiceProcessor.java   |   5 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  25 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |  66 ++++
 .../apache/iotdb/commons/conf/CommonConfig.java    |  16 +
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   5 +
 .../org/apache/iotdb/commons/utils/FileUtils.java  |  10 -
 .../utils/{ => KillPoint}/DataNodeKillPoints.java  |  13 +-
 .../IoTConsensusRemovePeerKillPoints.java}         |  13 +-
 .../iotdb/commons/utils/KillPoint/KillPoint.java   |  80 ++++
 .../NeverTriggeredKillPoint.java}                  |  10 +-
 .../src/main/thrift/confignode.thrift              |   1 +
 27 files changed, 877 insertions(+), 268 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 766f2e0c617..68be2f40c51 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -85,6 +85,8 @@ public abstract class AbstractEnv implements BaseEnv {
   protected long startTime;
   protected int retryCount = 30;
   private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> 
clientManager;
+  private List<String> configNodeKillPoints = new ArrayList<>();
+  private List<String> dataNodeKillPoints = new ArrayList<>();
 
   /**
    * This config object stores the properties set by developers during the 
test. It will be cleared
@@ -168,6 +170,7 @@ public abstract class AbstractEnv implements BaseEnv {
         (MppCommonConfig) clusterConfig.getConfigNodeCommonConfig(),
         (MppJVMConfig) clusterConfig.getConfigNodeJVMConfig());
     seedConfigNodeWrapper.createLogDir();
+    seedConfigNodeWrapper.setKillPoints(configNodeKillPoints);
     seedConfigNodeWrapper.start();
     String seedConfigNode = seedConfigNodeWrapper.getIpAndPortString();
     this.configNodeWrapperList.add(seedConfigNodeWrapper);
@@ -202,6 +205,7 @@ public abstract class AbstractEnv implements BaseEnv {
           (MppCommonConfig) clusterConfig.getConfigNodeCommonConfig(),
           (MppJVMConfig) clusterConfig.getConfigNodeJVMConfig());
       configNodeWrapper.createLogDir();
+      configNodeWrapper.setKillPoints(configNodeKillPoints);
       configNodesDelegate.addRequest(
           () -> {
             configNodeWrapper.start();
@@ -236,6 +240,7 @@ public abstract class AbstractEnv implements BaseEnv {
           (MppCommonConfig) clusterConfig.getDataNodeCommonConfig(),
           (MppJVMConfig) clusterConfig.getDataNodeJVMConfig());
       dataNodeWrapper.createLogDir();
+      dataNodeWrapper.setKillPoints(dataNodeKillPoints);
       dataNodesDelegate.addRequest(
           () -> {
             dataNodeWrapper.start();
@@ -1032,4 +1037,14 @@ public abstract class AbstractEnv implements BaseEnv {
       return Optional.empty();
     }
   }
+
+  @Override
+  public void registerConfigNodeKillPoints(List<String> killPoints) {
+    this.configNodeKillPoints = killPoints;
+  }
+
+  @Override
+  public void registerDataNodeKillPoints(List<String> killPoints) {
+    this.dataNodeKillPoints = killPoints;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index 4e6372d5b8d..a517fc95829 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.it.env.cluster.node;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.config.MppBaseConfig;
 import org.apache.iotdb.it.env.cluster.config.MppCommonConfig;
@@ -125,6 +126,7 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
   private int nodePort;
   private int metricPort;
   private long startTime;
+  private List<String> killPoints = new ArrayList<>();
 
   /**
    * Mutable properties are always hardcoded default values to make the 
cluster be set up
@@ -453,6 +455,7 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
               "-Xmx" + jvmConfig.getMaxHeapSize() + "m",
               "-XX:MaxDirectMemorySize=" + jvmConfig.getMaxDirectMemorySize() 
+ "m",
               "-Djdk.nio.maxCachedBufferSize=262144",
+              "-D" + IoTDBConstant.INTEGRATION_TEST_KILL_POINTS + "=" + 
killPoints.toString(),
               "-cp",
               server_node_lib_path));
       addStartCmdParams(startCmd);
@@ -640,6 +643,14 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
     return testClassName + "_" + testMethodName;
   }
 
+  public void setKillPoints(List<String> killPoints) {
+    this.killPoints = killPoints;
+  }
+
+  private String getKillPoints() {
+    return killPoints.toString();
+  }
+
   /* Abstract methods, which must be implemented in ConfigNode and DataNode. */
   protected abstract void reloadMutableFields();
 
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index 9dd357dbae6..eae42bd7a23 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -372,4 +372,14 @@ public class RemoteServerEnv implements BaseEnv {
   public Optional<DataNodeWrapper> dataNodeIdToWrapper(int nodeId) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public void registerConfigNodeKillPoints(List<String> killPoints) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void registerDataNodeKillPoints(List<String> killPoints) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 38b2383baa8..965e2254e84 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -255,4 +255,8 @@ public interface BaseEnv {
   String getLibPath();
 
   Optional<DataNodeWrapper> dataNodeIdToWrapper(int nodeId);
+
+  void registerConfigNodeKillPoints(List<String> killPoints);
+
+  void registerDataNodeKillPoints(List<String> killPoints);
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionMigrateReliabilityITFramework.java
similarity index 50%
rename from 
integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
rename to 
integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionMigrateReliabilityITFramework.java
index 9073bcc1617..3f33c857fcd 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionMigrateReliabilityITFramework.java
@@ -17,30 +17,26 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.it;
+package org.apache.iotdb.confignode.it.regionmigration;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.utils.DataNodeKillPoints;
-import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
-import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
-import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
 import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
 import org.apache.iotdb.itbase.exception.InconsistentDataException;
+import org.apache.iotdb.metrics.utils.SystemType;
 
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionTimeoutException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +48,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -66,14 +63,15 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-public class IoTDBRegionMigrateReliabilityIT {
+public class IoTDBRegionMigrateReliabilityITFramework {
   private static final Logger LOGGER =
-      LoggerFactory.getLogger(IoTDBRegionMigrateReliabilityIT.class);
+      LoggerFactory.getLogger(IoTDBRegionMigrateReliabilityITFramework.class);
   private static final String INSERTION =
       "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 10.1, 
20.7)";
   private static final String SHOW_REGIONS = "show regions";
   private static final String SHOW_DATANODES = "show datanodes";
   private static final String REGION_MIGRATE_COMMAND_FORMAT = "migrate region 
%d from %d to %d";
+  ExecutorService executorService = 
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
 
   @Before
   public void setUp() throws Exception {
@@ -90,131 +88,59 @@ public class IoTDBRegionMigrateReliabilityIT {
     EnvFactory.getEnv().cleanClusterEnvironment();
   }
 
-  // region Normal tests
-
-  @Test
-  public void normal1C2DTest() throws Exception {
-    generalTest(1, 1, 1, 2, buildSet(), buildSet());
-  }
-
-  @Test
-  public void normal3C3DTest() throws Exception {
-    generalTest(2, 3, 3, 3, buildSet(), buildSet());
-  }
-
-  // endregion
-
-  // region ConfigNode crash tests
-  @Test
-  public void cnCrashDuringPreCheck() throws Exception {
-    generalTest(
-        1, 1, 1, 2, 
buildSet(RegionTransitionState.REGION_MIGRATE_PREPARE.toString()), buildSet());
-  }
-
-  @Test
-  public void cnCrashDuringCreatePeer() throws Exception {
-    generalTest(
-        1, 1, 1, 2, 
buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER.toString()), buildSet());
-  }
-
-  @Test
-  public void cnCrashDuringDoAddPeer() throws Exception {
-    generalTest(1, 1, 1, 2, 
buildSet(AddRegionPeerState.DO_ADD_REGION_PEER.toString()), buildSet());
-  }
-
-  @Test
-  public void cnCrashDuringUpdateCache() throws Exception {
-    generalTest(
-        1,
-        1,
-        1,
-        2,
-        buildSet(AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE.toString()),
-        buildSet());
-  }
-
-  @Test
-  public void cnCrashDuringChangeRegionLeader() throws Exception {
-    generalTest(
-        1, 1, 1, 2, 
buildSet(RegionTransitionState.CHANGE_REGION_LEADER.toString()), buildSet());
-  }
-
-  @Test
-  public void cnCrashDuringRemoveRegionPeer() throws Exception {
-    generalTest(
-        1, 1, 1, 2, 
buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER.toString()), buildSet());
-  }
-
-  @Test
-  public void cnCrashDuringDeleteOldRegionPeer() throws Exception {
-    generalTest(
-        1, 1, 1, 2, 
buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER.toString()), buildSet());
-  }
-
-  @Test
-  public void cnCrashDuringRemoveRegionLocationCache() throws Exception {
-    generalTest(
-        1,
-        1,
-        1,
-        2,
-        
buildSet(RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE.toString()),
-        buildSet());
-  }
-
-  @Test
-  public void cnCrashTest() throws Exception {
-    KeySetView<String, Boolean> killConfigNodeKeywords = buildSet();
-    killConfigNodeKeywords.addAll(
-        Arrays.stream(AddRegionPeerState.values())
-            .map(Enum::toString)
-            .collect(Collectors.toList()));
-    killConfigNodeKeywords.addAll(
-        Arrays.stream(RemoveRegionPeerState.values())
-            .map(Enum::toString)
-            .collect(Collectors.toList()));
-    generalTest(1, 1, 1, 2, killConfigNodeKeywords, buildSet());
-  }
-
-  @Ignore
-  @Test
-  public void badKillPoint() throws Exception {
-    generalTest(1, 1, 1, 2, buildSet("??"), buildSet());
-  }
-
-  // endregion
-
-  // region coordinator DataNode crash tests
-
-  @Test
-  public void coordinatorCrashDuringRemovePeer() throws Exception {
-    generalTest(1, 1, 1, 2, buildSet(), 
buildSet(DataNodeKillPoints.CoordinatorRemovePeer.name()));
-  }
-
-  // endregion
-
-  // region original DataNode crash tests
-
-  @Test
-  public void originalCrashDuringRemovePeer() throws Exception {
-    generalTest(1, 1, 1, 2, buildSet(), 
buildSet(DataNodeKillPoints.OriginalRemovePeer.name()));
+  public void successTest(
+      final int dataReplicateFactor,
+      final int schemaReplicationFactor,
+      final int configNodeNum,
+      final int dataNodeNum,
+      KeySetView<String, Boolean> killConfigNodeKeywords,
+      KeySetView<String, Boolean> killDataNodeKeywords)
+      throws Exception {
+    generalTestWithAllOptions(
+        dataReplicateFactor,
+        schemaReplicationFactor,
+        configNodeNum,
+        dataNodeNum,
+        killConfigNodeKeywords,
+        killDataNodeKeywords,
+        true,
+        true,
+        0,
+        true);
   }
 
-  @Test
-  public void originalCrashDuringDeleteLocalPeer() throws Exception {
-    generalTest(
-        1, 1, 1, 2, buildSet(), 
buildSet(DataNodeKillPoints.OriginalDeleteOldRegionPeer.name()));
+  public void failTest(
+      final int dataReplicateFactor,
+      final int schemaReplicationFactor,
+      final int configNodeNum,
+      final int dataNodeNum,
+      KeySetView<String, Boolean> killConfigNodeKeywords,
+      KeySetView<String, Boolean> killDataNodeKeywords)
+      throws Exception {
+    generalTestWithAllOptions(
+        dataReplicateFactor,
+        schemaReplicationFactor,
+        configNodeNum,
+        dataNodeNum,
+        killConfigNodeKeywords,
+        killDataNodeKeywords,
+        true,
+        true,
+        60,
+        false);
   }
 
-  // region Helpers
-
-  public void generalTest(
+  public void generalTestWithAllOptions(
       final int dataReplicateFactor,
       final int schemaReplicationFactor,
       final int configNodeNum,
       final int dataNodeNum,
       KeySetView<String, Boolean> killConfigNodeKeywords,
-      KeySetView<String, Boolean> killDataNodeKeywords)
+      KeySetView<String, Boolean> killDataNodeKeywords,
+      final boolean checkOriginalRegionDirDeleted,
+      final boolean checkConfigurationFileDeleted,
+      final int restartTime,
+      final boolean isMigrateSuccess)
       throws Exception {
     // prepare env
     EnvFactory.getEnv()
@@ -222,24 +148,12 @@ public class IoTDBRegionMigrateReliabilityIT {
         .getCommonConfig()
         .setDataReplicationFactor(dataReplicateFactor)
         .setSchemaReplicationFactor(schemaReplicationFactor);
+    EnvFactory.getEnv().registerConfigNodeKillPoints(new 
ArrayList<>(killConfigNodeKeywords));
+    EnvFactory.getEnv().registerDataNodeKillPoints(new 
ArrayList<>(killDataNodeKeywords));
     EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);
 
-    ExecutorService service = 
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
-    EnvFactory.getEnv()
-        .getConfigNodeWrapperList()
-        .forEach(
-            configNodeWrapper ->
-                service.submit(() -> nodeLogKillPoint(configNodeWrapper, 
killConfigNodeKeywords)));
-    EnvFactory.getEnv()
-        .getDataNodeWrapperList()
-        .forEach(
-            dataNodeWrapper ->
-                service.submit(() -> nodeLogKillPoint(dataNodeWrapper, 
killDataNodeKeywords)));
-
     try (final Connection connection = EnvFactory.getEnv().getConnection();
-        final Statement statement = connection.createStatement();
-        final SyncConfigNodeIServiceClient configClient =
-            (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+        final Statement statement = connection.createStatement()) {
 
       statement.execute(INSERTION);
 
@@ -247,38 +161,116 @@ public class IoTDBRegionMigrateReliabilityIT {
       Map<Integer, Set<Integer>> regionMap = getRegionMap(result);
 
       result = statement.executeQuery(SHOW_DATANODES);
-      Set<Integer> dataNodeSet = new HashSet<>();
+      Set<Integer> allDataNodeId = new HashSet<>();
       while (result.next()) {
-        dataNodeSet.add(result.getInt(ColumnHeaderConstant.NODE_ID));
+        allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID));
       }
 
       final int selectedRegion = selectRegion(regionMap);
       final int originalDataNode = selectOriginalDataNode(regionMap, 
selectedRegion);
-      final int destDataNode = selectDestDataNode(dataNodeSet, regionMap, 
selectedRegion);
+      final int destDataNode = selectDestDataNode(allDataNodeId, regionMap, 
selectedRegion);
+
+      checkRegionFileExist(originalDataNode);
+      checkPeersExist(regionMap.get(selectedRegion), originalDataNode, 
selectedRegion);
+
+      // start log monitors that kill (and then restart) nodes when a kill point is hit
+      setConfigNodeKillPoints(killConfigNodeKeywords, restartTime);
+      setDataNodeKillPoints(killDataNodeKeywords, restartTime);
 
+      // region migration start
       statement.execute(regionMigrateCommand(selectedRegion, originalDataNode, 
destDataNode));
 
-      awaitUntilSuccess(statement, selectedRegion, originalDataNode, 
destDataNode);
+      boolean success = false;
+      try {
+        awaitUntilSuccess(statement, selectedRegion, originalDataNode, 
destDataNode);
+        success = true;
+      } catch (ConditionTimeoutException e) {
+        LOGGER.error("Region migrate failed", e);
+      }
+      // Assert.assertTrue(isMigrateSuccess == success);
 
       // make sure all kill points have been triggered
-      Assert.assertTrue(killConfigNodeKeywords.isEmpty());
-      Assert.assertTrue(killDataNodeKeywords.isEmpty());
+      checkKillPointsAllTriggered(killConfigNodeKeywords);
+      checkKillPointsAllTriggered(killDataNodeKeywords);
+
+      if (!success) {
+        restartAllDataNodes();
+      }
+      System.out.println(
+          "originalDataNode: "
+              + 
EnvFactory.getEnv().dataNodeIdToWrapper(originalDataNode).get().getNodePath());
+      System.out.println(
+          "destDataNode: "
+              + 
EnvFactory.getEnv().dataNodeIdToWrapper(destDataNode).get().getNodePath());
+
+      // check that nothing remains on the side that should have been cleaned up
+      if (checkOriginalRegionDirDeleted) {
+        if (success) {
+          checkRegionFileClear(originalDataNode);
+          checkRegionFileExist(destDataNode);
+        } else {
+          checkRegionFileClear(destDataNode);
+          checkRegionFileExist(originalDataNode);
+        }
+      }
+      if (checkConfigurationFileDeleted) {
+        if (success) {
+          checkPeersClear(allDataNodeId, originalDataNode, selectedRegion);
+        } else {
+          checkPeersClear(allDataNodeId, destDataNode, selectedRegion);
+        }
+      }
 
-      checkRegionFileClear(originalDataNode);
     } catch (InconsistentDataException ignore) {
 
     }
     LOGGER.info("test pass");
   }
 
+  private void restartAllDataNodes() {
+    EnvFactory.getEnv()
+        .getDataNodeWrapperList()
+        .parallelStream()
+        .forEach(
+            nodeWrapper -> {
+              nodeWrapper.stopForcibly();
+              nodeWrapper.start();
+            });
+  }
+
+  private void setConfigNodeKillPoints(
+      KeySetView<String, Boolean> killConfigNodeKeywords, int nodeRestartTime) 
{
+    EnvFactory.getEnv()
+        .getConfigNodeWrapperList()
+        .forEach(
+            configNodeWrapper ->
+                executorService.submit(
+                    () ->
+                        nodeLogKillPoint(
+                            configNodeWrapper, killConfigNodeKeywords, 
nodeRestartTime)));
+  }
+
+  private void setDataNodeKillPoints(
+      KeySetView<String, Boolean> killDataNodeKeywords, int nodeRestartTime) {
+    EnvFactory.getEnv()
+        .getDataNodeWrapperList()
+        .forEach(
+            dataNodeWrapper ->
+                executorService.submit(
+                    () ->
+                        nodeLogKillPoint(dataNodeWrapper, 
killDataNodeKeywords, nodeRestartTime)));
+  }
+
   /**
-   * Monitor the node's log and do something.
+   * Monitor the node's log and kill (then restart) the node when a kill-point log line is detected.
    *
    * @param nodeWrapper Easy to understand
    * @param killNodeKeywords When detect these keywords in node's log, stop 
the node forcibly
    */
   private static void nodeLogKillPoint(
-      AbstractNodeWrapper nodeWrapper, KeySetView<String, Boolean> 
killNodeKeywords) {
+      AbstractNodeWrapper nodeWrapper,
+      KeySetView<String, Boolean> killNodeKeywords,
+      int restartTime) {
     if (killNodeKeywords.isEmpty()) {
       return;
     }
@@ -288,11 +280,29 @@ public class IoTDBRegionMigrateReliabilityIT {
     } else {
       logFileName = "log_datanode_all.log";
     }
-    ProcessBuilder builder =
-        new ProcessBuilder(
-            "tail",
-            "-f",
-            nodeWrapper.getNodePath() + File.separator + "logs" + 
File.separator + logFileName);
+    SystemType type = SystemType.getSystemType();
+    ProcessBuilder builder;
+    if (type == SystemType.LINUX || type == SystemType.MAC) {
+      builder =
+          new ProcessBuilder(
+              "tail",
+              "-f",
+              nodeWrapper.getNodePath() + File.separator + "logs" + 
File.separator + logFileName);
+    } else if (type == SystemType.WINDOWS) {
+      builder =
+          new ProcessBuilder(
+              "powershell",
+              "-Command",
+              "Get-Content "
+                  + nodeWrapper.getNodePath()
+                  + File.separator
+                  + "logs"
+                  + File.separator
+                  + logFileName
+                  + " -Wait");
+    } else {
+      throw new UnsupportedOperationException("Unsupported system type " + 
type);
+    }
     builder.redirectErrorStream(true);
 
     try {
@@ -302,29 +312,53 @@ public class IoTDBRegionMigrateReliabilityIT {
         String line;
         while ((line = reader.readLine()) != null) {
           // if trigger more than one keyword at a same time, test code may 
have mistakes
-          
Assert.assertTrue(killNodeKeywords.stream().filter(line::contains).count() <= 
1);
+          Assert.assertTrue(
+              line,
+              killNodeKeywords.stream()
+                      .map(KillPoint::addKillPointPrefix)
+                      .filter(line::contains)
+                      .count()
+                  <= 1);
           String finalLine = line;
           Optional<String> detectedKeyword =
               killNodeKeywords.stream()
-                  .filter(keyword -> finalLine.contains("breakpoint:" + 
keyword))
+                  .filter(keyword -> 
finalLine.contains(KillPoint.addKillPointPrefix(keyword)))
                   .findAny();
           if (detectedKeyword.isPresent()) {
             // each keyword only trigger once
             killNodeKeywords.remove(detectedKeyword.get());
+            LOGGER.info("Kill point is triggered: {}", detectedKeyword);
             // reboot the node
             nodeWrapper.stopForcibly();
+            if (restartTime > 0) {
+              try {
+                TimeUnit.SECONDS.sleep(restartTime);
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+              }
+            }
             nodeWrapper.start();
           }
           if (killNodeKeywords.isEmpty()) {
             break;
           }
         }
+      } catch (AssertionError e) {
+        LOGGER.error("gg", e);
+        throw e;
       }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
 
+  void checkKillPointsAllTriggered(KeySetView<String, Boolean> killPoints) {
+    if (!killPoints.isEmpty()) {
+      killPoints.forEach(killPoint -> LOGGER.error("Kill point {} not 
triggered", killPoint));
+      Assert.fail("Some kill points was not triggered");
+    }
+  }
+
   private static String regionMigrateCommand(int who, int from, int to) {
     return String.format(REGION_MIGRATE_COMMAND_FORMAT, who, from, to);
   }
@@ -369,7 +403,7 @@ public class IoTDBRegionMigrateReliabilityIT {
     AtomicReference<Exception> lastException = new AtomicReference<>();
     try {
       Awaitility.await()
-          .atMost(1, TimeUnit.MINUTES)
+          .atMost(2, TimeUnit.MINUTES)
           .until(
               () -> {
                 try {
@@ -400,29 +434,78 @@ public class IoTDBRegionMigrateReliabilityIT {
     }
   }
 
+  private static void checkRegionFileExist(int dataNode) {
+    File originalRegionDir = new File(buildRegionDirPath(dataNode));
+    Assert.assertTrue(originalRegionDir.isDirectory());
+    Assert.assertNotEquals(0, 
Objects.requireNonNull(originalRegionDir.listFiles()).length);
+  }
+
   /** Check whether the original DataNode's region file has been deleted. */
   private static void checkRegionFileClear(int dataNode) {
-    String nodePath = 
EnvFactory.getEnv().dataNodeIdToWrapper(dataNode).get().getNodePath();
-    File originalRegionDir =
-        new File(
-            nodePath
-                + File.separator
-                + IoTDBConstant.DATA_FOLDER_NAME
-                + File.separator
-                + "datanode"
-                + File.separator
-                + IoTDBConstant.CONSENSUS_FOLDER_NAME
-                + File.separator
-                + "data_region");
+    File originalRegionDir = new File(buildRegionDirPath(dataNode));
     Assert.assertTrue(originalRegionDir.isDirectory());
     Assert.assertEquals(0, 
Objects.requireNonNull(originalRegionDir.listFiles()).length);
+    LOGGER.info("Original region clear");
   }
 
-  private static KeySetView<String, Boolean> buildSet(String... keywords) {
+  private static void checkPeersExist(Set<Integer> dataNodes, int 
originalDataNode, int regionId) {
+    dataNodes.forEach(targetDataNode -> checkPeerExist(targetDataNode, 
originalDataNode, regionId));
+  }
+
+  private static void checkPeerExist(int checkTargetDataNode, int 
originalDataNode, int regionId) {
+    File expectExistedFile =
+        new File(buildConfigurationDataFilePath(checkTargetDataNode, 
originalDataNode, regionId));
+    Assert.assertTrue(
+        "configuration file should exist, but it didn't: " + 
expectExistedFile.getPath(),
+        expectExistedFile.exists());
+  }
+
+  private static void checkPeersClear(Set<Integer> dataNodes, int 
originalDataNode, int regionId) {
+    dataNodes.stream()
+        .filter(dataNode -> dataNode != originalDataNode)
+        .forEach(targetDataNode -> checkPeerClear(targetDataNode, 
originalDataNode, regionId));
+    LOGGER.info("Peer clear");
+  }
+
+  private static void checkPeerClear(int checkTargetDataNode, int 
originalDataNode, int regionId) {
+    File expectDeletedFile =
+        new File(buildConfigurationDataFilePath(checkTargetDataNode, 
originalDataNode, regionId));
+    Assert.assertFalse(
+        "configuration file should be deleted, but it didn't: " + 
expectDeletedFile.getPath(),
+        expectDeletedFile.exists());
+  }
+
+  private static String buildRegionDirPath(int dataNode) {
+    String nodePath = 
EnvFactory.getEnv().dataNodeIdToWrapper(dataNode).get().getNodePath();
+    return nodePath
+        + File.separator
+        + IoTDBConstant.DATA_FOLDER_NAME
+        + File.separator
+        + "datanode"
+        + File.separator
+        + IoTDBConstant.CONSENSUS_FOLDER_NAME
+        + File.separator
+        + IoTDBConstant.DATA_REGION_FOLDER_NAME;
+  }
+
+  private static String buildConfigurationDataFilePath(
+      int localDataNodeId, int remoteDataNodeId, int regionId) {
+    String configurationDatDirName =
+        buildRegionDirPath(localDataNodeId) + File.separator + "1_" + regionId;
+    String expectDeletedFileName =
+        
IoTConsensusServerImpl.generateConfigurationDatFileName(remoteDataNodeId);
+    return configurationDatDirName + File.separator + expectDeletedFileName;
+  }
+
+  protected static KeySetView<String, Boolean> noKillPoints() {
+    return ConcurrentHashMap.newKeySet();
+  }
+
+  @SafeVarargs
+  protected static <T extends Enum<?>> KeySetView<String, Boolean> 
buildSet(T... keywords) {
     KeySetView<String, Boolean> result = ConcurrentHashMap.newKeySet();
-    result.addAll(Arrays.asList(keywords));
+    result.addAll(
+        
Arrays.stream(keywords).map(KillPoint::enumToString).collect(Collectors.toList()));
     return result;
   }
-
-  // endregion
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/notpass/datanodecrash/CoordinatorRemoveRemotePeerCrashIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/notpass/datanodecrash/CoordinatorRemoveRemotePeerCrashIT.java
new file mode 100644
index 00000000000..7533424909b
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/notpass/datanodecrash/CoordinatorRemoveRemotePeerCrashIT.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.notpass.datanodecrash;
+
+import org.apache.iotdb.commons.utils.KillPoint.IoTConsensusRemovePeerKillPoints;
+import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework;
+
+import org.junit.Test;
+
+/**
+ * Region migration tests, each running {@code successTest} with a single DataNode kill point
+ * taken from {@link IoTConsensusRemovePeerKillPoints} (the IoTConsensus remove-peer steps).
+ */
+public class CoordinatorRemoveRemotePeerCrashIT extends IoTDBRegionMigrateReliabilityITFramework {
+  /**
+   * Runs {@code successTest} with no ConfigNode kill points and the given DataNode kill points.
+   *
+   * <p>Uses the concrete enum type rather than a generic {@code T...}, which would create a
+   * generic varargs array and trigger a possible-heap-pollution compiler warning.
+   */
+  private void base(IoTConsensusRemovePeerKillPoints... dataNodeKillPoints) throws Exception {
+    successTest(1, 1, 1, 2, noKillPoints(), buildSet(dataNodeKillPoints));
+  }
+
+  @Test
+  public void initCrash() throws Exception {
+    base(IoTConsensusRemovePeerKillPoints.INIT);
+  }
+
+  @Test
+  public void crashAfterNotifyPeersToRemoveSyncLogChannel() throws Exception {
+    base(IoTConsensusRemovePeerKillPoints.AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL);
+  }
+
+  @Test
+  public void crashAfterInactivePeer() throws Exception {
+    base(IoTConsensusRemovePeerKillPoints.AFTER_INACTIVE_PEER);
+  }
+
+  @Test
+  public void crashAfterFinish() throws Exception {
+    base(IoTConsensusRemovePeerKillPoints.FINISH);
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/notpass/datanodecrash/IoTDBRegionMigrateDataNodeCrashIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/notpass/datanodecrash/IoTDBRegionMigrateDataNodeCrashIT.java
new file mode 100644
index 00000000000..6b023274aef
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/notpass/datanodecrash/IoTDBRegionMigrateDataNodeCrashIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.notpass.datanodecrash;
+
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework;
+
+import org.junit.Test;
+
+/**
+ * Region migration tests, each running {@code failTest} with a single DataNode kill point.
+ * Tests are grouped by the DataNode role named in the kill point: coordinator, original and
+ * destination.
+ */
+public class IoTDBRegionMigrateDataNodeCrashIT extends IoTDBRegionMigrateReliabilityITFramework {
+
+  /** Runs {@code failTest} with {@code killPoint} as the only DataNode kill point. */
+  private void failWithDataNodeKillPoint(DataNodeKillPoints killPoint) throws Exception {
+    failTest(2, 2, 1, 3, noKillPoints(), buildSet(killPoint));
+  }
+
+  // region Coordinator DataNode crash tests
+
+  @Test
+  public void coordinatorCrashDuringAddPeerTransition() throws Exception {
+    failWithDataNodeKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
+  }
+
+  @Test
+  public void coordinatorCrashDuringAddPeerDone() throws Exception {
+    failWithDataNodeKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
+  }
+
+  // endregion
+
+  // region Original DataNode crash tests
+
+  @Test
+  public void originalCrashDuringAddPeerDone() throws Exception {
+    failWithDataNodeKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
+  }
+
+  // endregion
+
+  // region Destination DataNode crash tests
+
+  @Test
+  public void destinationCrashDuringCreateLocalPeer() throws Exception {
+    failWithDataNodeKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
+  }
+
+  @Test
+  public void destinationCrashDuringAddPeerTransition() throws Exception {
+    failWithDataNodeKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION);
+  }
+
+  @Test
+  public void destinationCrashDuringAddPeerDone() throws Exception {
+    failWithDataNodeKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE);
+  }
+
+  // endregion
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateConfigNodeCrashIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateConfigNodeCrashIT.java
new file mode 100644
index 00000000000..5b21cc81bb1
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateConfigNodeCrashIT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass;
+
+import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework;
+import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
+import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
+import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Region migration tests that run {@code successTest} with ConfigNode kill points taken from
+ * the region procedure states, and no DataNode kill points.
+ *
+ * <p>NOTE(review): unlike IoTDBRegionMigrateNormalIT and IoTDBRegionMigrateOtherIT, this class
+ * has no {@code @RunWith(IoTDBTestRunner.class)} or {@code @Category}; confirm that keeping it
+ * out of the ClusterIT run is intended.
+ */
+public class IoTDBRegionMigrateConfigNodeCrashIT extends IoTDBRegionMigrateReliabilityITFramework {
+  @Test
+  @Ignore
+  public void cnCrashDuringPreCheck() throws Exception {
+    successTest(1, 1, 1, 2, buildSet(RegionTransitionState.REGION_MIGRATE_PREPARE), noKillPoints());
+  }
+
+  @Test
+  public void cnCrashDuringCreatePeer() throws Exception {
+    successTest(1, 1, 1, 2, buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER), noKillPoints());
+  }
+
+  @Test
+  public void cnCrashDuringDoAddPeer() throws Exception {
+    successTest(1, 1, 1, 2, buildSet(AddRegionPeerState.DO_ADD_REGION_PEER), noKillPoints());
+  }
+
+  @Test
+  public void cnCrashDuringUpdateCache() throws Exception {
+    successTest(
+        1, 1, 1, 2, buildSet(AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE), noKillPoints());
+  }
+
+  @Test
+  public void cnCrashDuringChangeRegionLeader() throws Exception {
+    successTest(1, 1, 1, 2, buildSet(RegionTransitionState.CHANGE_REGION_LEADER), noKillPoints());
+  }
+
+  @Test
+  public void cnCrashDuringRemoveRegionPeer() throws Exception {
+    successTest(1, 1, 1, 2, buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER), noKillPoints());
+  }
+
+  @Test
+  public void cnCrashDuringDeleteOldRegionPeer() throws Exception {
+    successTest(1, 1, 1, 2, buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER), noKillPoints());
+  }
+
+  @Test
+  public void cnCrashDuringRemoveRegionLocationCache() throws Exception {
+    successTest(
+        1, 1, 1, 2, buildSet(RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE), noKillPoints());
+  }
+
+  /** Passes all AddRegionPeerState and RemoveRegionPeerState values as ConfigNode kill points. */
+  @Test
+  public void cnCrashTest() throws Exception {
+    ConcurrentHashMap.KeySetView<String, Boolean> killConfigNodeKeywords =
+        buildSet(AddRegionPeerState.values());
+    killConfigNodeKeywords.addAll(buildSet(RemoveRegionPeerState.values()));
+    successTest(1, 1, 1, 2, killConfigNodeKeywords, noKillPoints());
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateNormalIT.java
similarity index 51%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
copy to 
integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateNormalIT.java
index 9d08cc09b78..072530da626 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateNormalIT.java
@@ -17,12 +17,27 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.utils;
+package org.apache.iotdb.confignode.it.regionmigration.pass;
 
-@TestOnly
-public enum DataNodeKillPoints {
-  OriginalRemovePeer,
-  OriginalDeleteOldRegionPeer,
+import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
 
-  CoordinatorRemovePeer,
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+/** Region migration tests without injected crashes: both kill-point sets passed are empty. */
+@Category({ClusterIT.class})
+@RunWith(IoTDBTestRunner.class)
+public class IoTDBRegionMigrateNormalIT extends IoTDBRegionMigrateReliabilityITFramework {
+  @Test
+  public void normal1C2DTest() throws Exception {
+    successTest(1, 1, 1, 2, noKillPoints(), noKillPoints());
+  }
+
+  @Test
+  public void normal3C3DTest() throws Exception {
+    successTest(2, 3, 3, 3, noKillPoints(), noKillPoints());
+  }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateOtherIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateOtherIT.java
new file mode 100644
index 00000000000..6d9444c9e4f
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateOtherIT.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass;
+
+import org.apache.iotdb.commons.utils.KillPoint.NeverTriggeredKillPoint;
+import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+/** Sanity checks of the region migration reliability framework itself. */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBRegionMigrateOtherIT extends IoTDBRegionMigrateReliabilityITFramework {
+  /**
+   * A ConfigNode kill point that is never triggered must make {@code successTest} fail with an
+   * {@link AssertionError}.
+   *
+   * <p>NOTE(review): any AssertionError thrown by successTest satisfies this check, not only the
+   * one about the untriggered kill point; consider also asserting on the failure message.
+   */
+  @Test
+  public void badKillPoint() throws Exception {
+    boolean successTestFailed = false;
+    try {
+      successTest(
+          1, 1, 1, 2, buildSet(NeverTriggeredKillPoint.NEVER_TRIGGERED_KILL_POINT), noKillPoints());
+    } catch (AssertionError expected) {
+      successTestFailed = true;
+    }
+    Assert.assertTrue("kill point not triggered but test pass", successTestFailed);
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 008882b67cd..f66663a9483 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.node;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -95,6 +96,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 /** {@link NodeManager} manages cluster node addition and removal requests. */
 public class NodeManager {
@@ -353,6 +355,11 @@ public class NodeManager {
 
     resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
     
resp.setRuntimeConfiguration(getRuntimeConfiguration().setClusterId(clusterId));
+    List<TConsensusGroupId> consensusGroupIds =
+        getPartitionManager().getAllReplicaSets(nodeId).stream()
+            .map(TRegionReplicaSet::getRegionId)
+            .collect(Collectors.toList());
+    resp.setConsensusGroupIds(consensusGroupIds);
     return resp;
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index 000f47067fc..853262625b7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -41,6 +41,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint;
 import static 
org.apache.iotdb.confignode.procedure.state.AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE;
 import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
 
@@ -79,11 +80,13 @@ public class AddRegionPeerProcedure
       switch (state) {
         case CREATE_NEW_REGION_PEER:
           handler.createNewRegionPeer(consensusGroupId, destDataNode);
+          setKillPoint(state);
           setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
           break;
         case DO_ADD_REGION_PEER:
           TSStatus tsStatus =
               handler.addRegionPeer(this.getProcId(), destDataNode, 
consensusGroupId, coordinator);
+          setKillPoint(state);
           TRegionMigrateResult result;
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
             result = handler.waitTaskFinish(this.getProcId(), coordinator);
@@ -124,6 +127,7 @@ public class AddRegionPeerProcedure
           }
         case UPDATE_REGION_LOCATION_CACHE:
           handler.addRegionLocation(consensusGroupId, destDataNode);
+          setKillPoint(state);
           LOGGER.info("AddRegionPeer state {} complete", state);
           LOGGER.info(
               "AddRegionPeerProcedure success, region {} has been added to 
DataNode {}",
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
index b6f418f0dc9..7823a6dc826 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.impl.region;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
@@ -103,6 +104,7 @@ public class RegionMigrateProcedure
           break;
         case CHANGE_REGION_LEADER:
           handler.changeRegionLeader(consensusGroupId, originalDataNode, 
destDataNode);
+          KillPoint.setKillPoint(state);
           setNextState(RegionTransitionState.REMOVE_REGION_PEER);
           break;
         case REMOVE_REGION_PEER:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index fdf62606e66..7055a274874 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -42,6 +42,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint;
 import static 
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.DELETE_OLD_REGION_PEER;
 import static 
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE;
 import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
@@ -81,6 +82,7 @@ public class RemoveRegionPeerProcedure
           tsStatus =
               handler.removeRegionPeer(
                   this.getProcId(), targetDataNode, consensusGroupId, 
coordinator);
+          setKillPoint(state);
           if (tsStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
             throw new ProcedureException("REMOVE_REGION_PEER executed failed 
in DataNode");
           }
@@ -94,6 +96,7 @@ public class RemoveRegionPeerProcedure
         case DELETE_OLD_REGION_PEER:
           tsStatus =
               handler.deleteOldRegionPeer(this.getProcId(), targetDataNode, 
consensusGroupId);
+          setKillPoint(state);
           if (tsStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
             throw new ProcedureException("DELETE_OLD_REGION_PEER executed 
failed in DataNode");
           }
@@ -106,6 +109,7 @@ public class RemoveRegionPeerProcedure
           break;
         case REMOVE_REGION_LOCATION_CACHE:
           handler.removeRegionLocation(consensusGroupId, targetDataNode);
+          setKillPoint(state);
           LOGGER.info("RemoveRegionPeer state {} success", state);
           LOGGER.info(
               "RemoveRegionPeerProcedure success, region {} has been removed 
from DataNode {}",
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index dc857dc18cd..e9bb1fb6eb5 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -29,6 +29,9 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import 
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusRemovePeerKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.IStateMachine;
@@ -253,6 +256,7 @@ public class IoTConsensus implements IConsensus {
             () ->
                 new ConsensusException(
                     String.format("Unable to create consensus dir for group 
%s", groupId)));
+    KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
     if (exist.get()) {
       throw new ConsensusGroupAlreadyExistException(groupId);
     }
@@ -287,22 +291,23 @@ public class IoTConsensus implements IConsensus {
       logger.info("[IoTConsensus] inactivate new peer: {}", peer);
       impl.inactivePeer(peer);
 
-      // step 2: notify all the other Peers to build the sync connection to 
newPeer
-      logger.info("[IoTConsensus] notify current peers to build sync log...");
-      impl.checkAndLockSafeDeletedSearchIndex();
-      impl.notifyPeersToBuildSyncLogChannel(peer);
-
-      // step 3: take snapshot
+      // step 2: take snapshot
       logger.info("[IoTConsensus] start to take snapshot...");
+      impl.checkAndLockSafeDeletedSearchIndex();
       impl.takeSnapshot();
 
-      // step 4: transit snapshot
+      // step 3: transit snapshot
       logger.info("[IoTConsensus] start to transit snapshot...");
       impl.transitSnapshot(peer);
 
-      // step 5: let the new peer load snapshot
+      // step 4: let the new peer load snapshot
       logger.info("[IoTConsensus] trigger new peer to load snapshot...");
       impl.triggerSnapshotLoad(peer);
+      
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
+
+      // step 5: notify all the other Peers to build the sync connection to 
newPeer
+      logger.info("[IoTConsensus] notify current peers to build sync log...");
+      impl.notifyPeersToBuildSyncLogChannel(peer);
 
       // step 6: active new Peer
       logger.info("[IoTConsensus] activate new peer...");
@@ -311,8 +316,19 @@ public class IoTConsensus implements IConsensus {
       // step 7: spot clean
       logger.info("[IoTConsensus] do spot clean...");
       doSpotClean(peer, impl);
+      KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
 
     } catch (ConsensusGroupModifyPeerException e) {
+      try {
+        logger.info("[IoTConsensus] add remote peer failed, automatic cleanup 
side effects...");
+
+        // clean up the sync log channel
+        impl.notifyPeersToRemoveSyncLogChannel(peer);
+
+      } catch (ConsensusGroupModifyPeerException mpe) {
+        logger.error(
+            "[IoTConsensus] failed to cleanup side effects after failed to add 
remote peer", mpe);
+      }
       throw new ConsensusException(e.getMessage());
     }
   }
@@ -335,21 +351,27 @@ public class IoTConsensus implements IConsensus {
       throw new PeerNotInConsensusGroupException(groupId, peer.toString());
     }
 
+    KillPoint.setKillPoint(IoTConsensusRemovePeerKillPoints.INIT);
+
     try {
       // let other peers remove the sync channel with target peer
       impl.notifyPeersToRemoveSyncLogChannel(peer);
     } catch (ConsensusGroupModifyPeerException e) {
       throw new ConsensusException(e.getMessage());
     }
+    KillPoint.setKillPoint(
+        
IoTConsensusRemovePeerKillPoints.AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL);
 
     try {
       // let target peer reject new write
       impl.inactivePeer(peer);
+      
KillPoint.setKillPoint(IoTConsensusRemovePeerKillPoints.AFTER_INACTIVE_PEER);
       // wait its SyncLog to complete
       impl.waitTargetPeerUntilSyncLogCompleted(peer);
     } catch (ConsensusGroupModifyPeerException e) {
       throw new ConsensusException(e.getMessage());
     }
+    KillPoint.setKillPoint(IoTConsensusRemovePeerKillPoints.FINISH);
   }
 
   @Override
@@ -401,19 +423,19 @@ public class IoTConsensus implements IConsensus {
     } else if (!impl.isActive()) {
       throw new ConsensusException(
           "peer is inactive and not ready to receive reset configuration 
request.");
-    } else {
-      for (Peer peer : impl.getConfiguration()) {
-        if (!peers.contains(peer)) {
-          try {
-            removeRemotePeer(groupId, peer);
-          } catch (ConsensusException e) {
-            logger.error("Failed to remove peer {} from group {}", peer, 
groupId, e);
-            throw e;
-          }
+    }
+
+    for (Peer peer : impl.getConfiguration()) {
+      if (!peers.contains(peer)) {
+        try {
+          removeRemotePeer(groupId, peer);
+        } catch (ConsensusException e) {
+          logger.error("Failed to remove peer {} from group {}", peer, 
groupId, e);
+          throw e;
         }
       }
-      impl.resetConfiguration(peers);
     }
+    impl.resetConfiguration(peers);
   }
 
   public IoTConsensusServerImpl getImpl(ConsensusGroupId groupId) {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index ebee4982058..cdb797da4e4 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -87,6 +87,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 
 public class IoTConsensusServerImpl {
 
@@ -133,9 +134,8 @@ public class IoTConsensusServerImpl {
     this.configuration = configuration;
     if (configuration.isEmpty()) {
       recoverConfiguration();
-    } else {
-      persistConfiguration();
     }
+    persistConfiguration();
     this.backgroundTaskService = backgroundTaskService;
     this.config = config;
     this.consensusGroupId = thisNode.getGroupId().toString();
@@ -572,7 +572,7 @@ public class IoTConsensusServerImpl {
     configuration.add(targetPeer);
     // step 3, persist configuration
     logger.info("[IoTConsensus] persist new configuration: {}", configuration);
-    persistConfigurationUpdate();
+    persistConfiguration();
   }
 
   public void removeSyncLogChannel(Peer targetPeer) throws 
ConsensusGroupModifyPeerException {
@@ -584,16 +584,30 @@ public class IoTConsensusServerImpl {
       configuration.remove(targetPeer);
       checkAndUpdateSafeDeletedSearchIndex();
       // step 3, persist configuration
-      persistConfigurationUpdate();
+      persistConfiguration();
       logger.info("[IoTConsensus] configuration updated to {}", 
this.configuration);
     } catch (IOException e) {
       throw new ConsensusGroupModifyPeerException("error when remove 
LogDispatcherThread", e);
     }
   }
 
+  // TODO: persist first and then delete old configuration file
   public void persistConfiguration() {
     try {
-      serializeConfigurationAndFsyncToDisk(CONFIGURATION_FILE_NAME);
+      try (Stream<Path> stream = Files.walk(Paths.get(storageDir))) {
+        stream
+            .filter(Files::isRegularFile)
+            .filter(filePath -> 
filePath.getFileName().toString().contains("configuration"))
+            .forEach(
+                filePath -> {
+                  try {
+                    Files.delete(filePath);
+                  } catch (IOException e) {
+                    logger.error("Unexpected error occurs when deleting old 
configuration file", e);
+                  }
+                });
+      }
+      serializeConfigurationAndFsyncToDisk();
     } catch (IOException e) {
       // TODO: (xingtanzjr) need to handle the IOException because the 
IoTConsensus won't
       // work expectedly
@@ -602,16 +616,6 @@ public class IoTConsensusServerImpl {
     }
   }
 
-  public void persistConfigurationUpdate() throws 
ConsensusGroupModifyPeerException {
-    try {
-      serializeConfigurationAndFsyncToDisk(CONFIGURATION_TMP_FILE_NAME);
-      tmpConfigurationUpdate(configuration);
-    } catch (IOException e) {
-      throw new ConsensusGroupModifyPeerException(
-          "Unexpected error occurs when update configuration", e);
-    }
-  }
-
   public void recoverConfiguration() {
     try {
       Path tmpConfigurationPath =
@@ -631,9 +635,13 @@ public class IoTConsensusServerImpl {
         // recover from split configuration file
         Path dirPath = Paths.get(storageDir);
         List<Peer> tmpPeerList = getConfiguration(dirPath, 
CONFIGURATION_TMP_FILE_NAME);
-        tmpConfigurationUpdate(tmpPeerList);
+        configuration.addAll(tmpPeerList);
         List<Peer> peerList = getConfiguration(dirPath, 
CONFIGURATION_FILE_NAME);
-        configuration.addAll(peerList);
+        for (Peer peer : peerList) {
+          if (!configuration.contains(peer)) {
+            configuration.add(peer);
+          }
+        }
       }
       logger.info("Recover IoTConsensus server Impl, configuration: {}", 
configuration);
     } catch (IOException e) {
@@ -643,28 +651,18 @@ public class IoTConsensusServerImpl {
 
   // @Compatibility
   private void recoverFromOldConfigurationFile(Path oldConfigurationPath) throws IOException {
+    // Legacy single-file format: an int peer count followed by that many serialized peers.
     ByteBuffer buffer = ByteBuffer.wrap(Files.readAllBytes(oldConfigurationPath));
     int size = buffer.getInt();
     for (int i = 0; i < size; i++) {
       configuration.add(Peer.deserialize(buffer));
     }
-    persistConfiguration();
+    // TODO: deleting the old file before the new-format files are persisted is crash-unsafe.
     Files.delete(oldConfigurationPath);
   }
 
-  private void tmpConfigurationUpdate(List<Peer> tmpPeerList) throws IOException {
-    for (Peer peer : tmpPeerList) {
-      Path tmpConfigurationPath =
-          Paths.get(
-              new File(storageDir, peer.getNodeId() + "_" + CONFIGURATION_TMP_FILE_NAME)
-                  .getAbsolutePath());
-      Path configurationPath =
-          Paths.get(
-              new File(storageDir, peer.getNodeId() + "_" + CONFIGURATION_FILE_NAME)
-                  .getAbsolutePath());
-      Files.deleteIfExists(configurationPath);
-      Files.move(tmpConfigurationPath, configurationPath);
-    }
+  public static String generateConfigurationDatFileName(int nodeId) {
+    return Integer.toString(nodeId) + '_' + CONFIGURATION_FILE_NAME; // one file per peer node
   }
 
   private List<Peer> getConfiguration(Path dirPath, String 
configurationFileName)
@@ -876,10 +874,9 @@ public class IoTConsensusServerImpl {
     return consensusGroupId;
   }
 
-  private void serializeConfigurationAndFsyncToDisk(String 
configurationFileName)
-      throws IOException {
+  private void serializeConfigurationAndFsyncToDisk() throws IOException {
     for (Peer peer : configuration) {
-      String peerConfigurationFileName = peer.getNodeId() + "_" + 
configurationFileName;
+      String peerConfigurationFileName = 
generateConfigurationDatFileName(peer.getNodeId());
       FileOutputStream fileOutputStream =
           new FileOutputStream(new File(storageDir, 
peerConfigurationFileName));
       try (DataOutputStream outputStream = new 
DataOutputStream(fileOutputStream)) {
@@ -888,8 +885,8 @@ public class IoTConsensusServerImpl {
         try {
           fileOutputStream.flush();
           fileOutputStream.getFD().sync();
-        } catch (IOException e) {
-          logger.error("Failed to fsync the configuration file {}", 
peerConfigurationFileName, e);
+        } catch (IOException ignore) {
+          // ignore
         }
       }
     }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 2a80ea3281a..6c4b7cc6777 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.consensus.iot.service;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
@@ -169,6 +171,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       resultHandler.onComplete(new TActivatePeerRes(status));
       return;
     }
+    KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE);
     impl.setActive(true);
     resultHandler.onComplete(
         new TActivatePeerRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
@@ -198,6 +201,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
     }
+    KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
     resultHandler.onComplete(new TBuildSyncLogChannelRes(responseStatus));
   }
 
@@ -294,6 +298,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       return;
     }
     impl.loadSnapshot(req.snapshotId);
+    KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION);
     resultHandler.onComplete(
         new TTriggerSnapshotLoadRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1380a167505..27c07ff6bd0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -328,9 +328,14 @@ public class IoTDBConfig {
   /** Consensus directory. */
   private String consensusDir = IoTDBConstant.DN_DEFAULT_DATA_DIR + 
File.separator + "consensus";
 
-  private String dataRegionConsensusDir = consensusDir + File.separator + 
"data_region";
+  private String dataRegionConsensusDir =
+      consensusDir + File.separator + IoTDBConstant.DATA_REGION_FOLDER_NAME;
 
-  private String schemaRegionConsensusDir = consensusDir + File.separator + 
"schema_region";
+  private String invalidDataRegionConsensusDir =
+      consensusDir + File.separator + 
IoTDBConstant.INVALID_DATA_REGION_FOLDER_NAME;
+
+  private String schemaRegionConsensusDir =
+      consensusDir + File.separator + IoTDBConstant.SCHEMA_REGION_FOLDER_NAME;
 
   /** temp result directory for sortOperator */
   private String sortTmpDir =
@@ -1445,8 +1450,12 @@ public class IoTDBConfig {
 
+  /**
+   * Sets the consensus directory and re-derives the data region, schema region and invalid data
+   * region consensus directories as sub-folders of it.
+   *
+   * @param consensusDir new root consensus directory
+   */
   public void setConsensusDir(String consensusDir) {
     this.consensusDir = consensusDir;
-    setDataRegionConsensusDir(consensusDir + File.separator + "data_region");
-    setSchemaRegionConsensusDir(consensusDir + File.separator + 
"schema_region");
+    setDataRegionConsensusDir(
+        consensusDir + File.separator + IoTDBConstant.DATA_REGION_FOLDER_NAME);
+    setSchemaRegionConsensusDir(
+        consensusDir + File.separator + 
IoTDBConstant.SCHEMA_REGION_FOLDER_NAME);
+    setInvalidDataRegionConsensusDir(
+        consensusDir + File.separator + 
IoTDBConstant.INVALID_DATA_REGION_FOLDER_NAME);
   }
 
   public String getDataRegionConsensusDir() {
@@ -1457,6 +1466,14 @@ public class IoTDBConfig {
     this.dataRegionConsensusDir = dataRegionConsensusDir;
   }
 
+  /**
+   * Returns the directory into which DataNode moves data region consensus directories that are
+   * no longer valid when it restarts.
+   */
+  public String getInvalidDataRegionConsensusDir() {
+    return invalidDataRegionConsensusDir;
+  }
+
+  /** Sets the directory that receives invalid data region consensus directories. */
+  public void setInvalidDataRegionConsensusDir(String 
invalidDataRegionConsensusDir) {
+    this.invalidDataRegionConsensusDir = invalidDataRegionConsensusDir;
+  }
+
   public String getSchemaRegionConsensusDir() {
     return schemaRegionConsensusDir;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 02ad2844374..1cdd3406574 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.service;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -29,6 +30,7 @@ import 
org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -44,6 +46,7 @@ import org.apache.iotdb.commons.udf.UDFInformation;
 import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFManagementService;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
@@ -54,6 +57,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
 import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.iot.IoTConsensus;
 import org.apache.iotdb.db.conf.DataNodeStartupCheck;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -108,6 +112,9 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -450,6 +457,57 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
+  // TODO: Implement in IConsensus, not in DataNode
+  /**
+   * Lists the consensus group ids that have a peer directory under the data region consensus
+   * directory. Entry names are expected to look like {@code <type>_<id>}; entries that do not
+   * follow that format are logged and skipped instead of aborting the DataNode restart.
+   *
+   * @return the ids found on disk; empty when the data region consensus protocol is Ratis, the
+   *     directory does not exist, or it cannot be read
+   */
+  private List<ConsensusGroupId> getConsensusGroupId() {
+    List<ConsensusGroupId> consensusGroupIds = new ArrayList<>();
+    String dataRegionConsensusDir = config.getDataRegionConsensusDir();
+    if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      return consensusGroupIds;
+    }
+    Path dirPath = new File(dataRegionConsensusDir).toPath();
+    if (!Files.isDirectory(dirPath)) {
+      // Nothing has been created on disk yet, so there is nothing to inspect.
+      return consensusGroupIds;
+    }
+    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dirPath)) {
+      for (Path path : stream) {
+        String entryName = path.getFileName().toString();
+        String[] items = entryName.split("_");
+        if (items.length < 2) {
+          logger.warn("Skip unrecognized entry {} in {}", entryName, dataRegionConsensusDir);
+          continue;
+        }
+        try {
+          consensusGroupIds.add(
+              ConsensusGroupId.Factory.create(
+                  Integer.parseInt(items[0]), Integer.parseInt(items[1])));
+        } catch (NumberFormatException e) {
+          // A stray file must not prevent the DataNode from restarting.
+          logger.warn("Skip unrecognized entry {} in {}", entryName, dataRegionConsensusDir, e);
+        }
+      }
+    } catch (IOException e) {
+      logger.error("Cannot get consensus group id from {}", dataRegionConsensusDir, e);
+    }
+    return consensusGroupIds;
+  }
+
+  // TODO: remove for current version, add todo for rename
+  /**
+   * Moves the local consensus directory of each given region from the data region consensus
+   * directory into the invalid data region consensus directory. If the move fails, the directory
+   * is recursively deleted instead; a failed delete is only logged.
+   *
+   * @param invalidConsensusGroupIds regions whose local consensus directories should be retired
+   */
+  private void renameInvalidRegionDirs(List<ConsensusGroupId> invalidConsensusGroupIds) {
+    File dataRegionConsensusDir = new File(config.getDataRegionConsensusDir());
+    File invalidDataRegionConsensusDir = new File(config.getInvalidDataRegionConsensusDir());
+    for (ConsensusGroupId consensusGroupId : invalidConsensusGroupIds) {
+      File oldDir = new File(IoTConsensus.buildPeerDir(dataRegionConsensusDir, consensusGroupId));
+      File newDir =
+          new File(IoTConsensus.buildPeerDir(invalidDataRegionConsensusDir, consensusGroupId));
+      if (oldDir.exists() && !FileUtils.moveFileSafe(oldDir, newDir)) {
+        logger.error(
+            "move {} to {} failed.", oldDir.getAbsolutePath(), newDir.getAbsolutePath());
+        try {
+          FileUtils.recursivelyDeleteFolder(oldDir.getPath());
+        } catch (IOException e) {
+          // Keep the cause so the failure can be diagnosed from the log.
+          logger.error("delete {} failed.", oldDir.getAbsolutePath(), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Finds local data region consensus directories (via {@code getConsensusGroupId()}) whose
+   * region is not contained in {@code dataNodeConsensusGroupIds}, and hands them to {@code
+   * renameInvalidRegionDirs}.
+   *
+   * @param dataNodeConsensusGroupIds consensus groups returned for this DataNode in the
+   *     ConfigNode's restart response
+   */
+  private void removeInvalidRegions(List<ConsensusGroupId> 
dataNodeConsensusGroupIds) {
+    List<ConsensusGroupId> invalidConsensusGroupIds =
+        getConsensusGroupId().stream()
+            .filter(consensusGroupId -> 
!dataNodeConsensusGroupIds.contains(consensusGroupId))
+            .collect(Collectors.toList());
+    renameInvalidRegionDirs(invalidConsensusGroupIds);
+  }
+
   private void sendRestartRequestToConfigNode() throws StartupException {
     logger.info("Sending restart request to ConfigNode-leader...");
     long startTime = System.currentTimeMillis();
@@ -500,6 +558,14 @@ public class DataNode implements DataNodeMBean {
           "Restart request to cluster: {} is accepted, which takes {} ms.",
           config.getClusterName(),
           (endTime - startTime));
+
+      List<TConsensusGroupId> consensusGroupIds = 
dataNodeRestartResp.getConsensusGroupIds();
+      List<ConsensusGroupId> dataNodeConsensusGroupIds =
+          consensusGroupIds.stream()
+              .map(ConsensusGroupId.Factory::createFromTConsensusGroupId)
+              .collect(Collectors.toList());
+
+      removeInvalidRegions(dataNodeConsensusGroupIds);
     } else {
       /* Throw exception when restart is rejected */
       throw new StartupException(dataNodeRestartResp.getStatus().getMessage());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 90610bbcb53..0c9e28ae500 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProper
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
 import org.apache.iotdb.tsfile.fileSystem.FSType;
 
 import org.slf4j.Logger;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 public class CommonConfig {
@@ -256,6 +258,12 @@ public class CommonConfig {
   // time in nanosecond precision when starting up
   private final long startUpNanosecond = System.nanoTime();
 
+  private final boolean isIntegrationTest =
+      
System.getProperties().containsKey(IoTDBConstant.INTEGRATION_TEST_KILL_POINTS);
+
+  private final Set<String> enabledKillPoints =
+      
KillPoint.parseKillPoints(System.getProperty(IoTDBConstant.INTEGRATION_TEST_KILL_POINTS));
+
   CommonConfig() {
     // Empty constructor
   }
@@ -1073,4 +1081,12 @@ public class CommonConfig {
   public long getStartUpNanosecond() {
     return startUpNanosecond;
   }
+
+  /**
+   * Returns whether the {@code IoTDBConstant.INTEGRATION_TEST_KILL_POINTS} system property was
+   * present when this config was created; kill points can only fire when this is true.
+   */
+  public boolean isIntegrationTest() {
+    return isIntegrationTest;
+  }
+
+  /**
+   * Returns the kill point names parsed from the {@code
+   * IoTDBConstant.INTEGRATION_TEST_KILL_POINTS} system property; empty when it is absent.
+   */
+  public Set<String> getEnabledKillPoints() {
+    return enabledKillPoints;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index dcf059deb74..b6697483f9f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -233,6 +233,9 @@ public class IoTDBConstant {
   public static final String UNSEQUENCE_FOLDER_NAME = "unsequence";
   public static final String FILE_NAME_SEPARATOR = "-";
   public static final String CONSENSUS_FOLDER_NAME = "consensus";
+  public static final String DATA_REGION_FOLDER_NAME = "data_region";
+  public static final String INVALID_DATA_REGION_FOLDER_NAME = 
"invalid_data_region";
+  public static final String SCHEMA_REGION_FOLDER_NAME = "schema_region";
   public static final String SNAPSHOT_FOLDER_NAME = "snapshot";
 
   // system folder name
@@ -340,4 +343,6 @@ public class IoTDBConstant {
   public static final String TIER_SEPARATOR = ";";
 
   public static final String OBJECT_STORAGE_DIR = "object_storage";
+
+  public static final String INTEGRATION_TEST_KILL_POINTS = 
"integrationTestKillPoints";
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index 401dfeb2477..f2788381b4a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -37,7 +37,6 @@ import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 
 public class FileUtils {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FileUtils.class);
@@ -262,15 +261,6 @@ public class FileUtils {
     return true;
   }
 
-  public static void logBreakpoint(String logContent) {
-    LOGGER.info("breakpoint:{}", logContent);
-    try {
-      TimeUnit.SECONDS.sleep(1);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-  }
-
   public static File createHardLink(File sourceFile, File hardlink) throws 
IOException {
     if (!hardlink.getParentFile().exists() && 
!hardlink.getParentFile().mkdirs()) {
       throw new IOException(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/DataNodeKillPoints.java
similarity index 78%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/DataNodeKillPoints.java
index 9d08cc09b78..1959537aa2c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/DataNodeKillPoints.java
@@ -17,12 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.utils;
+package org.apache.iotdb.commons.utils.KillPoint;
 
-@TestOnly
+/**
+ * Kill points passed to {@code KillPoint.setKillPoint}. For example, IoTConsensusRPCServiceProcessor
+ * sets ORIGINAL_ADD_PEER_DONE, DESTINATION_ADD_PEER_TRANSITION and DESTINATION_ADD_PEER_DONE in
+ * its build-sync-log-channel, trigger-snapshot-load and activate-peer handlers.
+ */
 public enum DataNodeKillPoints {
-  OriginalRemovePeer,
-  OriginalDeleteOldRegionPeer,
-
-  CoordinatorRemovePeer,
+  ORIGINAL_ADD_PEER_DONE,
+  DESTINATION_CREATE_LOCAL_PEER,
+  DESTINATION_ADD_PEER_TRANSITION,
+  DESTINATION_ADD_PEER_DONE,
+  COORDINATOR_ADD_PEER_TRANSITION,
+  COORDINATOR_ADD_PEER_DONE,
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/IoTConsensusRemovePeerKillPoints.java
similarity index 81%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/IoTConsensusRemovePeerKillPoints.java
index 9d08cc09b78..4f80aa87212 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/IoTConsensusRemovePeerKillPoints.java
@@ -17,12 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.utils;
+package org.apache.iotdb.commons.utils.KillPoint;
 
-@TestOnly
-public enum DataNodeKillPoints {
-  OriginalRemovePeer,
-  OriginalDeleteOldRegionPeer,
-
-  CoordinatorRemovePeer,
+/**
+ * Kill points that can be passed to {@code KillPoint.setKillPoint}. Judging by the constant names,
+ * they mark stages of removing a peer in IoTConsensus — NOTE(review): call sites are not visible
+ * here, confirm the exact stages.
+ */
+public enum IoTConsensusRemovePeerKillPoints {
+  INIT,
+  AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL,
+  AFTER_INACTIVE_PEER,
+  FINISH,
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/KillPoint.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/KillPoint.java
new file mode 100644
index 00000000000..40d06f5d0ad
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/KillPoint.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.utils.KillPoint;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Integration-test hooks that mark named points in the code ("kill points").
+ *
+ * <p>A kill point is an enum constant, identified by {@link #enumToString(Enum)}. It only has an
+ * effect when the node runs as an integration test and the point is among the enabled kill points
+ * held by {@code CommonConfig}.
+ */
+public class KillPoint {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KillPoint.class);
+
+  /**
+   * Builds the log line that marks a reached kill point.
+   *
+   * @param name kill point name, as produced by {@link #enumToString(Enum)}
+   * @return {@code name} prefixed with {@code "[KILL POINT] "}
+   */
+  public static String addKillPointPrefix(String name) {
+    return "[KILL POINT] " + name;
+  }
+
+  /**
+   * Parses a kill point list. Brackets and spaces are stripped, the remainder is split on commas,
+   * and empty tokens (e.g. from {@code "[]"}) are dropped.
+   *
+   * @param s something like "[a, b, c]"; may be {@code null}
+   * @return Set{a,b,c}; empty when {@code s} is {@code null} or lists nothing
+   */
+  public static ImmutableSet<String> parseKillPoints(String s) {
+    if (s == null) {
+      return ImmutableSet.of();
+    }
+    Set<String> result =
+        Arrays.stream(s.replace("[", "").replace("]", "").replace(" ", "").split(","))
+            .filter(killPoint -> !killPoint.isEmpty())
+            .collect(Collectors.toSet());
+    LOGGER.info("Kill point set: {}", result);
+    return ImmutableSet.copyOf(result);
+  }
+
+  /**
+   * Marks that execution reached kill point {@code x}.
+   *
+   * <p>Does nothing unless the node runs as an integration test and {@code x} is among the enabled
+   * kill points. When enabled, a line prefixed with {@code "[KILL POINT] "} is logged and the
+   * calling thread sleeps for one second before continuing.
+   *
+   * @param x any enum member identifying the kill point
+   */
+  public static <T extends Enum<T>> void setKillPoint(T x) {
+    if (!CommonDescriptor.getInstance().getConfig().isIntegrationTest()) {
+      return;
+    }
+    String killPointName = enumToString(x);
+    if (!CommonDescriptor.getInstance()
+        .getConfig()
+        .getEnabledKillPoints()
+        .contains(killPointName)) {
+      return;
+    }
+    LOGGER.info(addKillPointPrefix(killPointName));
+    // NOTE(review): the pause presumably gives an external test harness time to act on the log
+    // line above before execution continues.
+    try {
+      TimeUnit.SECONDS.sleep(1);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Returns the name of a kill point as {@code <EnumSimpleName>.<CONSTANT>}.
+   *
+   * <p>{@link Enum#getDeclaringClass()} is used instead of {@link Object#getClass()} because a
+   * constant with its own class body is an instance of an anonymous subclass whose simple name is
+   * empty, which would yield {@code ".CONSTANT"} and never match an enabled kill point.
+   *
+   * @param x any enum member
+   * @return the kill point name
+   */
+  public static <T extends Enum<?>> String enumToString(T x) {
+    return x.getDeclaringClass().getSimpleName() + "." + x.name();
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/NeverTriggeredKillPoint.java
similarity index 83%
rename from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/NeverTriggeredKillPoint.java
index 9d08cc09b78..b802622f23b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/NeverTriggeredKillPoint.java
@@ -17,12 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.utils;
+package org.apache.iotdb.commons.utils.KillPoint;
 
-@TestOnly
-public enum DataNodeKillPoints {
-  OriginalRemovePeer,
-  OriginalDeleteOldRegionPeer,
-
-  CoordinatorRemovePeer,
+/**
+ * A kill point that, judging by its name, no code path is expected to set — presumably used by
+ * tests that need a kill point which never fires. NOTE(review): confirm against the integration
+ * test framework.
+ */
+public enum NeverTriggeredKillPoint {
+  NEVER_TRIGGERED_KILL_POINT
 }
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index b32e4acb0f9..ec47654f1ad 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -130,6 +130,7 @@ struct TDataNodeRestartResp {
   1: required common.TSStatus status
   2: required list<common.TConfigNodeLocation> configNodeList
   3: optional TRuntimeConfiguration runtimeConfiguration
+  4: optional list<common.TConsensusGroupId> consensusGroupIds
 }
 
 struct TDataNodeRemoveReq {

Reply via email to