This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 57a5ec6d534 Fix leader balance blocking region activation (#17755)
57a5ec6d534 is described below

commit 57a5ec6d534a9603cd6107929a2e7c0275516534
Author: Yongzao <[email protected]>
AuthorDate: Mon May 25 23:30:38 2026 +0800

    Fix leader balance blocking region activation (#17755)
---
 .../iotdb/confignode/i18n/ConfigNodeMessages.java  |   2 +-
 .../iotdb/confignode/i18n/ConfigNodeMessages.java  |   2 +-
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   2 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |   2 +-
 .../confignode/conf/ConfigNodeStartupCheck.java    |   2 +-
 .../manager/load/balancer/RouteBalancer.java       | 631 ++++++++++++++-------
 .../router/leader/AbstractLeaderBalancer.java      |   2 +-
 .../impl/region/CreateRegionGroupsProcedure.java   |  26 +-
 .../procedure/state/CreateRegionGroupsState.java   |   4 +
 9 files changed, 458 insertions(+), 215 deletions(-)

diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
index 3a662932b61..c1c3e877de7 100644
--- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
+++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
@@ -466,7 +466,7 @@ public final class ConfigNodeMessages {
   public static final String UNKNOWN_HOST_WHEN_CHECKING_SEED_CONFIGNODE_IP =
       "Unknown host when checking seed configNode IP {}";
   public static final String UNKNOWN_LEADER_DISTRIBUTION_POLICY =
-      "Unknown leader_distribution_policy: %s, please set to \"GREEDY\" or \"CFD\" or \"HASH\"";
+      "Unknown leader_distribution_policy: %s, please set to \"GREEDY\" or \"CFS\" or \"HASH\"";
   public static final String UNKNOWN_PHYSICALPLAN_CONFIGPHYSICALPLANTYPE =
       "unknown PhysicalPlan configPhysicalPlanType: ";
   public static final String UNKNOWN_READ_CONSISTENCY_LEVEL_PLEASE_SET_TO =
diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
index b08332bb35c..6ca847626c8 100644
--- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
+++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
@@ -462,7 +462,7 @@ public final class ConfigNodeMessages {
   public static final String UNKNOWN_HOST_WHEN_CHECKING_SEED_CONFIGNODE_IP =
       "Unknown host when checking seed configNode IP {}";
   public static final String UNKNOWN_LEADER_DISTRIBUTION_POLICY =
-      "未知 leader_distribution_policy:%s,请设置为 \"GREEDY\"、\"CFD\" 或 \"HASH\"";
+      "未知 leader_distribution_policy:%s,请设置为 \"GREEDY\"、\"CFS\" 或 \"HASH\"";
   public static final String UNKNOWN_PHYSICALPLAN_CONFIGPHYSICALPLANTYPE =
       "unknown PhysicalPlan configPhysicalPlanType: ";
   public static final String UNKNOWN_READ_CONSISTENCY_LEVEL_PLEASE_SET_TO =
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index aacbe20203f..1c0555affe3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -226,7 +226,7 @@ public class ConfigNodeConfig {
   private double topologyProbingTimeoutRatio = 0.5;
 
   /** The policy of cluster RegionGroups' leader distribution. */
-  private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY;
+  private String leaderDistributionPolicy = AbstractLeaderBalancer.CFS_POLICY;
 
   /** Whether to enable auto leader balance for Ratis consensus protocol. */
   private boolean enableAutoLeaderBalanceForRatisConsensus = true;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index d6dc5eb4351..b6bf74edb31 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -369,7 +369,7 @@ public class ConfigNodeDescriptor {
     String leaderDistributionPolicy =
         properties.getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy());
     if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
-        || AbstractLeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy)
+        || AbstractLeaderBalancer.CFS_POLICY.equals(leaderDistributionPolicy)
         || AbstractLeaderBalancer.HASH_POLICY.equals(leaderDistributionPolicy)) {
       conf.setLeaderDistributionPolicy(leaderDistributionPolicy);
     } else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index d3f6f15f1be..111f32bcdd3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -161,7 +161,7 @@ public class ConfigNodeStartupCheck extends StartupChecks {
 
     // The leader distribution policy is limited
     if (!AbstractLeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
-        && !AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())
+        && !AbstractLeaderBalancer.CFS_POLICY.equals(CONF.getLeaderDistributionPolicy())
         && !AbstractLeaderBalancer.HASH_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
       throw new ConfigurationException(
           ConfigNodeMessages.LEADER_DISTRIBUTION_POLICY,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index e3954a59385..5429727f77b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -60,18 +60,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 /** The RouteBalancer guides the cluster RegionGroups' leader distribution and routing priority. */
@@ -104,10 +106,12 @@ public class RouteBalancer implements IClusterStatusSubscriber {
           ProcedureManager.PROCEDURE_WAIT_TIME_OUT - TimeUnit.SECONDS.toMillis(2),
           TimeUnit.SECONDS.toMillis(10));
   private static final long WAIT_PRIORITY_INTERVAL = 10;
+  private static final long RATIS_CHANGE_LEADER_RPC_TIMEOUT_IN_MS = TimeUnit.SECONDS.toMillis(10);
 
   private final IManager configManager;
-  // For generating optimal Region leader distribution
-  private final AbstractLeaderBalancer leaderBalancer;
+  // For serializing and generating optimal Region leader distribution by RegionGroup type
+  private final LeaderBalanceContext schemaRegionLeaderBalanceContext;
+  private final LeaderBalanceContext dataRegionLeaderBalanceContext;
   // For generating optimal cluster Region routing priority
   private final IPriorityBalancer priorityRouter;
 
@@ -121,29 +125,24 @@ public class RouteBalancer implements IClusterStatusSubscriber {
   private static final long BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS = 20 * 1000L * 1000L * 1000L;
   private final Map<TConsensusGroupId, Long> lastFailedTimeForLeaderBalance;
 
-  private final Map<Integer, List<String>> lastBalancedOldLeaderId2RegionMap;
-  private Map<TConsensusGroupId, Integer> lastDataRegion2OldLeaderMap;
-  private Set<TConsensusGroupId> lastBalancedDataRegionSet;
-
   public RouteBalancer(IManager configManager) {
     this.configManager = configManager;
     this.priorityMapLock = new ReentrantReadWriteLock();
     this.regionPriorityMap = new TreeMap<>();
-    this.lastFailedTimeForLeaderBalance = new TreeMap<>();
-    this.lastBalancedOldLeaderId2RegionMap = new ConcurrentHashMap<>();
-
-    switch (CONF.getLeaderDistributionPolicy()) {
-      case AbstractLeaderBalancer.GREEDY_POLICY:
-        this.leaderBalancer = new GreedyLeaderBalancer();
-        break;
-      case AbstractLeaderBalancer.HASH_POLICY:
-        this.leaderBalancer = new HashLeaderBalancer();
-        break;
-      case AbstractLeaderBalancer.CFD_POLICY:
-      default:
-        this.leaderBalancer = new CostFlowSelectionLeaderBalancer();
-        break;
-    }
+    this.lastFailedTimeForLeaderBalance = new ConcurrentHashMap<>();
+
+    this.schemaRegionLeaderBalanceContext =
+        new LeaderBalanceContext(
+            TConsensusGroupType.SchemaRegion,
+            new ReentrantLock(),
+            createLeaderBalancer(),
+            SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS);
+    this.dataRegionLeaderBalanceContext =
+        new LeaderBalanceContext(
+            TConsensusGroupType.DataRegion,
+            new ReentrantLock(),
+            createLeaderBalancer(),
+            DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
 
     switch (CONF.getRoutePriorityPolicy()) {
       case IPriorityBalancer.GREEDY_POLICY:
@@ -156,185 +155,404 @@ public class RouteBalancer implements IClusterStatusSubscriber {
     }
   }
 
-  /** Balance cluster RegionGroup leader distribution through configured algorithm. */
-  private synchronized void balanceRegionLeader() {
-    if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION) {
-      balanceRegionLeader(TConsensusGroupType.SchemaRegion, SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS);
+  /** Result of DataRegion leader balance used by the required post-balance actions. */
+  private static class DataRegionLeaderBalanceResult {
+
+    private final Map<TConsensusGroupId, Integer> dataRegion2OldLeaderMap;
+    private final Set<TConsensusGroupId> balancedDataRegionSet;
+    private final Map<Integer, List<String>> balancedOldLeaderId2RegionMap;
+
+    private DataRegionLeaderBalanceResult(
+        Map<TConsensusGroupId, Integer> dataRegion2OldLeaderMap,
+        Set<TConsensusGroupId> balancedDataRegionSet,
+        Map<Integer, List<String>> balancedOldLeaderId2RegionMap) {
+      this.dataRegion2OldLeaderMap = dataRegion2OldLeaderMap;
+      this.balancedDataRegionSet = balancedDataRegionSet;
+      this.balancedOldLeaderId2RegionMap = balancedOldLeaderId2RegionMap;
+    }
+
+    private static DataRegionLeaderBalanceResult empty() {
+      return new DataRegionLeaderBalanceResult(
+          Collections.emptyMap(), Collections.emptySet(), Collections.emptyMap());
+    }
+
+    private boolean hasBalancedDataRegion() {
+      return !balancedDataRegionSet.isEmpty();
+    }
+  }
+
+  /** Immutable state for one RegionGroup type's leader balance round. */
+  private static class LeaderBalanceContext {
+
+    private final TConsensusGroupType regionGroupType;
+    private final ReentrantLock leaderBalanceLock;
+    private final AbstractLeaderBalancer leaderBalancer;
+    private final String consensusProtocolClass;
+
+    private LeaderBalanceContext(
+        TConsensusGroupType regionGroupType,
+        ReentrantLock leaderBalanceLock,
+        AbstractLeaderBalancer leaderBalancer,
+        String consensusProtocolClass) {
+      this.regionGroupType = regionGroupType;
+      this.leaderBalanceLock = leaderBalanceLock;
+      this.leaderBalancer = leaderBalancer;
+      this.consensusProtocolClass = consensusProtocolClass;
+    }
+  }
+
+  /** Mutable accumulator for one leader balance round's transfer requests and cache updates. */
+  private static class LeaderTransferContext {
+
+    private final long currentTime;
+    private int requestId;
+    private final DataNodeAsyncRequestContext<TRegionLeaderChangeReq, TRegionLeaderChangeResp>
+        clientHandler;
+    private final Map<TConsensusGroupId, ConsensusGroupHeartbeatSample> successTransferMap;
+    private final Map<Integer, List<String>> balancedOldLeaderId2RegionMap;
+
+    private LeaderTransferContext(long currentTime) {
+      this.currentTime = currentTime;
+      this.requestId = 0;
+      this.clientHandler =
+          new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.CHANGE_REGION_LEADER);
+      this.successTransferMap = new TreeMap<>();
+      this.balancedOldLeaderId2RegionMap = new HashMap<>();
+    }
+
+    private void putSuccessTransfer(TConsensusGroupId regionGroupId, long timestamp, int leaderId) {
+      successTransferMap.put(regionGroupId, new ConsensusGroupHeartbeatSample(timestamp, leaderId));
+    }
+
+    private void putRatisTransferRequest(
+        TConsensusGroupId regionGroupId, TDataNodeLocation newLeader) {
+      TRegionLeaderChangeReq regionLeaderChangeReq =
+          new TRegionLeaderChangeReq(regionGroupId, newLeader);
+      clientHandler.putRequest(requestId, regionLeaderChangeReq);
+      clientHandler.putNodeLocation(requestId, newLeader);
+      requestId++;
+    }
+
+    private boolean hasRatisTransferRequest() {
+      return requestId > 0;
+    }
+  }
+
+  /** Create a leader balancer instance according to the configured leader distribution policy. */
+  private AbstractLeaderBalancer createLeaderBalancer() {
+    switch (CONF.getLeaderDistributionPolicy()) {
+      case AbstractLeaderBalancer.GREEDY_POLICY:
+        return new GreedyLeaderBalancer();
+      case AbstractLeaderBalancer.HASH_POLICY:
+        return new HashLeaderBalancer();
+      case AbstractLeaderBalancer.CFS_POLICY:
+      default:
+        return new CostFlowSelectionLeaderBalancer();
+    }
+  }
+
+  /**
+   * Balance leaders for all enabled RegionGroup types.
+   *
+   * <p>If both SchemaRegion and DataRegion leader balance are enabled, the two types are balanced
+   * concurrently and serialized only by their own type-specific locks.
+   */
+  private DataRegionLeaderBalanceResult balanceAllEnabledRegionLeaders() {
+    return balanceSelectedRegionLeaders(
+        IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION,
+        IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION);
+  }
+
+  /**
+   * Balance leaders only for the RegionGroup types included in the given change event.
+   *
+   * <p>This avoids letting an unrelated SchemaRegion change delay DataRegion leader selection, and
+   * vice versa.
+   */
+  private DataRegionLeaderBalanceResult balanceRegionLeadersForChangedGroups(
+      Set<TConsensusGroupId> regionGroupIds) {
+    final boolean shouldBalanceSchemaRegion =
+        IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION
+            && containsRegionType(regionGroupIds, TConsensusGroupType.SchemaRegion);
+    final boolean shouldBalanceDataRegion =
+        IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION
+            && containsRegionType(regionGroupIds, TConsensusGroupType.DataRegion);
+    return balanceSelectedRegionLeaders(shouldBalanceSchemaRegion, shouldBalanceDataRegion);
+  }
+
+  /** Return whether the given RegionGroup set contains a RegionGroup of the specified type. */
+  private boolean containsRegionType(
+      Set<TConsensusGroupId> regionGroupIds, TConsensusGroupType regionGroupType) {
+    return regionGroupIds.stream()
+        .anyMatch(regionGroupId -> regionGroupType.equals(regionGroupId.getType()));
+  }
+
+  /**
+   * Balance the selected RegionGroup types.
+   *
+   * <p>When both types are selected, DataRegion balance runs in the current thread while
+   * SchemaRegion balance runs asynchronously. The method still waits for both rounds to finish
+   * before returning.
+   */
+  private DataRegionLeaderBalanceResult balanceSelectedRegionLeaders(
+      boolean shouldBalanceSchemaRegion, boolean shouldBalanceDataRegion) {
+    if (shouldBalanceSchemaRegion && shouldBalanceDataRegion) {
+      return balanceSchemaAndDataRegionLeaders();
+    }
+    if (shouldBalanceSchemaRegion) {
+      balanceRegionLeaders(schemaRegionLeaderBalanceContext);
+    }
+    return shouldBalanceDataRegion
+        ? balanceRegionLeaders(dataRegionLeaderBalanceContext)
+        : DataRegionLeaderBalanceResult.empty();
+  }
+
+  /**
+   * Balance SchemaRegion and DataRegion leaders in parallel while preserving serialization inside
+   * each RegionGroup type.
+   */
+  private DataRegionLeaderBalanceResult balanceSchemaAndDataRegionLeaders() {
+    CompletableFuture<Void> schemaRegionLeaderBalanceFuture =
+        CompletableFuture.runAsync(() -> balanceRegionLeaders(schemaRegionLeaderBalanceContext));
+    try {
+      return balanceRegionLeaders(dataRegionLeaderBalanceContext);
+    } finally {
+      schemaRegionLeaderBalanceFuture.join();
     }
-    if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION) {
-      balanceRegionLeader(TConsensusGroupType.DataRegion, DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
+  }
+
+  /** Balance leaders of one RegionGroup type under its dedicated serialization lock. */
+  private DataRegionLeaderBalanceResult balanceRegionLeaders(
+      LeaderBalanceContext leaderBalanceContext) {
+    leaderBalanceContext.leaderBalanceLock.lock();
+    try {
+      return balanceRegionLeadersInLock(leaderBalanceContext);
+    } finally {
+      leaderBalanceContext.leaderBalanceLock.unlock();
     }
   }
 
-  private void balanceRegionLeader(
-      TConsensusGroupType regionGroupType, String consensusProtocolClass) {
+  /**
+   * Generate the optimal leader distribution of one RegionGroup type and apply the required leader
+   * transfers.
+   */
+  private DataRegionLeaderBalanceResult balanceRegionLeadersInLock(
+      LeaderBalanceContext leaderBalanceContext) {
     // Collect the latest data and generate the optimal leader distribution
     Map<TConsensusGroupId, Integer> currentLeaderMap =
-        getLoadManager().getLoadCache().getRegionLeaderMap(regionGroupType);
+        getLoadManager().getLoadCache().getRegionLeaderMap(leaderBalanceContext.regionGroupType);
     Map<TConsensusGroupId, Integer> optimalLeaderMap =
-        leaderBalancer.generateOptimalLeaderDistribution(
-            getLoadManager().getLoadCache().getCurrentDatabaseRegionGroupMap(regionGroupType),
-            getLoadManager().getLoadCache().getCurrentRegionLocationMap(regionGroupType),
+        leaderBalanceContext.leaderBalancer.generateOptimalLeaderDistribution(
+            getLoadManager()
+                .getLoadCache()
+                .getCurrentDatabaseRegionGroupMap(leaderBalanceContext.regionGroupType),
+            getLoadManager()
+                .getLoadCache()
+                .getCurrentRegionLocationMap(leaderBalanceContext.regionGroupType),
             currentLeaderMap,
             getLoadManager().getLoadCache().getCurrentDataNodeStatisticsMap(),
-            getLoadManager().getLoadCache().getCurrentRegionStatisticsMap(regionGroupType));
-
-    // Transfer leader to the optimal distribution
-    long currentTime = System.nanoTime();
-    AtomicInteger requestId = new AtomicInteger(0);
-    DataNodeAsyncRequestContext<TRegionLeaderChangeReq, TRegionLeaderChangeResp> clientHandler =
-        new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.CHANGE_REGION_LEADER);
-    Map<TConsensusGroupId, ConsensusGroupHeartbeatSample> successTransferMap = new TreeMap<>();
-    optimalLeaderMap.forEach(
-        (regionGroupId, newLeaderId) -> {
-          if (ConsensusFactory.RATIS_CONSENSUS.equals(consensusProtocolClass)
-              && currentTime - lastFailedTimeForLeaderBalance.getOrDefault(regionGroupId, 0L)
-                  <= BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS) {
-            return;
-          }
+            getLoadManager()
+                .getLoadCache()
+                .getCurrentRegionStatisticsMap(leaderBalanceContext.regionGroupType));
 
-          int oldLeaderId = currentLeaderMap.get(regionGroupId);
-          if (newLeaderId != -1 && !newLeaderId.equals(oldLeaderId)) {
-            LOGGER.info(
-                ManagerMessages.LEADERBALANCER_TRY_TO_CHANGE_THE_LEADER_OF_REGION_TO_DATANODE,
+    LeaderTransferContext leaderTransferContext = new LeaderTransferContext(System.nanoTime());
+    optimalLeaderMap.forEach(
+        (regionGroupId, newLeaderId) ->
+            collectLeaderTransfer(
+                leaderBalanceContext.regionGroupType,
+                leaderBalanceContext.consensusProtocolClass,
+                currentLeaderMap,
+                leaderTransferContext,
                 regionGroupId,
-                newLeaderId);
-            switch (consensusProtocolClass) {
-              case ConsensusFactory.IOT_CONSENSUS:
-              case ConsensusFactory.SIMPLE_CONSENSUS:
-                // For IoTConsensus or SimpleConsensus protocol, change
-                // RegionRouteMap is enough
-                successTransferMap.put(
-                    regionGroupId, new ConsensusGroupHeartbeatSample(currentTime, newLeaderId));
-                break;
-              case ConsensusFactory.IOT_CONSENSUS_V2:
-                // For IoTConsensusV2 protocol, change RegionRouteMap and execute flush on old
-                // region leader
-                successTransferMap.put(
-                    regionGroupId, new ConsensusGroupHeartbeatSample(currentTime, newLeaderId));
-                // Prepare data for flushOldLeader
-                if (oldLeaderId != -1) {
-                  lastBalancedOldLeaderId2RegionMap.compute(
-                      oldLeaderId,
-                      (k, v) -> {
-                        if (v == null) {
-                          List<String> value = new ArrayList<>();
-                          value.add(String.valueOf(regionGroupId.getId()));
-                          return value;
-                        }
-                        v.add(String.valueOf(regionGroupId.getId()));
-                        return v;
-                      });
-                }
-                break;
-              case ConsensusFactory.RATIS_CONSENSUS:
-              default:
-                // For ratis protocol, the ConfigNode-leader will send a changeLeaderRequest to the
-                // new
-                // leader.
-                // And the RegionRouteMap will be updated by Cluster-Heartbeat-Service later if
-                // change
-                // leader success.
-                // Force update region leader for ratis consensus when replication factor is 1.
-                if (TConsensusGroupType.SchemaRegion.equals(regionGroupType)
-                    && CONF.getSchemaReplicationFactor() == 1) {
-                  successTransferMap.put(
-                      regionGroupId, new ConsensusGroupHeartbeatSample(0, newLeaderId));
-                } else if (TConsensusGroupType.DataRegion.equals(regionGroupType)
-                    && CONF.getDataReplicationFactor() == 1) {
-                  successTransferMap.put(
-                      regionGroupId, new ConsensusGroupHeartbeatSample(0, newLeaderId));
-                } else {
-                  TDataNodeLocation newLeader =
-                      getNodeManager().getRegisteredDataNode(newLeaderId).getLocation();
-                  TRegionLeaderChangeReq regionLeaderChangeReq =
-                      new TRegionLeaderChangeReq(regionGroupId, newLeader);
-                  int requestIndex = requestId.getAndIncrement();
-                  clientHandler.putRequest(requestIndex, regionLeaderChangeReq);
-                  clientHandler.putNodeLocation(requestIndex, newLeader);
-                }
-                break;
-            }
-          }
-        });
-    if (requestId.get() > 0) {
-      // Don't retry ChangeLeader request
-      CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(clientHandler);
-      for (int i = 0; i < requestId.get(); i++) {
-        if (clientHandler.getResponseMap().get(i).getStatus().getCode()
-            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          successTransferMap.put(
-              clientHandler.getRequest(i).getRegionId(),
-              new ConsensusGroupHeartbeatSample(
-                  clientHandler.getResponseMap().get(i).getConsensusLogicalTimestamp(),
-                  clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId()));
-        } else {
-          lastFailedTimeForLeaderBalance.put(
-              clientHandler.getRequest(i).getRegionId(), currentTime);
-          LOGGER.error(
-              ManagerMessages.LEADERBALANCER_FAILED_TO_CHANGE_THE_LEADER_OF_REGION_TO_DATANODE,
-              clientHandler.getRequest(i).getRegionId(),
-              clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId());
-        }
+                newLeaderId));
+
+    sendRatisLeaderTransferRequests(leaderTransferContext);
+    getLoadManager().forceUpdateConsensusGroupCache(leaderTransferContext.successTransferMap);
+
+    return TConsensusGroupType.DataRegion.equals(leaderBalanceContext.regionGroupType)
+        ? new DataRegionLeaderBalanceResult(
+            new HashMap<>(currentLeaderMap),
+            new HashSet<>(leaderTransferContext.successTransferMap.keySet()),
+            leaderTransferContext.balancedOldLeaderId2RegionMap)
+        : DataRegionLeaderBalanceResult.empty();
+  }
+
+  /** Collect one leader transfer into either an immediate cache update or a Ratis transfer RPC. */
+  private void collectLeaderTransfer(
+      TConsensusGroupType regionGroupType,
+      String consensusProtocolClass,
+      Map<TConsensusGroupId, Integer> currentLeaderMap,
+      LeaderTransferContext leaderTransferContext,
+      TConsensusGroupId regionGroupId,
+      Integer newLeaderId) {
+    if (shouldSkipRatisLeaderTransferAfterFailure(
+        consensusProtocolClass, regionGroupId, leaderTransferContext.currentTime)) {
+      return;
+    }
+
+    int oldLeaderId = currentLeaderMap.get(regionGroupId);
+    if (newLeaderId == -1 || newLeaderId.equals(oldLeaderId)) {
+      return;
+    }
+
+    LOGGER.info(
+        ManagerMessages.LEADERBALANCER_TRY_TO_CHANGE_THE_LEADER_OF_REGION_TO_DATANODE,
+        regionGroupId,
+        newLeaderId);
+    switch (consensusProtocolClass) {
+      case ConsensusFactory.IOT_CONSENSUS:
+      case ConsensusFactory.SIMPLE_CONSENSUS:
+        // For IoTConsensus or SimpleConsensus protocol, changing RegionRouteMap is enough.
+        leaderTransferContext.putSuccessTransfer(
+            regionGroupId, leaderTransferContext.currentTime, newLeaderId);
+        break;
+      case ConsensusFactory.IOT_CONSENSUS_V2:
+        leaderTransferContext.putSuccessTransfer(
+            regionGroupId, leaderTransferContext.currentTime, newLeaderId);
+        recordOldLeaderForFlush(
+            leaderTransferContext.balancedOldLeaderId2RegionMap, regionGroupId, oldLeaderId);
+        break;
+      case ConsensusFactory.RATIS_CONSENSUS:
+      default:
+        collectRatisLeaderTransfer(
+            regionGroupType, leaderTransferContext, regionGroupId, newLeaderId);
+        break;
+    }
+  }
+
+  /** Return whether a Ratis leader transfer should be skipped because it recently failed. */
+  private boolean shouldSkipRatisLeaderTransferAfterFailure(
+      String consensusProtocolClass, TConsensusGroupId regionGroupId, long currentTime) {
+    return ConsensusFactory.RATIS_CONSENSUS.equals(consensusProtocolClass)
+        && currentTime - lastFailedTimeForLeaderBalance.getOrDefault(regionGroupId, 0L)
+            <= BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS;
+  }
+
+  /** Prepare a Ratis leader transfer, or force-update cache directly for single-replica groups. */
+  private void collectRatisLeaderTransfer(
+      TConsensusGroupType regionGroupType,
+      LeaderTransferContext leaderTransferContext,
+      TConsensusGroupId regionGroupId,
+      int newLeaderId) {
+    // The RegionRouteMap will be updated later by heartbeat if the transfer succeeds.
+    if (isSingleReplicaRegionGroup(regionGroupType)) {
+      leaderTransferContext.putSuccessTransfer(regionGroupId, 0, newLeaderId);
+      return;
+    }
+
+    TDataNodeLocation newLeader = getNodeManager().getRegisteredDataNode(newLeaderId).getLocation();
+    leaderTransferContext.putRatisTransferRequest(regionGroupId, newLeader);
+  }
+
+  /** Return whether the RegionGroup type has only one replica and therefore no transfer RPC. */
+  private boolean isSingleReplicaRegionGroup(TConsensusGroupType regionGroupType) {
+    return (TConsensusGroupType.SchemaRegion.equals(regionGroupType)
+            && CONF.getSchemaReplicationFactor() == 1)
+        || (TConsensusGroupType.DataRegion.equals(regionGroupType)
+            && CONF.getDataReplicationFactor() == 1);
+  }
+
+  /**
+   * Record the old DataRegion leader so IoTConsensusV2 can flush it after a successful transfer.
+   */
+  private void recordOldLeaderForFlush(
+      Map<Integer, List<String>> balancedOldLeaderId2RegionMap,
+      TConsensusGroupId regionGroupId,
+      int oldLeaderId) {
+    if (oldLeaderId != -1) {
+      balancedOldLeaderId2RegionMap
+          .computeIfAbsent(oldLeaderId, ignored -> new ArrayList<>())
+          .add(String.valueOf(regionGroupId.getId()));
+    }
+  }
+
+  /** Send collected Ratis leader transfer RPCs and record successful transfers in cache samples. */
+  private void sendRatisLeaderTransferRequests(LeaderTransferContext leaderTransferContext) {
+    if (!leaderTransferContext.hasRatisTransferRequest()) {
+      return;
+    }
+
+    // Don't retry ChangeLeader request.
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequest(
+            leaderTransferContext.clientHandler, 1, RATIS_CHANGE_LEADER_RPC_TIMEOUT_IN_MS);
+    for (int i = 0; i < leaderTransferContext.requestId; i++) {
+      TRegionLeaderChangeReq request = leaderTransferContext.clientHandler.getRequest(i);
+      TRegionLeaderChangeResp response =
+          leaderTransferContext.clientHandler.getResponseMap().get(i);
+      if (response != null
+          && response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        leaderTransferContext.putSuccessTransfer(
+            request.getRegionId(),
+            response.getConsensusLogicalTimestamp(),
+            request.getNewLeaderNode().getDataNodeId());
+      } else {
+        lastFailedTimeForLeaderBalance.put(request.getRegionId(), System.nanoTime());
+        LOGGER.error(
+            ManagerMessages.LEADERBALANCER_FAILED_TO_CHANGE_THE_LEADER_OF_REGION_TO_DATANODE,
+            request.getRegionId(),
+            request.getNewLeaderNode().getDataNodeId());
+      }
+    }
+  }
+
+  /** Invalidate schema cache on old DataRegion leaders after successful leader transfers. */
+  private void invalidateSchemaCacheOfOldLeaders(
+      DataRegionLeaderBalanceResult leaderBalanceResult) {
+    if (!IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION
+        || !leaderBalanceResult.hasBalancedDataRegion()) {
+      return;
+    }
+
+    final DataNodeAsyncRequestContext<String, TSStatus> 
invalidateSchemaCacheRequestHandler =
+        new 
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.INVALIDATE_LAST_CACHE);
+    int requestIndex = 0;
+    for (Map.Entry<TConsensusGroupId, Integer> entry :
+        leaderBalanceResult.dataRegion2OldLeaderMap.entrySet()) {
+      if (!leaderBalanceResult.balancedDataRegionSet.contains(entry.getKey())) 
{
+        continue;
       }
+      requestIndex =
+          addInvalidateSchemaCacheRequest(
+              invalidateSchemaCacheRequestHandler, requestIndex, 
entry.getKey(), entry.getValue());
+    }
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequest(invalidateSchemaCacheRequestHandler);
+  }
+
+  /** Add one invalidate-schema-cache request for the old leader of a balanced 
DataRegion. */
+  private int addInvalidateSchemaCacheRequest(
+      DataNodeAsyncRequestContext<String, TSStatus> 
invalidateSchemaCacheRequestHandler,
+      int requestIndex,
+      TConsensusGroupId consensusGroupId,
+      Integer dataNodeId) {
+    if (dataNodeId == null || dataNodeId == -1) {
+      return requestIndex;
     }
 
-    getLoadManager().forceUpdateConsensusGroupCache(successTransferMap);
-
-    // Prepare data for invalidSchemaCacheOfOldLeaders
-    if (regionGroupType.equals(TConsensusGroupType.DataRegion)) {
-      lastBalancedDataRegionSet = successTransferMap.keySet();
-      lastDataRegion2OldLeaderMap = currentLeaderMap;
-    }
-  }
-
-  private void invalidateSchemaCacheOfOldLeaders() {
-    BiConsumer<Map<TConsensusGroupId, Integer>, Set<TConsensusGroupId>> 
consumer =
-        (oldLeaderMap, successTransferSet) -> {
-          final DataNodeAsyncRequestContext<String, TSStatus> 
invalidateSchemaCacheRequestHandler =
-              new 
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.INVALIDATE_LAST_CACHE);
-          final AtomicInteger requestIndex = new AtomicInteger(0);
-          oldLeaderMap.entrySet().stream()
-              .filter(entry -> successTransferSet.contains(entry.getKey()))
-              .forEach(
-                  entry -> {
-                    // set target
-                    final Integer dataNodeId = entry.getValue();
-                    if (dataNodeId == -1) {
-                      return;
-                    }
-                    final TDataNodeLocation dataNodeLocation =
-                        
getNodeManager().getRegisteredDataNode(dataNodeId).getLocation();
-                    if (dataNodeLocation == null) {
-                      
LOGGER.warn(ManagerMessages.DATANODELOCATION_IS_NULL_DATANODEID, dataNodeId);
-                      return;
-                    }
-                    invalidateSchemaCacheRequestHandler.putNodeLocation(
-                        requestIndex.get(), dataNodeLocation);
-                    // set req
-                    final TConsensusGroupId consensusGroupId = entry.getKey();
-                    final String database =
-                        
getPartitionManager().getRegionDatabase(consensusGroupId);
-                    
invalidateSchemaCacheRequestHandler.putRequest(requestIndex.get(), database);
-                    requestIndex.incrementAndGet();
-                  });
-          CnToDnInternalServiceAsyncRequestManager.getInstance()
-              .sendAsyncRequest(invalidateSchemaCacheRequestHandler);
-        };
-
-    if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION) {
-      consumer.accept(lastDataRegion2OldLeaderMap, lastBalancedDataRegionSet);
-    }
-  }
-
-  private void flushOldLeaderIfIoTV2() {
+    TDataNodeConfiguration dataNodeConfiguration =
+        getNodeManager().getRegisteredDataNode(dataNodeId);
+    if (dataNodeConfiguration == null || dataNodeConfiguration.getLocation() 
== null) {
+      LOGGER.warn(ManagerMessages.DATANODELOCATION_IS_NULL_DATANODEID, 
dataNodeId);
+      return requestIndex;
+    }
+
+    invalidateSchemaCacheRequestHandler.putNodeLocation(
+        requestIndex, dataNodeConfiguration.getLocation());
+    invalidateSchemaCacheRequestHandler.putRequest(
+        requestIndex, 
getPartitionManager().getRegionDatabase(consensusGroupId));
+    return requestIndex + 1;
+  }
+
+  /** Flush old DataRegion leaders after IoTConsensusV2 leader transfers. */
+  private void flushOldLeaderIfIoTV2(DataRegionLeaderBalanceResult 
leaderBalanceResult) {
     if (!IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION
-        || !Objects.equals(
-            DATA_REGION_CONSENSUS_PROTOCOL_CLASS, 
ConsensusFactory.IOT_CONSENSUS_V2)) {
+        || !Objects.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS, 
ConsensusFactory.IOT_CONSENSUS_V2)
+        || leaderBalanceResult.balancedOldLeaderId2RegionMap.isEmpty()) {
       return;
     }
 
-    BiConsumer<Integer, List<String>> consumer =
+    leaderBalanceResult.balancedOldLeaderId2RegionMap.forEach(
         (oldLeaderId, regionGroupIds) -> {
           TDataNodeConfiguration configuration =
               getNodeManager().getRegisteredDataNode(oldLeaderId);
@@ -358,21 +576,20 @@ public class RouteBalancer implements 
IClusterStatusSubscriber {
                 oldLeaderId,
                 regionGroupIds);
           }
-        };
-    lastBalancedOldLeaderId2RegionMap.forEach(consumer);
-    // after flush, clear map for next balance
-    lastBalancedOldLeaderId2RegionMap.clear();
+        });
   }
 
-  private synchronized void handleBalanceAction() {
-    invalidateSchemaCacheOfOldLeaders();
-    flushOldLeaderIfIoTV2();
+  /** Execute follow-up actions required after DataRegion leader balance. */
+  private void handleBalanceAction(DataRegionLeaderBalanceResult 
leaderBalanceResult) {
+    invalidateSchemaCacheOfOldLeaders(leaderBalanceResult);
+    flushOldLeaderIfIoTV2(leaderBalanceResult);
   }
 
-  public synchronized void balanceRegionLeaderAndPriority() {
-    balanceRegionLeader();
+  /** Balance leaders and routing priority immediately, then run DataRegion 
follow-up actions. */
+  public void balanceRegionLeaderAndPriority() {
+    DataRegionLeaderBalanceResult leaderBalanceResult = 
balanceAllEnabledRegionLeaders();
     balanceRegionPriority();
-    handleBalanceAction();
+    handleBalanceAction(leaderBalanceResult);
   }
 
   /** Balance cluster RegionGroup route priority through configured algorithm. 
*/
@@ -415,6 +632,7 @@ public class RouteBalancer implements 
IClusterStatusSubscriber {
     }
   }
 
+  /** Broadcast the latest Region route priority map to all available 
DataNodes. */
   private void broadcastLatestRegionPriorityMap() {
     // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
@@ -435,6 +653,7 @@ public class RouteBalancer implements 
IClusterStatusSubscriber {
     
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
   }
 
+  /** Record Region route priority changes for later diagnosis. */
   private void recordRegionPriorityMap(
       Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> 
differentPriorityMap) {
     LOGGER.info(ManagerMessages.REGIONPRIORITY_REGIONPRIORITYMAP);
@@ -458,6 +677,8 @@ public class RouteBalancer implements 
IClusterStatusSubscriber {
   }
 
   /**
+   * Return a snapshot of the current Region route priority map.
+   *
    * @return Map<RegionGroupId, RegionPriority>
    */
   public Map<TConsensusGroupId, TRegionReplicaSet> getRegionPriorityMap() {
@@ -469,6 +690,7 @@ public class RouteBalancer implements 
IClusterStatusSubscriber {
     }
   }
 
+  /** Remove one RegionGroup's route priority from the local cache. */
   public void removeRegionPriority(TConsensusGroupId regionGroupId) {
     priorityMapLock.writeLock().lock();
     try {
@@ -478,6 +700,7 @@ public class RouteBalancer implements 
IClusterStatusSubscriber {
     }
   }
 
+  /** Clear all cached Region route priorities. */
   public void clearRegionPriority() {
     priorityMapLock.writeLock().lock();
     try {
@@ -530,32 +753,44 @@ public class RouteBalancer implements 
IClusterStatusSubscriber {
         regionGroupIds);
   }
 
+  /** Return the NodeManager facade used by route balance operations. */
   private NodeManager getNodeManager() {
     return configManager.getNodeManager();
   }
 
+  /** Return the PartitionManager facade used by route balance operations. */
   private PartitionManager getPartitionManager() {
     return configManager.getPartitionManager();
   }
 
+  /** Return the LoadManager facade used by route balance operations. */
   private LoadManager getLoadManager() {
     return configManager.getLoadManager();
   }
 
+  /** Trigger leader balance when DataNode-level load statistics change. */
   @Override
   public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
-    balanceRegionLeader();
+    handleBalanceAction(balanceAllEnabledRegionLeaders());
   }
 
+  /**
+   * Trigger leader balance only for RegionGroup types present in RegionGroup 
statistics changes.
+   */
   @Override
   public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent 
event) {
-    balanceRegionLeader();
+    handleBalanceAction(
+        balanceRegionLeadersForChangedGroups(
+            event.getDifferentRegionGroupStatisticsMap().keySet()));
   }
 
+  /** Trigger leader balance and route priority update after consensus leader 
statistics change. */
   @Override
   public void 
onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent event) {
-    balanceRegionLeader();
+    DataRegionLeaderBalanceResult leaderBalanceResult =
+        balanceRegionLeadersForChangedGroups(
+            event.getDifferentConsensusGroupStatisticsMap().keySet());
     balanceRegionPriority();
-    handleBalanceAction();
+    handleBalanceAction(leaderBalanceResult);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
index b4086e2dbbd..595cf9a0691 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
@@ -40,7 +40,7 @@ public abstract class AbstractLeaderBalancer {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractLeaderBalancer.class);
   public static final String GREEDY_POLICY = "GREEDY";
-  public static final String CFD_POLICY = "CFD";
+  public static final String CFS_POLICY = "CFS";
   public static final String HASH_POLICY = "HASH";
 
   // Set<RegionGroupId>
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
index 472ddd019b8..2cb283d400e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
@@ -180,6 +180,21 @@ public class CreateRegionGroupsProcedure
           LOGGER.warn(
               
ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, 
e);
         }
+        setNextState(CreateRegionGroupsState.REBALANCE_DATA_PARTITION_POLICY);
+        break;
+      case REBALANCE_DATA_PARTITION_POLICY:
+        if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
+          // Re-balance all corresponding DataPartitionPolicyTable before the 
newly created
+          // RegionGroups become available for serving partitions.
+          persistPlan
+              .getRegionGroupMap()
+              .keySet()
+              .forEach(
+                  database ->
+                      env.getConfigManager()
+                          .getLoadManager()
+                          .reBalanceDataPartitionPolicy(database));
+        }
         setNextState(CreateRegionGroupsState.ACTIVATE_REGION_GROUPS);
         break;
       case ACTIVATE_REGION_GROUPS:
@@ -240,17 +255,6 @@ public class CreateRegionGroupsProcedure
         setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
         break;
       case CREATE_REGION_GROUPS_FINISH:
-        if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
-          // Re-balance all corresponding DataPartitionPolicyTable
-          persistPlan
-              .getRegionGroupMap()
-              .keySet()
-              .forEach(
-                  database ->
-                      env.getConfigManager()
-                          .getLoadManager()
-                          .reBalanceDataPartitionPolicy(database));
-        }
         return Flow.NO_MORE_STATE;
     }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
index 4ff90132f59..43cdc4b3f6e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
@@ -31,6 +31,10 @@ public enum CreateRegionGroupsState {
   // 3. Delete redundant RegionReplicas in contrast to case 2.
   SHUNT_REGION_REPLICAS,
 
+  // Re-balance the DataPartitionPolicyTable for the affected databases so 
that the newly
+  // created DataRegionGroups are taken into account before they start serving 
partitions.
+  REBALANCE_DATA_PARTITION_POLICY,
+
   // Mark RegionGroupCache as available for those RegionGroups that created 
successfully.
   // For DataRegionGroups that use iot consensus protocol, select leader by 
the way
   ACTIVATE_REGION_GROUPS,


Reply via email to