This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch config-bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/config-bug by this push:
     new 2047d3a681b Fixed multiple config bugs
2047d3a681b is described below

commit 2047d3a681b6a5713d96cfc84fc9952d3b212597
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 6 16:55:22 2026 +0800

    Fixed multiple config bugs
---
 .../statemachine/ConfigRegionStateMachine.java     | 151 ++++++++----
 .../iotdb/confignode/manager/node/NodeManager.java | 265 ++++++++++++++++-----
 .../persistence/executor/ConfigPlanExecutor.java   |   5 +-
 .../partition/DatabasePartitionTable.java          |   2 +-
 .../persistence/partition/PartitionInfo.java       |  13 +-
 .../impl/node/AddConfigNodeProcedure.java          |   4 +-
 .../statemachine/ConfigRegionStateMachineTest.java |  48 ++++
 .../confignode/manager/node/NodeManagerTest.java   | 197 +++++++++++++++
 .../confignode/persistence/PartitionInfoTest.java  |  29 +++
 9 files changed, 600 insertions(+), 114 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index effb1c466ba..48deaef45f7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -92,8 +92,8 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
   private static final long LOG_FILE_MAX_SIZE =
       CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax();
   private final TEndPoint currentNodeTEndPoint;
-  private static Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+");
-  private static Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$");
+  private static final Pattern LOG_INPROGRESS_PATTERN = 
Pattern.compile("log_inprogress_(\\d+)$");
+  private static final Pattern LOG_PATTERN = 
Pattern.compile("log_(\\d+)_(\\d+)$");
 
   public ConfigRegionStateMachine(ConfigManager configManager, 
ConfigPlanExecutor executor) {
     this.executor = executor;
@@ -121,6 +121,13 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
 
   /** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */
   protected TSStatus write(ConfigPhysicalPlan plan) {
+    if 
(ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass()))
 {
+      final TSStatus persistStatus = persistPlanForSimpleConsensus(plan);
+      if (persistStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return persistStatus;
+      }
+    }
+
     TSStatus result;
     try {
       result = executor.executeNonQueryPlan(plan);
@@ -129,10 +136,6 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
       result = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
     }
 
-    if 
(ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass()))
 {
-      writeLogForSimpleConsensus(plan);
-    }
-
     if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       PipeConfigNodeAgent.runtime().listener().tryListenToPlan(plan, false);
     }
@@ -197,7 +200,6 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
         PipeConfigNodeAgent.runtime()
             .listener()
             .tryListenToSnapshots(ConfigNodeSnapshotParser.getSnapshots());
-        return true;
       } catch (IOException e) {
         if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
           LOGGER.warn(
@@ -205,14 +207,18 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
               e);
         }
       }
+      return true;
     }
     return false;
   }
 
   @Override
   public void loadSnapshot(final File latestSnapshotRootDir) {
+    if (!executor.loadSnapshot(latestSnapshotRootDir)) {
+      return;
+    }
+
     try {
-      executor.loadSnapshot(latestSnapshotRootDir);
       // We recompute the snapshot for pipe listener when loading snapshot
       // to recover the newest snapshot in cache
       PipeConfigNodeAgent.runtime()
@@ -342,6 +348,9 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
 
   @Override
   public void stop() {
+    if 
(ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass()))
 {
+      closeSimpleLogWriter();
+    }
     // Shutdown leader related service for config pipe
     PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
   }
@@ -351,56 +360,48 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
     return CommonDescriptor.getInstance().getConfig().isReadOnly();
   }
 
