This is an automated email from the ASF dual-hosted git repository.

hxpserein pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new b1ae6dc5cfb [To dev/1.3][remove datanode] cherry pick related commits (#16711)
b1ae6dc5cfb is described below

commit b1ae6dc5cfb2aa866d54daecf88c3d30c4ccc548
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Fri Nov 7 19:15:43 2025 +0800

    [To dev/1.3][remove datanode] cherry pick related commits (#16711)
    
    * [remove datanode] GCR load balancing implement for removing datanode (#15282)
    
    (cherry picked from commit 7650b4793474b11405698bbe9290f814ef3afc06)
    
    * [remove datanode] Fix IoTDBRemoveDataNodeNormalIT #15429
    
    (cherry picked from commit 953780620df07eb6282d5391c0069fabd63cb40a)
    
    * [remove datanode] Accelerate GCR load balancing implement (#15535)
    
    (cherry picked from commit 51bad1ec88d5d20aa4e84f3ccd6841fbfba75390)
    
    * [remove datanode] Fix ArrayIndexOutOfBoundsException in computeInitialDbLoad (#15718)
    
    (cherry picked from commit 346ee720e7a4a561e28bc210f9ec6111c45a1a46)
---
 .../region/GreedyCopySetRegionGroupAllocator.java  | 361 +++++++++++++++++++-
 .../region/GreedyRegionGroupAllocator.java         |  13 +
 .../balancer/region/IRegionGroupAllocator.java     |  21 ++
 ...artiteGraphReplicationRegionGroupAllocator.java |  13 +
 .../procedure/env/RemoveDataNodeHandler.java       | 186 +++++++++++
 .../impl/node/RemoveDataNodesProcedure.java        |   6 +-
 .../GreedyCopySetRemoveNodeReplicaSelectTest.java  | 370 +++++++++++++++++++++
 7 files changed, 951 insertions(+), 19 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
index 2577455d41c..ae5f1c3eb44 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
@@ -27,10 +27,12 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.util.Map.Entry.comparingByValue;
@@ -39,7 +41,7 @@ import static java.util.Map.Entry.comparingByValue;
 public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator {
 
   private static final Random RANDOM = new Random();
-  private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 100;
+  private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 10;
 
   private int replicationFactor;
   // Sorted available DataNodeIds
@@ -50,16 +52,35 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
   private int[] databaseRegionCounter;
   // The number of 2-Region combinations in current cluster
   private int[][] combinationCounter;
+  // The initial load for each database on each datanode
+  private Map<String, int[]> initialDbLoad;
 
  // First Key: the sum of Regions at the DataNodes in the allocation result is minimal
-  int optimalRegionSum;
+  private int optimalRegionSum;
   // Second Key: the sum of Regions at the DataNodes within the same Database
   // in the allocation result is minimal
-  int optimalDatabaseRegionSum;
+  private int optimalDatabaseRegionSum;
   // Third Key: the sum of overlapped 2-Region combination Regions with
   // other allocated RegionGroups is minimal
-  int optimalCombinationSum;
-  List<int[]> optimalReplicaSets;
+  private int optimalCombinationSum;
+  private List<int[]> optimalReplicaSets;
+
+  // Pre-calculation, scatterDelta[i][j] means the scatter increment between region i and the old
+  // replica set when region i is placed on node j
+  private int[][] scatterDelta;
+  // For each region, the allowed candidate destination node IDs.
+  private Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap;
+  // A list of regions that need to be migrated.
+  private List<TConsensusGroupId> dfsRegionKeys;
+  // A mapping from each region identifier to its corresponding database name.
+  private Map<TConsensusGroupId, String> regionDatabaseMap;
+  // Buffer holding best assignment arrays.
+  private int[] bestAssignment;
+  // An int array holding the best metrics found so far: [maxGlobalLoad, maxDatabaseLoad,
+  // scatterValue].
+  private int[] bestMetrics;
+  // dfsRemoveNodeReplica batch size
+  private static final int BATCH_SIZE = 12;
 
   private static class DataNodeEntry {
 
@@ -103,12 +124,9 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
       int replicationFactor,
       TConsensusGroupId consensusGroupId) {
     try {
-      prepare(
-          replicationFactor,
-          availableDataNodeMap,
-          allocatedRegionGroups,
-          databaseAllocatedRegionGroups);
-      dfs(-1, 0, new int[replicationFactor], 0, 0);
+      this.replicationFactor = replicationFactor;
+      prepare(availableDataNodeMap, allocatedRegionGroups, databaseAllocatedRegionGroups);
+      dfsAllocateReplica(-1, 0, new int[replicationFactor], 0, 0);
 
       // Randomly pick one optimal plan as result
       Collections.shuffle(optimalReplicaSets);
@@ -125,21 +143,332 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
     }
   }
 
+  @Override
+  public Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<Integer, Double> freeDiskSpaceMap,
+      List<TRegionReplicaSet> allocatedRegionGroups,
+      Map<TConsensusGroupId, String> regionDatabaseMap,
+      Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+      Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap) {
+    try {
+      // 1. prepare: compute regionCounter, databaseRegionCounter, and combinationCounter
+
+      prepare(availableDataNodeMap, allocatedRegionGroups, Collections.emptyList());
+      computeInitialDbLoad(availableDataNodeMap, databaseAllocatedRegionGroupMap);
+
+      // 2. Build allowed candidate set for each region that needs to be migrated.
+      // For each region in remainReplicasMap, the candidate destination nodes are all nodes in
+      // availableDataNodeMap, excluding those already in the remain replica set.
+      List<TConsensusGroupId> regionKeys = new ArrayList<>(remainReplicasMap.keySet());
+      allowedCandidatesMap = new HashMap<>();
+      this.regionDatabaseMap = regionDatabaseMap;
+      for (TConsensusGroupId regionId : regionKeys) {
+        TRegionReplicaSet remainReplicaSet = remainReplicasMap.get(regionId);
+        Set<Integer> notAllowedNodes =
+            remainReplicaSet.getDataNodeLocations().stream()
+                .map(TDataNodeLocation::getDataNodeId)
+                .collect(Collectors.toSet());
+
+        // Allowed candidates are the nodes not in the exclusion set
+        List<Integer> candidates =
+            availableDataNodeMap.keySet().stream()
+                .filter(nodeId -> !notAllowedNodes.contains(nodeId))
+                .sorted(
+                    (a, b) -> {
+                      int cmp = Integer.compare(regionCounter[a], regionCounter[b]);
+                      return (cmp != 0)
+                          ? cmp
+                          : Integer.compare(databaseRegionCounter[a], databaseRegionCounter[b]);
+                    })
+                .collect(Collectors.toList());
+        Collections.shuffle(candidates);
+
+        // Sort candidates in ascending order of current global load (regionCounter)
+        allowedCandidatesMap.put(regionId, candidates);
+      }
+
+      // Optionally, sort regionKeys by the size of its candidate list (smaller candidate sets
+      // first)
+      regionKeys.sort(Comparator.comparingInt(id -> allowedCandidatesMap.get(id).size()));
+
+      // 3. Batch DFS
+      Map<TConsensusGroupId, TDataNodeConfiguration> result = new HashMap<>();
+
+      for (int start = 0; start < regionKeys.size(); start += BATCH_SIZE) {
+        dfsRegionKeys = regionKeys.subList(start, Math.min(start + BATCH_SIZE, regionKeys.size()));
+        int batchSize = dfsRegionKeys.size();
+
+        // Initialize buffer
+        bestAssignment = new int[batchSize];
+        // bestMetrics holds the best found metrics: [maxGlobalLoad, maxDatabaseLoad, scatterValue].
+        bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE};
+        // currentAssignment holds the candidate nodeId chosen for the region at that index
+        int[] currentAssignment = new int[batchSize];
+        // additionalLoad holds the number of extra regions assigned to each node in this migration
+        // solution.
+        int[] additionalLoad = new int[regionCounter.length];
+
+        scatterDelta = new int[batchSize][regionCounter.length];
+        for (int r = 0; r < batchSize; r++) {
+          TConsensusGroupId regionId = dfsRegionKeys.get(r);
+          for (int nodeId : allowedCandidatesMap.get(regionId)) {
+            int inc = 0;
+            for (TDataNodeLocation loc : remainReplicasMap.get(regionId).getDataNodeLocations()) {
+              inc += combinationCounter[nodeId][loc.getDataNodeId()];
+            }
+            scatterDelta[r][nodeId] = inc;
+          }
+        }
+
+        int currentMaxGlobalLoad = 0;
+        for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) {
+          currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad, regionCounter[nodeId]);
+        }
+
+        dfsRemoveNodeReplica(0, currentMaxGlobalLoad, 0, currentAssignment, additionalLoad);
+
+        if (bestMetrics[0] == Integer.MAX_VALUE) {
+          // This should not happen if there is at least one valid assignment
+          return Collections.emptyMap();
+        }
+
+        for (int i = 0; i < batchSize; i++) {
+          TConsensusGroupId regionId = dfsRegionKeys.get(i);
+          int chosenNodeId = bestAssignment[i];
+          result.put(regionId, availableDataNodeMap.get(chosenNodeId));
+
+          regionCounter[chosenNodeId]++;
+          String db = regionDatabaseMap.get(regionId);
+          if (db != null) {
+            int[] dbLoad = initialDbLoad.computeIfAbsent(db, k -> new int[regionCounter.length]);
+            dbLoad[chosenNodeId]++;
+          }
+          for (TDataNodeLocation loc : remainReplicasMap.get(regionId).getDataNodeLocations()) {
+            combinationCounter[chosenNodeId][loc.getDataNodeId()]++;
+            combinationCounter[loc.getDataNodeId()][chosenNodeId]++;
+          }
+        }
+      }
+      return result;
+    } finally {
+      // Clear any temporary state to avoid impacting subsequent calls
+      clear();
+    }
+  }
+
+  /**
+   * DFS method that searches for migration target assignments.
+   *
+   * <p>It enumerates all possible assignments (one candidate for each region) and keeps the best
+   * solution found so far in the bestAssignment buffer. The evaluation metrics for each complete
+   * assignment (i.e. when index == dfsRegionKeys.size()) are:
+   *
+   * <p>1. Max global load: the maximum over nodes of (regionCounter[node] + additionalLoad[node]).
+   * 2. Max database load: the maximum over nodes of (databaseRegionCounter[node] +
+   * additionalLoad[node]). 3. Scatter value: computed per region, summing the combinationCounter
+   * for every pair in the complete replica set. The complete replica set for a region includes
+   * nodes in its remain replica set plus the newly assigned node.
+   *
+   * <p>The candidates are compared lexicographically (first by global load, then by database load,
+   * then by scatter). When a strictly better complete assignment is found, bestMetrics and
+   * bestAssignment are updated.
+   *
+   * <p>DFS branches whose partial metrics already compare worse than bestMetrics are pruned.
+   *
+   * @param index Current DFS level, corresponding to dfsRegionKeys.get(index)
+   * @param currentMaxGlobalLoad The maximum global load across all data nodes.
+   * @param currentScatter The scatter value for the complete assignment.
+   * @param currentAssignment Current partial assignment; its length equals the number of regions.
+   * @param additionalLoad Extra load currently assigned to each node.
+   */
+  private void dfsRemoveNodeReplica(
+      int index,
+      int currentMaxGlobalLoad,
+      int currentScatter,
+      int[] currentAssignment,
+      int[] additionalLoad) {
+    // Compute the maximum global load and maximum database load among all nodes that received
+    // additional load.
+    int[] currentMetrics = getCurrentMetrics(additionalLoad, currentScatter, currentAssignment);
+    // Lexicographically compare currentMetrics with bestMetrics.
+    // If currentMetrics is better than bestMetrics, update bestMetrics and bestAssignment.
+    boolean isBetter = false;
+    boolean isEqual = true;
+    for (int i = 0; i < 3; i++) {
+      if (currentMetrics[i] < bestMetrics[i]) {
+        isBetter = true;
+        isEqual = false;
+        break;
+      } else if (currentMetrics[i] > bestMetrics[i]) {
+        isEqual = false;
+        break;
+      }
+    }
+    if (!isBetter && !isEqual) {
+      return;
+    }
+
+    if (index == dfsRegionKeys.size()) {
+      if (isBetter) {
+        bestMetrics[0] = currentMetrics[0];
+        bestMetrics[1] = currentMetrics[1];
+        bestMetrics[2] = currentMetrics[2];
+        System.arraycopy(currentAssignment, 0, bestAssignment, 0, dfsRegionKeys.size());
+      }
+      return;
+    }
+
+    // Process the region at the current index.
+    TConsensusGroupId regionId = dfsRegionKeys.get(index);
+    List<Integer> candidates = allowedCandidatesMap.get(regionId);
+    for (Integer candidate : candidates) {
+      currentAssignment[index] = candidate;
+      currentScatter += scatterDelta[index][currentAssignment[index]];
+      additionalLoad[candidate]++;
+      int nextMaxGlobalLoad =
+          Math.max(currentMaxGlobalLoad, regionCounter[candidate] + additionalLoad[candidate]);
+
+      dfsRemoveNodeReplica(
+          index + 1, nextMaxGlobalLoad, currentScatter, currentAssignment, additionalLoad);
+      // Backtrack
+      additionalLoad[candidate]--;
+      currentScatter -= scatterDelta[index][currentAssignment[index]];
+    }
+  }
+
+  /**
+   * Computes the squared sum of the maximum load for each database.
+   *
+   * <p>For each database, this method calculates the maximum load on any data node by summing the
+   * initial load (from {@code initialDbLoad}) with the additional load assigned during migration
+   * (accumulated in {@code currentAssignment}), and then squares this maximum load. Finally, it
+   * returns the sum of these squared maximum loads across all databases.
+   *
+   * @param currentAssignment an array where each element is the nodeId assigned for the
+   *     corresponding region in {@code regionKeys}.
+   * @param regionKeys a list of region identifiers (TConsensusGroupId) representing the regions
+   *     under migration.
+   * @param regionDatabaseMap a mapping from each region identifier to its corresponding database
+   *     name.
+   * @return the sum of the squares of the maximum loads computed for each database.
+   */
+  private int computeDatabaseLoadSquaredSum(
+      int[] currentAssignment,
+      List<TConsensusGroupId> regionKeys,
+      Map<TConsensusGroupId, String> regionDatabaseMap) {
+    Map<String, int[]> extraLoadPerDb = new HashMap<>();
+    // Initialize extra load counters for each database using the number of nodes from
+    // regionCounter.
+    for (String db : initialDbLoad.keySet()) {
+      extraLoadPerDb.put(db, new int[regionCounter.length]);
+    }
+    // Accumulate extra load per database based on the current assignment.
+    for (int i = 0; i < regionKeys.size(); i++) {
+      TConsensusGroupId regionId = regionKeys.get(i);
+      String db = regionDatabaseMap.get(regionId);
+      int nodeId = currentAssignment[i];
+      extraLoadPerDb.get(db)[nodeId]++;
+    }
+    int sumSquared = 0;
+    // For each database, compute the maximum load across nodes and add its square to the sum.
+    for (String db : initialDbLoad.keySet()) {
+      int[] initLoads = initialDbLoad.get(db);
+      int[] extras = extraLoadPerDb.get(db);
+      int maxLoad = 0;
+      for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) {
+        int load = initLoads[nodeId] + extras[nodeId];
+        if (load > maxLoad) {
+          maxLoad = load;
+        }
+      }
+      sumSquared += maxLoad * maxLoad;
+    }
+    return sumSquared;
+  }
+
+  /**
+   * Computes the current migration metrics.
+   *
+   * <p>This method calculates three key metrics:
+   *
+   * <ol>
+   *   <li><strong>Max Global Load:</strong> The maximum load among all nodes, computed as the sum
+   *       of the initial region load (from {@code regionCounter}) and the additional load (from
+   *       {@code additionalLoad}).
+   *   <li><strong>Database Load Squared Sum:</strong> The squared sum of the maximum load per
+   *       database, which is computed by {@link #computeDatabaseLoadSquaredSum(int[], List, Map)}.
+   *   <li><strong>Scatter Value:</strong> A provided metric that reflects additional balancing
+   *       criteria.
+   * </ol>
+   *
+   * The metrics are returned as an array of three integers in the order: [maxGlobalLoad,
+   * databaseLoadSquaredSum, scatterValue].
+   *
+   * @param additionalLoad an array representing the additional load assigned to each node during
+   *     migration.
+   * @param currentScatter the current scatter value metric.
+   * @param currentAssignment an array where each element is the nodeId assigned for the
+   *     corresponding region in {@code regionKeys}.
+   * @return an integer array of size 3: [maxGlobalLoad, databaseLoadSquaredSum, scatterValue].
+   */
+  private int[] getCurrentMetrics(
+      int[] additionalLoad, int currentScatter, int[] currentAssignment) {
+    int currentMaxGlobalLoad = 0;
+    // Calculate the maximum global load across all data nodes.
+    for (int nodeId = 0; nodeId < additionalLoad.length; nodeId++) {
+      int globalLoad = regionCounter[nodeId] + additionalLoad[nodeId];
+      currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad, globalLoad);
+    }
+    // Compute the database load squared sum using the helper method.
+    int dbLoadSquaredSum =
+        computeDatabaseLoadSquaredSum(currentAssignment, dfsRegionKeys, regionDatabaseMap);
+    // Build current metrics in order [maxGlobalLoad, maxDatabaseLoad, scatterValue]
+    return new int[] {currentMaxGlobalLoad, dbLoadSquaredSum, currentScatter};
+  }
+
+  /**
+   * Compute the initial load for each database on each data node.
+   *
+   * @param availableDataNodeMap currently available DataNodes, ensure size() >= replicationFactor
+   * @param databaseAllocatedRegionGroupMap Mapping of each database to its list of replica sets.
+   */
+  private void computeInitialDbLoad(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap) {
+    initialDbLoad = new HashMap<>();
+
+    // Iterate over each database and count the number of regions on each data node across all its
+    // replica sets.
+    for (String database : databaseAllocatedRegionGroupMap.keySet()) {
+      List<TRegionReplicaSet> replicaSets = databaseAllocatedRegionGroupMap.get(database);
+      int[] load = new int[regionCounter.length];
+      for (TRegionReplicaSet replicaSet : replicaSets) {
+        for (TDataNodeLocation location : replicaSet.getDataNodeLocations()) {
+          int nodeId = location.getDataNodeId();
+          if (availableDataNodeMap.containsKey(nodeId)) {
+            load[nodeId]++;
+          }
+        }
+      }
+      initialDbLoad.put(database, load);
+    }
+  }
+
   /**
    * Prepare some statistics before dfs.
    *
-   * @param replicationFactor replication factor in the cluster
   * @param availableDataNodeMap currently available DataNodes, ensure size() >= replicationFactor
    * @param allocatedRegionGroups already allocated RegionGroups in the cluster
   * @param databaseAllocatedRegionGroups already allocated RegionGroups in the same Database
    */
   private void prepare(
-      int replicationFactor,
       Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
       List<TRegionReplicaSet> allocatedRegionGroups,
       List<TRegionReplicaSet> databaseAllocatedRegionGroups) {
 
-    this.replicationFactor = replicationFactor;
     // Store the maximum DataNodeId
     int maxDataNodeId =
         Math.max(
@@ -225,7 +554,7 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
    *     current allocation plan
   * @param regionSum the sum of Regions at the DataNodes in the current allocation plan
    */
-  private void dfs(
+  private void dfsAllocateReplica(
       int lastIndex,
       int currentReplica,
       int[] currentReplicaSet,
@@ -274,7 +603,7 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
     for (int i = lastIndex + 1; i < dataNodeIds.length; i++) {
       // Decide the next DataNodeId in the allocation plan
       currentReplicaSet[currentReplica] = dataNodeIds[i];
-      dfs(
+      dfsAllocateReplica(
           i,
           currentReplica + 1,
           currentReplicaSet,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
index d05a8accbec..9c8f032cee0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
@@ -57,6 +57,19 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
         weightList.stream().limit(replicationFactor).collect(Collectors.toList()));
   }
 
+  @Override
+  public Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<Integer, Double> freeDiskSpaceMap,
+      List<TRegionReplicaSet> allocatedRegionGroups,
+      Map<TConsensusGroupId, String> regionDatabaseMap,
+      Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+      Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap) {
+    // TODO: Implement this method
+    throw new UnsupportedOperationException(
+        "The removeNodeReplicaSelect method of GreedyRegionGroupAllocator is 
yet to be implemented.");
+  }
+
   private List<TDataNodeLocation> buildWeightList(
       Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
       Map<Integer, Double> freeDiskSpaceMap,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
index 554168d8497..24200548163 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
@@ -47,4 +47,25 @@ public interface IRegionGroupAllocator {
       List<TRegionReplicaSet> databaseAllocatedRegionGroups,
       int replicationFactor,
       TConsensusGroupId consensusGroupId);
+
+  /**
+   * Select the optimal DataNode to place the new replica on along with the remaining replica set.
+   *
+   * @param availableDataNodeMap DataNodes that can be used for allocation
+   * @param freeDiskSpaceMap The free disk space of the DataNodes
+   * @param allocatedRegionGroups Allocated RegionGroups
+   * @param regionDatabaseMap A mapping from each region identifier to its corresponding database
+   *     name
+   * @param databaseAllocatedRegionGroupMap Allocated RegionGroups within the same Database with the
+   *     replica set
+   * @param remainReplicasMap the remaining replica set excluding the removed DataNodes
+   * @return The optimal DataNode to place the new replica on along with the remaining replicas
+   */
+  Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<Integer, Double> freeDiskSpaceMap,
+      List<TRegionReplicaSet> allocatedRegionGroups,
+      Map<TConsensusGroupId, String> regionDatabaseMap,
+      Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+      Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap);
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java
index 1205e0abc06..4a2237805ad 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java
@@ -113,6 +113,19 @@ public class PartiteGraphReplicationRegionGroupAllocator implements IRegionGroup
     return result;
   }
 
+  @Override
+  public Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<Integer, Double> freeDiskSpaceMap,
+      List<TRegionReplicaSet> allocatedRegionGroups,
+      Map<TConsensusGroupId, String> regionDatabaseMap,
+      Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+      Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap) {
+    // TODO: Implement this method
+    throw new UnsupportedOperationException(
+        "The removeNodeReplicaSelect method of PartiteGraphReplicationRegionGroupAllocator is yet to be implemented.");
+  }
+
   private void prepare(
       int replicationFactor,
       Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
index eaa16f47907..980dd712eb0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure.env;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -37,6 +38,8 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator;
+import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
@@ -51,10 +54,13 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
@@ -70,8 +76,22 @@ public class RemoveDataNodeHandler {
 
   private final ConfigManager configManager;
 
+  private final IRegionGroupAllocator regionGroupAllocator;
+
   public RemoveDataNodeHandler(ConfigManager configManager) {
     this.configManager = configManager;
+
+    // Only the GCR allocator implements removeNodeReplicaSelect so far, so every policy
+    // currently falls back to it.
+    switch (ConfigNodeDescriptor.getInstance().getConf().getRegionGroupAllocatePolicy()) {
+      case GREEDY:
+        this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator();
+        break;
+      case PGR:
+        this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator();
+        break;
+      case GCR:
+      default:
+        this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator();
+    }
   }
 
   /**
@@ -193,6 +213,172 @@ public class RemoveDataNodeHandler {
     return regionMigrationPlans;
   }
 
+  /**
+   * Retrieves all region migration plans for the specified removed DataNodes and selects the
+   * destination.
+   *
+   * @param removedDataNodes the list of DataNodes from which to obtain migration plans
+   * @return a list of region migration plans associated with the removed DataNodes
+   */
+  public List<RegionMigrationPlan> selectedRegionMigrationPlans(
+      List<TDataNodeLocation> removedDataNodes) {
+
+    Set<Integer> removedDataNodesSet = new HashSet<>();
+    for (TDataNodeLocation removedDataNode : removedDataNodes) {
+      removedDataNodesSet.add(removedDataNode.dataNodeId);
+    }
+
+    final List<TDataNodeConfiguration> availableDataNodes =
+        configManager
+            .getNodeManager()
+            .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown)
+            .stream()
+            .filter(node -> !removedDataNodesSet.contains(node.getLocation().getDataNodeId()))
+            .collect(Collectors.toList());
+
+    List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
+
+    regionMigrationPlans.addAll(
+        selectMigrationPlans(availableDataNodes, TConsensusGroupType.DataRegion, removedDataNodes));
+
+    regionMigrationPlans.addAll(
+        selectMigrationPlans(
+            availableDataNodes, TConsensusGroupType.SchemaRegion, removedDataNodes));
+
+    return regionMigrationPlans;
+  }
+
+  public List<RegionMigrationPlan> selectMigrationPlans(
+      List<TDataNodeConfiguration> availableDataNodes,
+      TConsensusGroupType consensusGroupType,
+      List<TDataNodeLocation> removedDataNodes) {
+
+    // Retrieve all allocated replica sets for the given consensus group type
+    List<TRegionReplicaSet> allocatedReplicaSets =
+        configManager.getPartitionManager().getAllReplicaSets(consensusGroupType);
+
+    // Step 1: Identify affected replica sets and record the removed DataNode for each replica set
+    Map<TConsensusGroupId, TDataNodeLocation> removedNodeMap = new HashMap<>();
+    Set<TRegionReplicaSet> affectedReplicaSets =
+        identifyAffectedReplicaSets(allocatedReplicaSets, removedDataNodes, removedNodeMap);
+
+    // Step 2: Update affected replica sets by removing the removed DataNode
+    updateReplicaSets(allocatedReplicaSets, affectedReplicaSets, removedNodeMap);
+
+    // Build a mapping of available DataNodes and their free disk space (computed only once)
+    Map<Integer, TDataNodeConfiguration> availableDataNodeMap =
+        buildAvailableDataNodeMap(availableDataNodes);
+    Map<Integer, Double> freeDiskSpaceMap = buildFreeDiskSpaceMap(availableDataNodes);
+
+    // Step 3: For each affected replica set, select a new destination DataNode and create a
+    // migration plan
+    List<RegionMigrationPlan> migrationPlans = new ArrayList<>();
+
+    Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap = new HashMap<>();
+    Map<TConsensusGroupId, String> regionDatabaseMap = new HashMap<>();
+    Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap = new HashMap<>();
+
+    for (TRegionReplicaSet replicaSet : affectedReplicaSets) {
+      remainReplicasMap.put(replicaSet.getRegionId(), replicaSet);
+      String database =
+          configManager.getPartitionManager().getRegionDatabase(replicaSet.getRegionId());
+      List<TRegionReplicaSet> databaseAllocatedReplicaSets =
+          configManager.getPartitionManager().getAllReplicaSets(database, consensusGroupType);
+      regionDatabaseMap.put(replicaSet.getRegionId(), database);
+      databaseAllocatedRegionGroupMap.put(database, databaseAllocatedReplicaSets);
+    }
+
+    Map<TConsensusGroupId, TDataNodeConfiguration> result =
+        regionGroupAllocator.removeNodeReplicaSelect(
+            availableDataNodeMap,
+            freeDiskSpaceMap,
+            allocatedReplicaSets,
+            regionDatabaseMap,
+            databaseAllocatedRegionGroupMap,
+            remainReplicasMap);
+
+    for (TConsensusGroupId regionId : result.keySet()) {
+
+      TDataNodeConfiguration selectedNode = result.get(regionId);
+      LOGGER.info(
+          "Selected DataNode {} for Region {}",
+          selectedNode.getLocation().getDataNodeId(),
+          regionId);
+
+      // Create the migration plan
+      RegionMigrationPlan plan = RegionMigrationPlan.create(regionId, removedNodeMap.get(regionId));
+      plan.setToDataNode(selectedNode.getLocation());
+      migrationPlans.add(plan);
+    }
+    return migrationPlans;
+  }
+
+  /**
+   * Identifies affected replica sets from allocatedReplicaSets that contain any DataNode in
+   * removedDataNodes, and records the removed DataNode for each replica set.
+   */
+  private Set<TRegionReplicaSet> identifyAffectedReplicaSets(
+      List<TRegionReplicaSet> allocatedReplicaSets,
+      List<TDataNodeLocation> removedDataNodes,
+      Map<TConsensusGroupId, TDataNodeLocation> removedNodeMap) {
+
+    Set<TRegionReplicaSet> affectedReplicaSets = new HashSet<>();
+    // Create a copy of allocatedReplicaSets to avoid concurrent modifications
+    List<TRegionReplicaSet> allocatedCopy = new ArrayList<>(allocatedReplicaSets);
+
+    for (TDataNodeLocation removedNode : removedDataNodes) {
+      allocatedCopy.stream()
+          .filter(replicaSet -> replicaSet.getDataNodeLocations().contains(removedNode))
+          .forEach(
+              replicaSet -> {
+                removedNodeMap.put(replicaSet.getRegionId(), removedNode);
+                affectedReplicaSets.add(replicaSet);
+              });
+    }
+    return affectedReplicaSets;
+  }
+
+  /**
+   * Updates each affected replica set by removing the removed DataNode from its list. The
+   * allocatedReplicaSets list is updated accordingly.
+   */
+  private void updateReplicaSets(
+      List<TRegionReplicaSet> allocatedReplicaSets,
+      Set<TRegionReplicaSet> affectedReplicaSets,
+      Map<TConsensusGroupId, TDataNodeLocation> removedNodeMap) {
+    for (TRegionReplicaSet replicaSet : affectedReplicaSets) {
+      // Remove the replica set, update its node list, then re-add it
+      allocatedReplicaSets.remove(replicaSet);
+      replicaSet.getDataNodeLocations().remove(removedNodeMap.get(replicaSet.getRegionId()));
+      allocatedReplicaSets.add(replicaSet);
+    }
+  }
+
+  /**
+   * Constructs a mapping from DataNodeId to TDataNodeConfiguration from the available DataNodes.
+   */
+  private Map<Integer, TDataNodeConfiguration> buildAvailableDataNodeMap(
+      List<TDataNodeConfiguration> availableDataNodes) {
+    return availableDataNodes.stream()
+        .collect(
+            Collectors.toMap(
+                dataNode -> dataNode.getLocation().getDataNodeId(), Function.identity()));
+  }
+
+  /** Constructs a mapping of free disk space for each DataNode. */
+  private Map<Integer, Double> buildFreeDiskSpaceMap(
+      List<TDataNodeConfiguration> availableDataNodes) {
+    Map<Integer, Double> freeDiskSpaceMap = new HashMap<>(availableDataNodes.size());
+    availableDataNodes.forEach(
+        dataNode ->
+            freeDiskSpaceMap.put(
+                dataNode.getLocation().getDataNodeId(),
+                configManager
+                    .getLoadManager()
+                    .getFreeDiskSpace(dataNode.getLocation().getDataNodeId())));
+    return freeDiskSpaceMap;
+  }
+
   /**
   * Broadcasts DataNodes' status change, preventing disabled DataNodes from accepting read or write
    * requests.
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java
index 68b0550e0df..a531d67955c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java
@@ -121,7 +121,8 @@ public class RemoveDataNodesProcedure extends AbstractNodeProcedure<RemoveDataNo
           removedDataNodes.forEach(
               dataNode -> removedNodeStatusMap.put(dataNode.getDataNodeId(), NodeStatus.Removing));
           removeDataNodeHandler.changeDataNodeStatus(removedDataNodes, removedNodeStatusMap);
-          regionMigrationPlans = removeDataNodeHandler.getRegionMigrationPlans(removedDataNodes);
+          regionMigrationPlans =
+              removeDataNodeHandler.selectedRegionMigrationPlans(removedDataNodes);
           LOG.info(
               "{}, DataNode regions to be removed is {}",
               REMOVE_DATANODE_PROCESS,
@@ -165,8 +166,7 @@ public class RemoveDataNodesProcedure extends AbstractNodeProcedure<RemoveDataNo
         regionMigrationPlan -> {
           TConsensusGroupId regionId = regionMigrationPlan.getRegionId();
           TDataNodeLocation removedDataNode = regionMigrationPlan.getFromDataNode();
-          TDataNodeLocation destDataNode =
-              env.getRegionMaintainHandler().findDestDataNode(regionId);
+          TDataNodeLocation destDataNode = regionMigrationPlan.getToDataNode();
           // TODO: need to improve the coordinator selection method here, maybe through load
           // balancing and other means.
           final TDataNodeLocation coordinatorForAddPeer =
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
new file mode 100644
index 00000000000..e28b4dda18f
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class GreedyCopySetRemoveNodeReplicaSelectTest {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(GreedyCopySetRemoveNodeReplicaSelectTest.class);
+
+  private static final IRegionGroupAllocator GCR_ALLOCATOR =
+      new GreedyCopySetRegionGroupAllocator();
+
+  private static final TDataNodeLocation REMOVE_DATANODE_LOCATION =
+      new TDataNodeLocation().setDataNodeId(5);
+
+  private static final int TEST_DATA_NODE_NUM = 5;
+
+  private static final int DATA_REGION_PER_DATA_NODE = 30;
+
+  private static final int DATA_REPLICATION_FACTOR = 2;
+
+  private static final Map<Integer, TDataNodeConfiguration> AVAILABLE_DATA_NODE_MAP =
+      new HashMap<>();
+
+  private static final Map<Integer, Double> FREE_SPACE_MAP = new HashMap<>();
+
+  @Before
+  public void setUp() {
+    // Construct TEST_DATA_NODE_NUM DataNodes
+    AVAILABLE_DATA_NODE_MAP.clear();
+    FREE_SPACE_MAP.clear();
+    for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
+      AVAILABLE_DATA_NODE_MAP.put(
+          i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i)));
+      FREE_SPACE_MAP.put(i, Math.random());
+    }
+  }
+
+  @Test
+  public void testSelectDestNode() {
+    final int dataRegionGroupNum =
+        DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM / DATA_REPLICATION_FACTOR;
+
+    List<TRegionReplicaSet> allocateResult = new ArrayList<>();
+    List<TRegionReplicaSet> databaseAllocateResult = new ArrayList<>();
+    for (int index = 0; index < dataRegionGroupNum; index++) {
+      TRegionReplicaSet replicaSet =
+          GCR_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+              AVAILABLE_DATA_NODE_MAP,
+              FREE_SPACE_MAP,
+              allocateResult,
+              allocateResult,
+              DATA_REPLICATION_FACTOR,
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, index));
+      TRegionReplicaSet replicaSetCopy = new TRegionReplicaSet(replicaSet);
+
+      allocateResult.add(replicaSet);
+      databaseAllocateResult.add(replicaSetCopy);
+    }
+
+    List<TRegionReplicaSet> migratedReplicas =
+        allocateResult.stream()
+            .filter(
+                replicaSet -> replicaSet.getDataNodeLocations().contains(REMOVE_DATANODE_LOCATION))
+            .collect(Collectors.toList());
+
+    AVAILABLE_DATA_NODE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId());
+    FREE_SPACE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId());
+
+    List<TRegionReplicaSet> remainReplicas = new ArrayList<>();
+    for (TRegionReplicaSet replicaSet : migratedReplicas) {
+      List<TDataNodeLocation> dataNodeLocations = replicaSet.getDataNodeLocations();
+      allocateResult.remove(replicaSet);
+      dataNodeLocations.remove(REMOVE_DATANODE_LOCATION);
+      allocateResult.add(replicaSet);
+      remainReplicas.add(replicaSet);
+    }
+
+    Map<Integer, Integer> randomRegionCounter = new HashMap<>();
+    Map<Integer, Integer> PGPRegionCounter = new HashMap<>();
+    Set<Integer> randomSelectedNodeIds = new HashSet<>();
+    Set<Integer> PGPSelectedNodeIds = new HashSet<>();
+
+    int randomMaxRegionCount = 0;
+    int randomMinRegionCount = Integer.MAX_VALUE;
+    int PGPMaxRegionCount = 0;
+    int PGPMinRegionCount = Integer.MAX_VALUE;
+
+    AVAILABLE_DATA_NODE_MAP
+        .keySet()
+        .forEach(
+            nodeId -> {
+              randomRegionCounter.put(nodeId, 0);
+              PGPRegionCounter.put(nodeId, 0);
+            });
+
+    for (TRegionReplicaSet replicaSet : allocateResult) {
+      for (TDataNodeLocation loc : replicaSet.getDataNodeLocations()) {
+        randomRegionCounter.put(
+            loc.getDataNodeId(), randomRegionCounter.get(loc.getDataNodeId()) + 1);
+        PGPRegionCounter.put(loc.getDataNodeId(), PGPRegionCounter.get(loc.getDataNodeId()) + 1);
+      }
+    }
+
+    for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
+      TDataNodeLocation selectedNode =
+          randomSelectNodeForRegion(remainReplicaSet.getDataNodeLocations()).get();
+      LOGGER.info(
+          "Random Selected DataNode {} for Region {}",
+          selectedNode.getDataNodeId(),
+          remainReplicaSet.regionId);
+      randomSelectedNodeIds.add(selectedNode.getDataNodeId());
+      randomRegionCounter.put(
+          selectedNode.getDataNodeId(), randomRegionCounter.get(selectedNode.getDataNodeId()) + 1);
+    }
+
+    LOGGER.info("Remain Replicas... :");
+    for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
+      LOGGER.info("Region Group Id: {}", remainReplicaSet.regionId.id);
+      List<TDataNodeLocation> dataNodeLocations = remainReplicaSet.getDataNodeLocations();
+      for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
+        LOGGER.info("DataNode: {}", dataNodeLocation.getDataNodeId());
+      }
+    }
+    Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap = new HashMap<>();
+    Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap = new HashMap<>();
+    databaseAllocatedRegionGroupMap.put("database", databaseAllocateResult);
+
+    for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
+      remainReplicasMap.put(remainReplicaSet.getRegionId(), remainReplicaSet);
+    }
+    Map<TConsensusGroupId, String> regionDatabaseMap = new HashMap<>();
+    for (TRegionReplicaSet replicaSet : allocateResult) {
+      regionDatabaseMap.put(replicaSet.getRegionId(), "database");
+    }
+    Map<TConsensusGroupId, TDataNodeConfiguration> result =
+        GCR_ALLOCATOR.removeNodeReplicaSelect(
+            AVAILABLE_DATA_NODE_MAP,
+            FREE_SPACE_MAP,
+            allocateResult,
+            regionDatabaseMap,
+            databaseAllocatedRegionGroupMap,
+            remainReplicasMap);
+
+    for (TConsensusGroupId regionId : result.keySet()) {
+      TDataNodeConfiguration selectedNode = result.get(regionId);
+
+      LOGGER.info(
+          "GCR Selected DataNode {} for Region {}",
+          selectedNode.getLocation().getDataNodeId(),
+          regionId);
+      PGPSelectedNodeIds.add(selectedNode.getLocation().getDataNodeId());
+      PGPRegionCounter.put(
+          selectedNode.getLocation().getDataNodeId(),
+          PGPRegionCounter.get(selectedNode.getLocation().getDataNodeId()) + 1);
+    }
+
+    LOGGER.info("randomRegionCount:");
+
+    for (Integer i : randomRegionCounter.keySet()) {
+      Integer value = randomRegionCounter.get(i);
+      randomMaxRegionCount = Math.max(randomMaxRegionCount, value);
+      randomMinRegionCount = Math.min(randomMinRegionCount, value);
+      LOGGER.info("{} : {}", i, value);
+    }
+
+    LOGGER.info("PGPRegionCount:");
+
+    for (Integer i : PGPRegionCounter.keySet()) {
+      Integer value = PGPRegionCounter.get(i);
+      PGPMaxRegionCount = Math.max(PGPMaxRegionCount, value);
+      PGPMinRegionCount = Math.min(PGPMinRegionCount, value);
+      LOGGER.info("{} : {}", i, value);
+    }
+
+    LOGGER.info("PGPSelectedNodeIds size: {}", PGPSelectedNodeIds.size());
+    Assert.assertEquals(TEST_DATA_NODE_NUM - 1, PGPSelectedNodeIds.size());
+    LOGGER.info("randomSelectedNodeIds size: {}", 
randomSelectedNodeIds.size());
+    Assert.assertTrue(PGPSelectedNodeIds.size() >= 
randomSelectedNodeIds.size());
+    LOGGER.info(
+        "randomMaxRegionCount: {}, PGPMaxRegionCount: {}", 
randomMaxRegionCount, PGPMaxRegionCount);
+    Assert.assertTrue(randomMaxRegionCount >= PGPMaxRegionCount);
+  }
+
+  @Test
+  public void testSelectDestNodeMultiDatabase() {
+    // Pre‑allocate RegionReplicaSets for multiple databases
+    final String[] DB_NAMES = {"db0", "db1", "db2"};
+    final int TOTAL_RG_NUM =
+        DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM / DATA_REPLICATION_FACTOR;
+
+    int basePerDb = TOTAL_RG_NUM / DB_NAMES.length;
+    int remainder = TOTAL_RG_NUM % DB_NAMES.length; // first <remainder> DBs get one extra
+
+    Map<String, List<TRegionReplicaSet>> dbAllocatedMap = new HashMap<>();
+    List<TRegionReplicaSet> globalAllocatedList = new ArrayList<>();
+    int globalIndex = 0;
+
+    for (int dbIdx = 0; dbIdx < DB_NAMES.length; dbIdx++) {
+      String db = DB_NAMES[dbIdx];
+      int rgToCreate = basePerDb + (dbIdx < remainder ? 1 : 0);
+      List<TRegionReplicaSet> perDbList = new ArrayList<>();
+      dbAllocatedMap.put(db, perDbList);
+
+      for (int i = 0; i < rgToCreate; i++) {
+        TRegionReplicaSet rs =
+            GCR_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+                AVAILABLE_DATA_NODE_MAP,
+                FREE_SPACE_MAP,
+                globalAllocatedList,
+                perDbList,
+                DATA_REPLICATION_FACTOR,
+                new TConsensusGroupId(TConsensusGroupType.DataRegion, globalIndex++));
+        globalAllocatedList.add(rs);
+        perDbList.add(rs);
+      }
+    }
+
+    // Identify the replica‑sets that contain the node to be removed
+    List<TRegionReplicaSet> impactedReplicas =
+        globalAllocatedList.stream()
+            .filter(rs -> rs.getDataNodeLocations().contains(REMOVE_DATANODE_LOCATION))
+            .collect(Collectors.toList());
+
+    // Simulate removing the faulty/offline node
+    AVAILABLE_DATA_NODE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId());
+    FREE_SPACE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId());
+
+    List<TRegionReplicaSet> remainReplicas = new ArrayList<>();
+    for (TRegionReplicaSet rs : impactedReplicas) {
+      globalAllocatedList.remove(rs);
+      rs.getDataNodeLocations().remove(REMOVE_DATANODE_LOCATION);
+      globalAllocatedList.add(rs);
+      remainReplicas.add(rs);
+    }
+
+    // Build helper maps for removeNodeReplicaSelect
+    Map<TConsensusGroupId, TRegionReplicaSet> remainMap = new HashMap<>();
+    remainReplicas.forEach(r -> remainMap.put(r.getRegionId(), r));
+
+    Map<TConsensusGroupId, String> regionDbMap = new HashMap<>();
+    dbAllocatedMap.forEach((db, list) -> list.forEach(r -> regionDbMap.put(r.getRegionId(), db)));
+
+    // Baseline: random selection for comparison
+    Map<Integer, Integer> rndCount = new HashMap<>();
+    Map<Integer, Integer> planCount = new HashMap<>();
+    Set<Integer> rndNodes = new HashSet<>();
+    Set<Integer> planNodes = new HashSet<>();
+    int rndMax = 0, rndMin = Integer.MAX_VALUE;
+    int planMax = 0, planMin = Integer.MAX_VALUE;
+
+    AVAILABLE_DATA_NODE_MAP
+        .keySet()
+        .forEach(
+            n -> {
+              rndCount.put(n, 0);
+              planCount.put(n, 0);
+            });
+
+    for (TRegionReplicaSet replicaSet : globalAllocatedList) {
+      for (TDataNodeLocation loc : replicaSet.getDataNodeLocations()) {
+        rndCount.merge(loc.getDataNodeId(), 1, Integer::sum);
+        planCount.merge(loc.getDataNodeId(), 1, Integer::sum);
+      }
+    }
+
+    for (TRegionReplicaSet r : remainReplicas) {
+      TDataNodeLocation pick = randomSelectNodeForRegion(r.getDataNodeLocations()).get();
+      LOGGER.info("Random Selected DataNode {} for Region {}", pick.getDataNodeId(), r.regionId);
+      rndNodes.add(pick.getDataNodeId());
+      rndCount.merge(pick.getDataNodeId(), 1, Integer::sum);
+    }
+
+    LOGGER.info("Remain Replicas... :");
+    for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
+      LOGGER.info("Region Group Id: {}", remainReplicaSet.regionId.id);
+      List<TDataNodeLocation> dataNodeLocations = remainReplicaSet.getDataNodeLocations();
+      for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
+        LOGGER.info("DataNode: {}", dataNodeLocation.getDataNodeId());
+      }
+    }
+
+    // Call the method under test
+    Map<TConsensusGroupId, TDataNodeConfiguration> result =
+        GCR_ALLOCATOR.removeNodeReplicaSelect(
+            AVAILABLE_DATA_NODE_MAP,
+            FREE_SPACE_MAP,
+            globalAllocatedList,
+            regionDbMap,
+            dbAllocatedMap,
+            remainMap);
+
+    for (TConsensusGroupId regionId : result.keySet()) {
+      TDataNodeConfiguration selectedNode = result.get(regionId);
+
+      LOGGER.info(
+          "GCR Selected DataNode {} for Region {}",
+          selectedNode.getLocation().getDataNodeId(),
+          regionId);
+      planNodes.add(selectedNode.getLocation().getDataNodeId());
+      planCount.merge(selectedNode.getLocation().getDataNodeId(), 1, Integer::sum);
+    }
+
+    // Calculate load distribution
+    for (int c : rndCount.values()) {
+      rndMax = Math.max(rndMax, c);
+      rndMin = Math.min(rndMin, c);
+    }
+    for (int c : planCount.values()) {
+      planMax = Math.max(planMax, c);
+      planMin = Math.min(planMin, c);
+    }
+
+    // Assertions
+    Assert.assertEquals(TEST_DATA_NODE_NUM - 1, planNodes.size());
+    Assert.assertTrue(planNodes.size() >= rndNodes.size());
+    Assert.assertTrue(rndMax >= planMax);
+  }
+
+  private Optional<TDataNodeLocation> randomSelectNodeForRegion(
+      List<TDataNodeLocation> regionReplicaNodes) {
+    List<TDataNodeConfiguration> dataNodeConfigurations =
+        new ArrayList<>(AVAILABLE_DATA_NODE_MAP.values());
+    // Randomly selected to ensure a basic load balancing
+    Collections.shuffle(dataNodeConfigurations);
+    return dataNodeConfigurations.stream()
+        .map(TDataNodeConfiguration::getLocation)
+        .filter(e -> !regionReplicaNodes.contains(e))
+        .findAny();
+  }
+}
