This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch fix-schema-region-partition-race-remove-message
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 14dc895d209d088869bc5e83c134e85784f4031d
Author: Yongzao <[email protected]>
AuthorDate: Tue Jun 9 15:46:20 2026 +0800

    Fix schema region visibility race and remove datanode message
---
 .../IoTDBRemoveDataNodeNormalIT.java               | 32 ++++++++++
 .../iotdb/confignode/i18n/ProcedureMessages.java   |  4 ++
 .../iotdb/confignode/i18n/ProcedureMessages.java   |  4 ++
 .../manager/partition/PartitionManager.java        | 68 +++++++++++++++++++++-
 .../procedure/env/ConfigNodeProcedureEnv.java      |  7 ++-
 .../procedure/env/RemoveDataNodeHandler.java       | 58 +++++++++++++-----
 .../impl/region/CreateRegionGroupsProcedure.java   | 10 +++-
 7 files changed, 165 insertions(+), 18 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java
index 4ce11149070..304424198b6 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java
@@ -135,6 +135,38 @@ public class IoTDBRemoveDataNodeNormalIT {
   // ConsensusFactory.IOT_CONSENSUS_V2);
   //  }
 
+  @Test
+  public void failWhenDataReplicationFactorIsOneUseSQL() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setSchemaReplicationFactor(3)
+        .setDataReplicationFactor(1)
+        .setDefaultDataRegionGroupNumPerDatabase(1);
+    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+
+    try (final Connection connection = 
makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+        final Statement statement = 
makeItCloseQuietly(connection.createStatement());
+        final ResultSet resultSet = statement.executeQuery(SHOW_DATANODES)) {
+      final Set<Integer> allDataNodeId = new HashSet<>();
+      while (resultSet.next()) {
+        allDataNodeId.add(resultSet.getInt(ColumnHeaderConstant.NODE_ID));
+      }
+
+      final String removeDataNodeSQL =
+          generateRemoveString(selectRemoveDataNodes(allDataNodeId, 1));
+      try {
+        statement.execute(removeDataNodeSQL);
+        Assert.fail("Remove DataNode should fail when data_replication_factor 
is 1");
+      } catch (final IoTDBSQLException e) {
+        Assert.assertTrue(e.getMessage(), 
e.getMessage().contains("data_replication_factor is 1"));
+        Assert.assertFalse(
+            e.getMessage(), e.getMessage().contains("Failed to remove all 
requested data nodes"));
+      }
+    }
+  }
+
   @Test
   public void fail1C3DTestIoTUseSQL() throws Exception {
     // Setup 1C3D with schema replication factor = 3, and remove 1D, this test 
should fail due to
diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
index 085a803777e..ddce0ddd957 100644
--- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
+++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
@@ -445,6 +445,10 @@ public final class ProcedureMessages {
       "Failed to push topic meta to dataNodes, details: %s";
   public static final String FAILED_TO_REMOVE_DATA_NODE_BECAUSE_IT_IS_NOT_IN =
       "Failed to remove data node {} because it is not in running and the 
configuration of cluster is one replication";
+
+  public static final String
+      FAILED_TO_REMOVE_DATA_NODE_BECAUSE_DATA_REPLICATION_FACTOR_IS_ONE =
+          "Cannot remove DataNode because data_replication_factor is 1 or at 
least one DataRegion has only one replica. Removing a DataNode may cause data 
loss. Increase data_replication_factor and ensure each DataRegion has more than 
one replica before removing DataNodes.";
   public static final String 
FAILED_TO_ROLLBACK_ALTER_PIPE_DETAILS_METADATA_WILL_BE_SYNCHRONIZED =
       "Failed to rollback alter pipe {}, details: {}, metadata will be 
synchronized later.";
   public static final String 
FAILED_TO_ROLLBACK_COMMIT_SET_TEMPLATE_ON_PATH_DUE_TO =
diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
index d928f4f6bae..9b21d61cc74 100644
--- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
+++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
@@ -445,6 +445,10 @@ public final class ProcedureMessages {
       "Failed to push topic meta to dataNodes, details: %s";
   public static final String FAILED_TO_REMOVE_DATA_NODE_BECAUSE_IT_IS_NOT_IN =
       "Failed to remove data node {} because it is not in running and the 
configuration of cluster is one replication";
+
+  public static final String
+      FAILED_TO_REMOVE_DATA_NODE_BECAUSE_DATA_REPLICATION_FACTOR_IS_ONE =
+          "不能移除 DataNode,因为 data_replication_factor 为 1,或至少存在一个 DataRegion 
只有一个副本。移除 DataNode 可能造成数据丢失。请先提高 data_replication_factor,并确保每个 DataRegion 
都有多个副本,再移除 DataNode。";
   public static final String 
FAILED_TO_ROLLBACK_ALTER_PIPE_DETAILS_METADATA_WILL_BE_SYNCHRONIZED =
       "Failed to rollback alter pipe {}, details: {}, metadata will be 
synchronized later.";
   public static final String 
FAILED_TO_ROLLBACK_COMMIT_SET_TEMPLATE_ON_PATH_DUE_TO =
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 5be81256b5c..e4373e773a8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -147,6 +147,9 @@ public class PartitionManager {
   public static final String CONSENSUS_WRITE_ERROR =
       "Failed in the write API executing the consensus layer due to: ";
 
+  private static final long REGION_GROUP_VISIBILITY_TIMEOUT_MS = 10_000L;
+  private static final long REGION_GROUP_VISIBILITY_CHECK_INTERVAL_MS = 20L;
+
   // Monitor for leadership change
   private final Object scheduleMonitor = new Object();
 
@@ -715,12 +718,75 @@ public class PartitionManager {
           getLoadManager().allocateRegionGroups(allotmentMap, 
consensusGroupType);
       
LOGGER.info(ManagerMessages.CREATEREGIONGROUPS_STARTING_TO_CREATE_THE_FOLLOWING_REGIONGROUPS);
       createRegionGroupsPlan.planLog(LOGGER);
-      return getProcedureManager().createRegionGroups(consensusGroupType, 
createRegionGroupsPlan);
+      final TSStatus createStatus =
+          getProcedureManager().createRegionGroups(consensusGroupType, 
createRegionGroupsPlan);
+      if (createStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return createStatus;
+      }
+      return waitForRegionGroupsVisible(createRegionGroupsPlan, 
consensusGroupType);
     } else {
       return RpcUtils.SUCCESS_STATUS;
     }
   }
 
+  private TSStatus waitForRegionGroupsVisible(
+      final CreateRegionGroupsPlan createRegionGroupsPlan,
+      final TConsensusGroupType consensusGroupType) {
+    final Map<String, Set<TConsensusGroupId>> expectedRegionGroups = new 
HashMap<>();
+    createRegionGroupsPlan
+        .getRegionGroupMap()
+        .forEach(
+            (database, regionReplicaSets) -> {
+              final Set<TConsensusGroupId> regionGroupIds =
+                  regionReplicaSets.stream()
+                      .map(TRegionReplicaSet::getRegionId)
+                      .filter(regionGroupId -> 
consensusGroupType.equals(regionGroupId.getType()))
+                      .collect(Collectors.toSet());
+              if (!regionGroupIds.isEmpty()) {
+                expectedRegionGroups.put(database, regionGroupIds);
+              }
+            });
+
+    final long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime <= 
REGION_GROUP_VISIBILITY_TIMEOUT_MS) {
+      if (areRegionGroupsVisible(expectedRegionGroups, consensusGroupType)) {
+        return RpcUtils.SUCCESS_STATUS;
+      }
+      try {
+        TimeUnit.MILLISECONDS.sleep(REGION_GROUP_VISIBILITY_CHECK_INTERVAL_MS);
+      } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode())
+            .setMessage(
+                String.format(
+                    "Interrupted while waiting for created %s RegionGroups %s 
to become visible in PartitionInfo.",
+                    consensusGroupType, expectedRegionGroups));
+      }
+    }
+
+    final String message =
+        String.format(
+            "Created %s RegionGroups %s are not visible in PartitionInfo 
within %d ms.",
+            consensusGroupType, expectedRegionGroups, 
REGION_GROUP_VISIBILITY_TIMEOUT_MS);
+    LOGGER.warn(message);
+    return new 
TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode()).setMessage(message);
+  }
+
+  private boolean areRegionGroupsVisible(
+      final Map<String, Set<TConsensusGroupId>> expectedRegionGroups,
+      final TConsensusGroupType consensusGroupType) {
+    for (final Map.Entry<String, Set<TConsensusGroupId>> entry : 
expectedRegionGroups.entrySet()) {
+      final Set<TConsensusGroupId> visibleRegionGroups =
+          partitionInfo.getRegionGroupSlotsCounter(entry.getKey(), 
consensusGroupType).stream()
+              .map(Pair::getRight)
+              .collect(Collectors.toSet());
+      if (!visibleRegionGroups.containsAll(entry.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * Only leader use this interface. Checks whether the specified 
DataPartition has a successor and
    * returns if it does.
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 960d0a7977f..8cd98f3286f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -504,12 +504,15 @@ public class ConfigNodeProcedureEnv {
     return clientHandler.getResponseList();
   }
 
-  public void persistRegionGroup(CreateRegionGroupsPlan 
createRegionGroupsPlan) {
+  public TSStatus persistRegionGroup(CreateRegionGroupsPlan 
createRegionGroupsPlan) {
     // Persist the allocation result
     try {
-      getConsensusManager().write(createRegionGroupsPlan);
+      return getConsensusManager().write(createRegionGroupsPlan);
     } catch (ConsensusException e) {
       LOG.warn("Failed in the write API executing the consensus layer due to: 
", e);
+      return new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode())
+          .setMessage(
+              "Failed to persist RegionGroup allocation in the consensus 
layer: " + e.getMessage());
     }
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
index 5b505ec001b..a7235747f71 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
@@ -559,6 +559,14 @@ public class RemoveDataNodeHandler {
     TSStatus status = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     List<TDataNodeLocation> removedDataNodes = 
removeDataNodePlan.getDataNodeLocations();
 
+    if (hasSingleDataRegionReplica()) {
+      status.setCode(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode());
+      status.setMessage(
+          
ProcedureMessages.FAILED_TO_REMOVE_DATA_NODE_BECAUSE_DATA_REPLICATION_FACTOR_IS_ONE);
+      LOGGER.error(status.getMessage());
+      return status;
+    }
+
     int availableDatanodeSize =
         configManager
             .getNodeManager()
@@ -566,20 +574,26 @@ public class RemoveDataNodeHandler {
             .size();
     // when the configuration is one replication, it will be failed if the 
data node is not in
     // running state.
-    if (CONF.getSchemaReplicationFactor() == 1 || 
CONF.getDataReplicationFactor() == 1) {
-      for (TDataNodeLocation dataNodeLocation : removedDataNodes) {
-        // check whether removed data node is in running state
-        if (!NodeStatus.Running.equals(
-            
configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId())))
 {
-          removedDataNodes.remove(dataNodeLocation);
-          LOGGER.error(
-              
ProcedureMessages.FAILED_TO_REMOVE_DATA_NODE_BECAUSE_IT_IS_NOT_IN, 
dataNodeLocation);
-        }
-        if (removedDataNodes.isEmpty()) {
-          status.setCode(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode());
-          
status.setMessage(ProcedureMessages.FAILED_TO_REMOVE_ALL_REQUESTED_DATA_NODES);
-          return status;
-        }
+    if (CONF.getSchemaReplicationFactor() == 1) {
+      final List<TDataNodeLocation> notRunningDataNodes =
+          removedDataNodes.stream()
+              .filter(
+                  dataNodeLocation ->
+                      !NodeStatus.Running.equals(
+                          configManager
+                              .getLoadManager()
+                              
.getNodeStatus(dataNodeLocation.getDataNodeId())))
+              .collect(Collectors.toList());
+      notRunningDataNodes.forEach(
+          dataNodeLocation ->
+              LOGGER.error(
+                  
ProcedureMessages.FAILED_TO_REMOVE_DATA_NODE_BECAUSE_IT_IS_NOT_IN,
+                  dataNodeLocation));
+      removedDataNodes.removeAll(notRunningDataNodes);
+      if (removedDataNodes.isEmpty()) {
+        status.setCode(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode());
+        
status.setMessage(ProcedureMessages.FAILED_TO_REMOVE_ALL_REQUESTED_DATA_NODES);
+        return status;
       }
     }
 
@@ -604,6 +618,22 @@ public class RemoveDataNodeHandler {
     return status;
   }
 
+  private boolean hasSingleDataRegionReplica() {
+    return CONF.getDataReplicationFactor() == 1
+        || configManager
+            .getClusterSchemaManager()
+            .getMatchedDatabaseSchemasByName(
+                
configManager.getClusterSchemaManager().getDatabaseNames(null), null)
+            .values()
+            .stream()
+            .anyMatch(databaseSchema -> 
databaseSchema.getDataReplicationFactor() == 1)
+        || configManager
+            .getPartitionManager()
+            .getAllReplicaSets(TConsensusGroupType.DataRegion)
+            .stream()
+            .anyMatch(replicaSet -> replicaSet.getDataNodeLocationsSize() <= 
1);
+  }
+
   /**
    * Checks whether all DataNodes specified for deletion exist in the cluster.
    *
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
index 2cb283d400e..e9cce807e77 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
@@ -36,10 +38,12 @@ import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSamp
 import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
 import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -173,7 +177,11 @@ public class CreateRegionGroupsProcedure
                           }
                         }));
 
-        env.persistRegionGroup(persistPlan);
+        final TSStatus persistStatus = env.persistRegionGroup(persistPlan);
+        if (persistStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          setFailure(new ProcedureException(new 
IoTDBException(persistStatus)));
+          return Flow.NO_MORE_STATE;
+        }
         try {
           env.getConfigManager().getConsensusManager().write(offerPlan);
         } catch (final ConsensusException e) {

Reply via email to