-  private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
-    if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
-      try {
-        simpleLogWriter.force();
-        File completedFilePath = new File(FILE_PATH + startIndex + "_" + 
endIndex);
-        Files.move(
-            simpleLogFile.toPath(), completedFilePath.toPath(), 
StandardCopyOption.ATOMIC_MOVE);
-      } catch (IOException e) {
-        LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus 
mode", e);
+  private TSStatus persistPlanForSimpleConsensus(ConfigPhysicalPlan plan) {
+    try {
+      if (simpleLogWriter == null || simpleLogFile == null) {
+        throw new IOException("SimpleConsensus log writer is not 
initialized.");
       }
-      for (int retry = 0; retry < 5; retry++) {
-        try {
-          simpleLogWriter.close();
-        } catch (IOException e) {
-          LOGGER.warn(
-              "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
-                  + "filePath: {}, retry: {}",
-              simpleLogFile.getAbsolutePath(),
-              retry);
-          try {
-            // Sleep 1s and retry
-            TimeUnit.SECONDS.sleep(1);
-          } catch (InterruptedException e2) {
-            Thread.currentThread().interrupt();
-            LOGGER.warn("Unexpected interruption during the close method of 
logWriter");
-          }
-          continue;
-        }
-        break;
+
+      if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
+        rollSimpleConsensusLogFile();
       }
-      startIndex = endIndex + 1;
-      createLogFile(startIndex);
-    }
 
-    try {
       ByteBuffer buffer = plan.serializeToByteBuffer();
       buffer.position(buffer.limit());
       simpleLogWriter.write(buffer);
+      simpleLogWriter.force();
 
       endIndex = endIndex + 1;
     } catch (Exception e) {
       LOGGER.error(
-          "Can't serialize current ConfigPhysicalPlan for ConfigNode 
SimpleConsensus mode", e);
+          "Persist current ConfigPhysicalPlan for ConfigNode SimpleConsensus 
mode failed", e);
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+          .setMessage(
+              "Persist ConfigNode SimpleConsensus log failed: " + 
String.valueOf(e.getMessage()));
     }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  private void rollSimpleConsensusLogFile() throws IOException {
+    simpleLogWriter.force();
+    closeSimpleLogWriter();
+    Files.move(
+        simpleLogFile.toPath(),
+        new File(FILE_PATH + startIndex + "_" + endIndex).toPath(),
+        StandardCopyOption.ATOMIC_MOVE);
+    startIndex = endIndex + 1;
+    createLogFile(startIndex);
   }
 
   private void initStandAloneConfigNode() {
     File dir = new File(CURRENT_FILE_DIR);
     dir.mkdirs();
     String[] list = new File(CURRENT_FILE_DIR).list();
+    endIndex = 0;
     if (list != null && list.length != 0) {
       Arrays.sort(list, new FileComparator());
       for (String logFileName : list) {
@@ -417,7 +418,7 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
           continue;
         }
 
-        startIndex = endIndex;
+        final int recoveredStartIndex = parseStartIndex(logFileName);
         while (logReader.hasNext()) {
           endIndex++;
           // Read and re-serialize the PhysicalPlan
@@ -435,13 +436,13 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
           }
         }
         logReader.close();
+        if (isInProgressLogFile(logFileName)) {
+          sealRecoveredInProgressLogFile(logFile, recoveredStartIndex, 
endIndex);
+        }
       }
-    } else {
-      startIndex = 0;
-      endIndex = 0;
     }
-    startIndex = startIndex + 1;
-    createLogFile(endIndex);
+    startIndex = endIndex + 1;
+    createLogFile(startIndex);
 
     ScheduledExecutorService simpleConsensusThread =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
@@ -482,26 +483,72 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
     }
   }
 
+  private void sealRecoveredInProgressLogFile(
+      File logFile, int recoveredStartIndex, int recoveredEndIndex) {
+    try {
+      if (recoveredStartIndex > recoveredEndIndex) {
+        Files.deleteIfExists(logFile.toPath());
+        return;
+      }
+      Files.move(
+          logFile.toPath(),
+          new File(FILE_PATH + recoveredStartIndex + "_" + 
recoveredEndIndex).toPath(),
+          StandardCopyOption.ATOMIC_MOVE);
+    } catch (IOException e) {
+      LOGGER.warn("Seal recovered ConfigNode SimpleConsensus log failed: {}", 
logFile, e);
+    }
+  }
+
+  private boolean isInProgressLogFile(String filename) {
+    return filename.startsWith("log_inprogress_");
+  }
+
+  private void closeSimpleLogWriter() {
+    if (simpleLogWriter == null) {
+      return;
+    }
+    for (int retry = 0; retry < 5; retry++) {
+      try {
+        simpleLogWriter.close();
+        simpleLogWriter = null;
+        return;
+      } catch (IOException e) {
+        LOGGER.warn(
+            "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
+                + "filePath: {}, retry: {}",
+            simpleLogFile == null ? null : simpleLogFile.getAbsolutePath(),
+            retry);
+        try {
+          TimeUnit.SECONDS.sleep(1);
+        } catch (InterruptedException e2) {
+          Thread.currentThread().interrupt();
+          LOGGER.warn("Unexpected interruption during the close method of 
logWriter");
+          break;
+        }
+      }
+    }
+  }
+
   static class FileComparator implements Comparator<String> {
 
     @Override
     public int compare(String filename1, String filename2) {
-      long id1 = parseEndIndex(filename1);
-      long id2 = parseEndIndex(filename2);
+      long id1 = parseStartIndex(filename1);
+      long id2 = parseStartIndex(filename2);
       return Long.compare(id1, id2);
     }
   }
 
-  static long parseEndIndex(String filename) {
+  static int parseStartIndex(String filename) {
     if (filename.startsWith("log_inprogress_")) {
       Matcher matcher = LOG_INPROGRESS_PATTERN.matcher(filename);
       if (matcher.find()) {
-        return Long.parseLong(matcher.group());
+        return Integer.parseInt(matcher.group(1));
       }
     } else {
       Matcher matcher = LOG_PATTERN.matcher(filename);
       if (matcher.find()) {
-        return Long.parseLong(matcher.group());
+        return Integer.parseInt(matcher.group(1));
       }
     }
     return 0;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 2e50c6b787b..abd89ff6037 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.node;
 
 import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -46,6 +47,7 @@ import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan;
@@ -321,30 +323,33 @@ public class NodeManager {
     DataNodeRegisterResp resp = new DataNodeRegisterResp();
     resp.setConfigNodeList(getRegisteredConfigNodes());
 
-    // Create a new DataNodeHeartbeatCache and force update NodeStatus
     int dataNodeId = nodeInfo.generateNextNodeId();
-    
getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode, 
dataNodeId);
-    // TODO: invoke a force heartbeat to update new DataNode's status 
immediately
 
     RegisterDataNodePlan registerDataNodePlan =
         new RegisterDataNodePlan(req.getDataNodeConfiguration());
     // Register new DataNode
     
registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId);
-    try {
-      getConsensusManager().write(registerDataNodePlan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+    TSStatus registerStatus = writeConfigPhysicalPlan(registerDataNodePlan);
+    if (!isConsensusWriteSuccessful(registerStatus)) {
+      resp.setStatus(registerStatus);
+      return resp;
     }
 
     // update datanode's versionInfo
     UpdateVersionInfoPlan updateVersionInfoPlan =
         new UpdateVersionInfoPlan(req.getVersionInfo(), dataNodeId);
-    try {
-      getConsensusManager().write(updateVersionInfoPlan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+    TSStatus updateVersionStatus = 
writeConfigPhysicalPlan(updateVersionInfoPlan);
+    if (!isConsensusWriteSuccessful(updateVersionStatus)) {
+      resp.setStatus(
+          rollbackDataNodeRegistration(
+              registerDataNodePlan.getDataNodeConfiguration().getLocation(), 
updateVersionStatus));
+      return resp;
     }
 
+    // Create a new DataNodeHeartbeatCache and force update NodeStatus
+    
getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode, 
dataNodeId);
+    // TODO: invoke a force heartbeat to update new DataNode's status 
immediately
+
     // Bind DataNode metrics
     PartitionMetrics.bindDataNodePartitionMetricsWhenUpdate(
         MetricService.getInstance(), configManager, dataNodeId);
@@ -352,7 +357,10 @@ public class NodeManager {
     // Adjust the maximum RegionGroup number of each Database
     getClusterSchemaManager().adjustMaxRegionGroupNum();
 
-    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
+    resp.setStatus(
+        buildSuccessStatus(
+            ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION.getMessage(),
+            registerStatus.getMessage()));
     resp.setDataNodeId(
         
registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
     resp.setRuntimeConfiguration(getRuntimeConfiguration(dataNodeId));
@@ -380,10 +388,10 @@ public class NodeManager {
       // Update DataNodeConfiguration when modified during restart
       UpdateDataNodePlan updateDataNodePlan =
           new UpdateDataNodePlan(req.getDataNodeConfiguration());
-      try {
-        getConsensusManager().write(updateDataNodePlan);
-      } catch (ConsensusException e) {
-        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+      TSStatus updateStatus = writeConfigPhysicalPlan(updateDataNodePlan);
+      if (!isConsensusWriteSuccessful(updateStatus)) {
+        resp.setStatus(updateStatus);
+        return resp;
       }
     }
     TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId);
@@ -391,14 +399,14 @@ public class NodeManager {
       // Update versionInfo when modified during restart
       UpdateVersionInfoPlan updateVersionInfoPlan =
           new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
-      try {
-        getConsensusManager().write(updateVersionInfoPlan);
-      } catch (ConsensusException e) {
-        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+      TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan);
+      if (!isConsensusWriteSuccessful(updateStatus)) {
+        resp.setStatus(updateStatus);
+        return resp;
       }
     }
 
-    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
+    
resp.setStatus(buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage()));
     resp.setRuntimeConfiguration(getRuntimeConfiguration(nodeId));
 
     
resp.setCorrectConsensusGroups(getPartitionManager().getAllReplicaSets(nodeId));
@@ -476,13 +484,12 @@ public class NodeManager {
       // Update versionInfo when modified during restart
       UpdateVersionInfoPlan updateConfigNodePlan =
           new UpdateVersionInfoPlan(versionInfo, configNodeId);
-      try {
-        return getConsensusManager().write(updateConfigNodePlan);
-      } catch (ConsensusException e) {
-        return new 
TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());
+      TSStatus updateStatus = writeConfigPhysicalPlan(updateConfigNodePlan);
+      if (!isConsensusWriteSuccessful(updateStatus)) {
+        return updateStatus;
       }
     }
-    return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
+    return 
buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage());
   }
 
   public List<TAINodeInfo> getRegisteredAINodeInfoList() {
@@ -528,27 +535,37 @@ public class NodeManager {
     }
 
     int aiNodeId = nodeInfo.generateNextNodeId();
-    getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.AINode, 
aiNodeId);
     RegisterAINodePlan registerAINodePlan = new 
RegisterAINodePlan(req.getAiNodeConfiguration());
     // Register new DataNode
     
registerAINodePlan.getAINodeConfiguration().getLocation().setAiNodeId(aiNodeId);
-    try {
-      getConsensusManager().write(registerAINodePlan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+    TSStatus registerStatus = writeConfigPhysicalPlan(registerAINodePlan);
+    if (!isConsensusWriteSuccessful(registerStatus)) {
+      AINodeRegisterResp resp = new AINodeRegisterResp();
+      resp.setConfigNodeList(getRegisteredConfigNodes());
+      resp.setStatus(registerStatus);
+      return resp;
     }
 
     // update datanode's versionInfo
     UpdateVersionInfoPlan updateVersionInfoPlan =
         new UpdateVersionInfoPlan(req.getVersionInfo(), aiNodeId);
-    try {
-      getConsensusManager().write(updateVersionInfoPlan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+    TSStatus updateVersionStatus = 
writeConfigPhysicalPlan(updateVersionInfoPlan);
+    if (!isConsensusWriteSuccessful(updateVersionStatus)) {
+      AINodeRegisterResp resp = new AINodeRegisterResp();
+      resp.setConfigNodeList(getRegisteredConfigNodes());
+      resp.setStatus(
+          rollbackAINodeRegistration(
+              registerAINodePlan.getAINodeConfiguration().getLocation(), 
updateVersionStatus));
+      return resp;
     }
 
+    getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.AINode, 
aiNodeId);
+
     AINodeRegisterResp resp = new AINodeRegisterResp();
-    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
+    resp.setStatus(
+        buildSuccessStatus(
+            ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION.getMessage(),
+            registerStatus.getMessage()));
     resp.setConfigNodeList(getRegisteredConfigNodes());
     
resp.setAINodeId(registerAINodePlan.getAINodeConfiguration().getLocation().getAiNodeId());
     return resp;
@@ -586,10 +603,12 @@ public class NodeManager {
     if (!req.getAiNodeConfiguration().equals(aiNodeConfiguration)) {
       // Update AINodeConfiguration when modified during restart
       UpdateAINodePlan updateAINodePlan = new 
UpdateAINodePlan(req.getAiNodeConfiguration());
-      try {
-        getConsensusManager().write(updateAINodePlan);
-      } catch (ConsensusException e) {
-        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+      TSStatus updateStatus = writeConfigPhysicalPlan(updateAINodePlan);
+      if (!isConsensusWriteSuccessful(updateStatus)) {
+        TAINodeRestartResp resp = new TAINodeRestartResp();
+        resp.setConfigNodeList(getRegisteredConfigNodes());
+        resp.setStatus(updateStatus);
+        return resp;
       }
     }
     TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId);
@@ -597,15 +616,17 @@ public class NodeManager {
       // Update versionInfo when modified during restart
       UpdateVersionInfoPlan updateVersionInfoPlan =
           new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
-      try {
-        getConsensusManager().write(updateVersionInfoPlan);
-      } catch (ConsensusException e) {
-        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+      TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan);
+      if (!isConsensusWriteSuccessful(updateStatus)) {
+        TAINodeRestartResp resp = new TAINodeRestartResp();
+        resp.setConfigNodeList(getRegisteredConfigNodes());
+        resp.setStatus(updateStatus);
+        return resp;
       }
     }
 
     TAINodeRestartResp resp = new TAINodeRestartResp();
-    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
+    
resp.setStatus(buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage()));
     resp.setConfigNodeList(getRegisteredConfigNodes());
     return resp;
   }
@@ -890,17 +911,15 @@ public class NodeManager {
   public void applyConfigNode(
       TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) {
     ApplyConfigNodePlan applyConfigNodePlan = new 
ApplyConfigNodePlan(configNodeLocation);
-    try {
-      getConsensusManager().write(applyConfigNodePlan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
-    }
+    ensureConsensusWriteSuccessful(
+        writeConfigPhysicalPlan(applyConfigNodePlan),
+        String.format("apply ConfigNode %s", configNodeLocation));
     UpdateVersionInfoPlan updateVersionInfoPlan =
         new UpdateVersionInfoPlan(versionInfo, 
configNodeLocation.getConfigNodeId());
-    try {
-      getConsensusManager().write(updateVersionInfoPlan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+    final TSStatus updateStatus = 
writeConfigPhysicalPlan(updateVersionInfoPlan);
+    if (!isConsensusWriteSuccessful(updateStatus)) {
+      throw new IllegalStateException(
+          rollbackConfigNodeRegistration(configNodeLocation, 
updateStatus).getMessage());
     }
   }
 
@@ -1303,6 +1322,144 @@ public class NodeManager {
     return getRegisteredDataNode(dataNodeId).getLocation();
   }
 
+  private TSStatus writeConfigPhysicalPlan(ConfigPhysicalPlan plan) {
+    try {
+      return getConsensusManager().write(plan);
+    } catch (ConsensusException e) {
+      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+          .setMessage(e.getMessage());
+    }
+  }
+
+  private boolean isConsensusWriteSuccessful(TSStatus status) {
+    return status != null && status.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode();
+  }
+
+  private TSStatus rollbackDataNodeRegistration(
+      TDataNodeLocation dataNodeLocation, TSStatus versionUpdateStatus) {
+    final TSStatus rollbackStatus =
+        writeConfigPhysicalPlan(
+            new 
RemoveDataNodePlan(Collections.singletonList(dataNodeLocation)));
+    final String failureMessage =
+        String.format(
+            "Failed to persist version info for DataNode %d: %s",
+            dataNodeLocation.getDataNodeId(), 
describeStatus(versionUpdateStatus));
+    if (isConsensusWriteSuccessful(rollbackStatus)) {
+      return buildStatus(
+          versionUpdateStatus.getCode(),
+          failureMessage,
+          "The registration has been rolled back. Please retry the 
registration.");
+    }
+
+    LOGGER.error(
+        "Failed to roll back DataNode registration {} after version info 
persistence failure. "
+            + "versionUpdateStatus: {}, rollbackStatus: {}",
+        dataNodeLocation,
+        versionUpdateStatus,
+        rollbackStatus);
+    return buildStatus(
+        rollbackStatus.getCode(),
+        failureMessage,
+        String.format("The registration rollback also failed: %s", 
describeStatus(rollbackStatus)),
+        "Manual cleanup may be required before retrying the registration.");
+  }
+
+  private TSStatus rollbackAINodeRegistration(
+      TAINodeLocation aiNodeLocation, TSStatus versionUpdateStatus) {
+    final TSStatus rollbackStatus = writeConfigPhysicalPlan(new 
RemoveAINodePlan(aiNodeLocation));
+    final String failureMessage =
+        String.format(
+            "Failed to persist version info for AINode %d: %s",
+            aiNodeLocation.getAiNodeId(), describeStatus(versionUpdateStatus));
+    if (isConsensusWriteSuccessful(rollbackStatus)) {
+      return buildStatus(
+          versionUpdateStatus.getCode(),
+          failureMessage,
+          "The registration has been rolled back. Please retry the 
registration.");
+    }
+
+    LOGGER.error(
+        "Failed to roll back AINode registration {} after version info 
persistence failure. "
+            + "versionUpdateStatus: {}, rollbackStatus: {}",
+        aiNodeLocation,
+        versionUpdateStatus,
+        rollbackStatus);
+    return buildStatus(
+        rollbackStatus.getCode(),
+        failureMessage,
+        String.format("The registration rollback also failed: %s", 
describeStatus(rollbackStatus)),
+        "Manual cleanup may be required before retrying the registration.");
+  }
+
+  private TSStatus rollbackConfigNodeRegistration(
+      TConfigNodeLocation configNodeLocation, TSStatus versionUpdateStatus) {
+    final TSStatus rollbackStatus =
+        writeConfigPhysicalPlan(new RemoveConfigNodePlan(configNodeLocation));
+    final String failureMessage =
+        String.format(
+            "Failed to persist version info for ConfigNode %d: %s",
+            configNodeLocation.getConfigNodeId(), 
describeStatus(versionUpdateStatus));
+    if (isConsensusWriteSuccessful(rollbackStatus)) {
+      return buildStatus(
+          versionUpdateStatus.getCode(),
+          failureMessage,
+          "The ConfigNode registration has been rolled back.");
+    }
+
+    LOGGER.error(
+        "Failed to roll back ConfigNode registration {} after version info 
persistence failure. "
+            + "versionUpdateStatus: {}, rollbackStatus: {}",
+        configNodeLocation,
+        versionUpdateStatus,
+        rollbackStatus);
+    return buildStatus(
+        rollbackStatus.getCode(),
+        failureMessage,
+        String.format("The registration rollback also failed: %s", 
describeStatus(rollbackStatus)),
+        "Manual cleanup may be required before retrying the registration.");
+  }
+
+  private TSStatus buildStatus(int statusCode, String... messages) {
+    final TSStatus status = new TSStatus(statusCode);
+    final StringBuilder builder = new StringBuilder();
+    for (String message : messages) {
+      if (message == null || message.isEmpty()) {
+        continue;
+      }
+      if (builder.length() > 0) {
+        builder.append(' ');
+      }
+      builder.append(message);
+    }
+    if (builder.length() > 0) {
+      status.setMessage(builder.toString());
+    }
+    return status;
+  }
+
+  private TSStatus buildSuccessStatus(String... messages) {
+    return buildStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode(), messages);
+  }
+
+  private String describeStatus(TSStatus status) {
+    if (status == null) {
+      return "unknown error";
+    }
+    if (status.getMessage() != null && !status.getMessage().isEmpty()) {
+      return status.getMessage();
+    }
+    return "status code " + status.getCode();
+  }
+
+  private void ensureConsensusWriteSuccessful(TSStatus status, String action) {
+    if (isConsensusWriteSuccessful(status)) {
+      return;
+    }
+    throw new IllegalStateException(
+        String.format("Failed to %s through consensus layer: %s", action, 
status));
+  }
+
   private ConsensusManager getConsensusManager() {
     return configManager.getConsensusManager();
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index b2bae24de38..fe1a608f6ab 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -760,12 +760,12 @@ public class ConfigPlanExecutor {
     return result.get();
   }
 
-  public void loadSnapshot(final File latestSnapshotRootDir) {
+  public boolean loadSnapshot(final File latestSnapshotRootDir) {
     if (!latestSnapshotRootDir.exists()) {
       LOGGER.error(
           "snapshot directory [{}] is not exist, can not load snapshot with 
this directory.",
           latestSnapshotRootDir.getAbsolutePath());
-      return;
+      return false;
     }
 
     final AtomicBoolean result = new AtomicBoolean(true);
@@ -793,6 +793,7 @@ public class ConfigPlanExecutor {
           "[ConfigNodeSnapshot] Load snapshot success, latestSnapshotRootDir: 
{}",
           latestSnapshotRootDir);
     }
+    return result.get();
   }
 
   private DataSet getSchemaNodeManagementPartition(ConfigPhysicalPlan req) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 5a80364f033..e8b3cb3bb55 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -243,7 +243,7 @@ public class DatabasePartitionTable {
                 result.getAndIncrement();
               }
             });
-    return result.getAndIncrement();
+    return result.get();
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 5c2c93daeab..f876d4dc25e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -243,7 +243,9 @@ public class PartitionInfo implements SnapshotProcessor {
    */
   public TSStatus pollRegionMaintainTask() {
     synchronized (regionMaintainTaskList) {
-      regionMaintainTaskList.remove(0);
+      if (!regionMaintainTaskList.isEmpty()) {
+        regionMaintainTaskList.remove(0);
+      }
       return RpcUtils.SUCCESS_STATUS;
     }
   }
@@ -1008,9 +1010,14 @@ public class PartitionInfo implements SnapshotProcessor {
         databasePartitionTableEntry.getValue().serialize(bufferedOutputStream, 
protocol);
       }
 
+      final List<RegionMaintainTask> copiedRegionMaintainTaskList;
+      synchronized (regionMaintainTaskList) {
+        copiedRegionMaintainTaskList = new ArrayList<>(regionMaintainTaskList);
+      }
+
       // serialize regionCleanList
-      ReadWriteIOUtils.write(regionMaintainTaskList.size(), 
bufferedOutputStream);
-      for (RegionMaintainTask task : regionMaintainTaskList) {
+      ReadWriteIOUtils.write(copiedRegionMaintainTaskList.size(), 
bufferedOutputStream);
+      for (RegionMaintainTask task : copiedRegionMaintainTaskList) {
         task.serialize(bufferedOutputStream, protocol);
       }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index b38696a10ed..c0d45d00d33 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -80,9 +80,9 @@ public class AddConfigNodeProcedure extends 
AbstractNodeProcedure<AddConfigNodeS
           LOG.info("Successfully ADD_PEER {}", tConfigNodeLocation);
           break;
         case REGISTER_SUCCESS:
-          env.notifyRegisterSuccess(tConfigNodeLocation);
-          
env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId());
           env.applyConfigNode(tConfigNodeLocation, versionInfo);
+          
env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId());
+          env.notifyRegisterSuccess(tConfigNodeLocation);
           LOG.info("The ConfigNode: {} is successfully added to the cluster", 
tConfigNodeLocation);
           return Flow.NO_MORE_STATE;
       }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java
new file mode 100644
index 00000000000..7bd59c81178
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.statemachine;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ConfigRegionStateMachineTest {
+
+  @Test
+  public void testParseStartIndex() {
+    Assert.assertEquals(1, 
ConfigRegionStateMachine.parseStartIndex("log_1_10"));
+    Assert.assertEquals(11, 
ConfigRegionStateMachine.parseStartIndex("log_11_20"));
+    Assert.assertEquals(21, 
ConfigRegionStateMachine.parseStartIndex("log_inprogress_21"));
+    Assert.assertEquals(0, 
ConfigRegionStateMachine.parseStartIndex("invalid"));
+  }
+
+  @Test
+  public void testFileComparatorSortsByStartIndex() {
+    List<String> filenames =
+        new ArrayList<>(Arrays.asList("log_inprogress_21", "log_11_20", 
"log_1_10"));
+
+    filenames.sort(new ConfigRegionStateMachine.FileComparator());
+
+    Assert.assertEquals(Arrays.asList("log_1_10", "log_11_20", 
"log_inprogress_21"), filenames);
+  }
+}
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java
new file mode 100644
index 00000000000..247d745661e
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.node;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
+import org.apache.iotdb.confignode.manager.ClusterManager;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
+import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class NodeManagerTest {
+
+  private IManager configManager;
+  private ConsensusManager consensusManager;
+  private LoadManager loadManager;
+  private LoadCache loadCache;
+  private ClusterSchemaManager clusterSchemaManager;
+  private ClusterManager clusterManager;
+  private NodeInfo nodeInfo;
+  private NodeManager nodeManager;
+
+  @Before
+  public void setUp() {
+    configManager = Mockito.mock(IManager.class);
+    consensusManager = Mockito.mock(ConsensusManager.class);
+    loadManager = Mockito.mock(LoadManager.class);
+    loadCache = Mockito.mock(LoadCache.class);
+    clusterSchemaManager = Mockito.mock(ClusterSchemaManager.class);
+    clusterManager = Mockito.mock(ClusterManager.class);
+    nodeInfo = new NodeInfo();
+    nodeManager = new NodeManager(configManager, nodeInfo);
+
+    when(configManager.getConsensusManager()).thenReturn(consensusManager);
+    when(configManager.getLoadManager()).thenReturn(loadManager);
+    when(loadManager.getLoadCache()).thenReturn(loadCache);
+    
when(configManager.getClusterSchemaManager()).thenReturn(clusterSchemaManager);
+    when(configManager.getClusterManager()).thenReturn(clusterManager);
+  }
+
+  @Test
+  public void testRegisterDataNodeStopsWhenRegisterWriteFails() throws 
ConsensusException {
+    TSStatus failureStatus =
+        new 
TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()).setMessage("redirect");
+    when(consensusManager.write(any())).thenReturn(failureStatus);
+
+    DataNodeRegisterResp resp =
+        (DataNodeRegisterResp) 
nodeManager.registerDataNode(generateDataNodeRegisterReq(1));
+
+    Assert.assertEquals(failureStatus, resp.getStatus());
+    verify(loadCache, never()).createNodeHeartbeatCache(eq(NodeType.DataNode), 
anyInt());
+    verify(clusterSchemaManager, never()).adjustMaxRegionGroupNum();
+  }
+
+  @Test
+  public void testRegisterDataNodeRollsBackWhenVersionWriteFails() throws 
ConsensusException {
+    TSStatus successStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    TSStatus failureStatus =
+        new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage("update failed");
+    when(consensusManager.write(any())).thenReturn(successStatus, 
failureStatus, successStatus);
+
+    DataNodeRegisterResp resp =
+        (DataNodeRegisterResp) 
nodeManager.registerDataNode(generateDataNodeRegisterReq(1));
+
+    Assert.assertEquals(
+        TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), 
resp.getStatus().getCode());
+    Assert.assertTrue(resp.getStatus().getMessage().contains("rolled back"));
+    verify(loadCache, never()).createNodeHeartbeatCache(eq(NodeType.DataNode), 
anyInt());
+    verify(clusterSchemaManager, never()).adjustMaxRegionGroupNum();
+
+    ArgumentCaptor<ConfigPhysicalPlan> planCaptor =
+        ArgumentCaptor.forClass(ConfigPhysicalPlan.class);
+    verify(consensusManager, Mockito.times(3)).write(planCaptor.capture());
+    Assert.assertTrue(planCaptor.getAllValues().get(0) instanceof 
RegisterDataNodePlan);
+    Assert.assertTrue(planCaptor.getAllValues().get(1) instanceof 
UpdateVersionInfoPlan);
+    Assert.assertTrue(planCaptor.getAllValues().get(2) instanceof 
RemoveDataNodePlan);
+  }
+
+  @Test
+  public void testRestartDataNodeReturnsFailureWhenUpdateWriteFails() throws 
ConsensusException {
+    final TDataNodeConfiguration registeredConfig = 
generateDataNodeConfiguration(1, "127.0.0.1");
+    nodeInfo.registerDataNode(new RegisterDataNodePlan(registeredConfig));
+    
when(clusterManager.getClusterIdWithRetry(anyLong())).thenReturn("cluster");
+
+    TSStatus failureStatus =
+        new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage("update failed");
+    when(consensusManager.write(any())).thenReturn(failureStatus);
+
+    final TDataNodeRestartReq req = new TDataNodeRestartReq();
+    req.setDataNodeConfiguration(generateDataNodeConfiguration(1, 
"127.0.0.2"));
+    req.setVersionInfo(new TNodeVersionInfo("version", "build"));
+
+    final TDataNodeRestartResp resp = 
nodeManager.updateDataNodeIfNecessary(req);
+
+    Assert.assertEquals(failureStatus, resp.getStatus());
+  }
+
+  @Test
+  public void testApplyConfigNodeRollsBackWhenVersionWriteFails() throws 
ConsensusException {
+    TSStatus successStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    TSStatus failureStatus =
+        new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage("apply failed");
+    when(consensusManager.write(any())).thenReturn(successStatus, 
failureStatus, successStatus);
+
+    try {
+      nodeManager.applyConfigNode(
+          new TConfigNodeLocation(
+              1, new TEndPoint("127.0.0.1", 10710), new TEndPoint("127.0.0.1", 
10720)),
+          new TNodeVersionInfo("version", "build"));
+      Assert.fail("Expected applyConfigNode to fail fast");
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("rolled back"));
+    }
+
+    ArgumentCaptor<ConfigPhysicalPlan> planCaptor =
+        ArgumentCaptor.forClass(ConfigPhysicalPlan.class);
+    verify(consensusManager, Mockito.times(3)).write(planCaptor.capture());
+    Assert.assertTrue(planCaptor.getAllValues().get(0) instanceof 
ApplyConfigNodePlan);
+    Assert.assertTrue(planCaptor.getAllValues().get(1) instanceof 
UpdateVersionInfoPlan);
+    Assert.assertTrue(planCaptor.getAllValues().get(2) instanceof 
RemoveConfigNodePlan);
+  }
+
+  private TDataNodeRegisterReq generateDataNodeRegisterReq(int dataNodeId) {
+    final TDataNodeRegisterReq req = new TDataNodeRegisterReq();
+    req.setDataNodeConfiguration(generateDataNodeConfiguration(dataNodeId, 
"127.0.0.1"));
+    req.setVersionInfo(new TNodeVersionInfo("version", "build"));
+    return req;
+  }
+
+  private TDataNodeConfiguration generateDataNodeConfiguration(int dataNodeId, 
String ip) {
+    final TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+    dataNodeLocation.setDataNodeId(dataNodeId);
+    dataNodeLocation.setClientRpcEndPoint(new TEndPoint(ip, 6667 + 
dataNodeId));
+    dataNodeLocation.setInternalEndPoint(new TEndPoint(ip, 10730 + 
dataNodeId));
+    dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint(ip, 10740 + 
dataNodeId));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint(ip, 10760 + 
dataNodeId));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint(ip, 10750 
+ dataNodeId));
+
+    final TDataNodeConfiguration dataNodeConfiguration = new 
TDataNodeConfiguration();
+    dataNodeConfiguration.setLocation(dataNodeLocation);
+    return dataNodeConfiguration;
+  }
+}
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index afccb0c0eba..eccddcaa8d1 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchem
 import 
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
 import 
org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListResp;
+import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
 import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
@@ -267,6 +268,34 @@ public class PartitionInfoTest {
             });
   }
 
+  @Test
+  public void testRegionGroupCount() throws DatabaseNotExistsException {
+    partitionInfo.createDatabase(
+        new DatabaseSchemaPlan(
+            ConfigPhysicalPlanType.CreateDatabase, new 
TDatabaseSchema("root.region_count")));
+
+    CreateRegionGroupsPlan createRegionGroupsPlan = new 
CreateRegionGroupsPlan();
+    createRegionGroupsPlan.addRegionGroup(
+        "root.region_count",
+        generateTRegionReplicaSet(
+            testFlag.SchemaPartition.getFlag(),
+            generateTConsensusGroupId(
+                testFlag.SchemaPartition.getFlag(), 
TConsensusGroupType.SchemaRegion)));
+    createRegionGroupsPlan.addRegionGroup(
+        "root.region_count",
+        generateTRegionReplicaSet(
+            testFlag.DataPartition.getFlag(),
+            generateTConsensusGroupId(
+                testFlag.DataPartition.getFlag(), 
TConsensusGroupType.DataRegion)));
+    partitionInfo.createRegionGroups(createRegionGroupsPlan);
+
+    Assert.assertEquals(
+        1,
+        partitionInfo.getRegionGroupCount("root.region_count", 
TConsensusGroupType.SchemaRegion));
+    Assert.assertEquals(
+        1, partitionInfo.getRegionGroupCount("root.region_count", 
TConsensusGroupType.DataRegion));
+  }
+
   private TRegionReplicaSet generateTRegionReplicaSet(
       int startFlag, TConsensusGroupId tConsensusGroupId) {
     TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();

Reply via email to