This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7650b479347 [remove datanode] GCR load balancing implement for
removing datanode (#15282)
7650b479347 is described below
commit 7650b4793474b11405698bbe9290f814ef3afc06
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Sun Apr 27 16:46:27 2025 +0800
[remove datanode] GCR load balancing implement for removing datanode
(#15282)
---
.../region/GreedyCopySetRegionGroupAllocator.java | 358 ++++++++++++++++++++-
.../region/GreedyRegionGroupAllocator.java | 13 +
.../balancer/region/IRegionGroupAllocator.java | 21 ++
.../PartiteGraphPlacementRegionGroupAllocator.java | 13 +
.../procedure/env/RemoveDataNodeHandler.java | 186 +++++++++++
.../impl/node/RemoveDataNodesProcedure.java | 6 +-
.../GreedyCopySetRemoveNodeReplicaSelectTest.java | 342 ++++++++++++++++++++
7 files changed, 925 insertions(+), 14 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
index 2577455d41c..5e360230536 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
@@ -27,10 +27,13 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Map.Entry.comparingByValue;
@@ -50,6 +53,8 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
private int[] databaseRegionCounter;
// The number of 2-Region combinations in current cluster
private int[][] combinationCounter;
+ // The initial load for each database on each datanode
+ private Map<String, int[]> initialDbLoad;
// First Key: the sum of Regions at the DataNodes in the allocation result
is minimal
int optimalRegionSum;
@@ -103,12 +108,9 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
int replicationFactor,
TConsensusGroupId consensusGroupId) {
try {
- prepare(
- replicationFactor,
- availableDataNodeMap,
- allocatedRegionGroups,
- databaseAllocatedRegionGroups);
- dfs(-1, 0, new int[replicationFactor], 0, 0);
+ this.replicationFactor = replicationFactor;
+ prepare(availableDataNodeMap, allocatedRegionGroups,
databaseAllocatedRegionGroups);
+ dfsAllocateReplica(-1, 0, new int[replicationFactor], 0, 0);
// Randomly pick one optimal plan as result
Collections.shuffle(optimalReplicaSets);
@@ -125,21 +127,355 @@ public class GreedyCopySetRegionGroupAllocator
implements IRegionGroupAllocator
}
}
+ @Override
+ public Map<TConsensusGroupId, TDataNodeConfiguration>
removeNodeReplicaSelect(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Double> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ Map<TConsensusGroupId, String> regionDatabaseMap,
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap) {
+ try {
+ // 1. prepare: compute regionCounter, databaseRegionCounter, and
combinationCounter
+
+ List<TRegionReplicaSet> databaseAllocatedRegionGroups =
+ new ArrayList<>(databaseAllocatedRegionGroupMap.values()).get(0);
+ prepare(availableDataNodeMap, allocatedRegionGroups,
databaseAllocatedRegionGroups);
+ computeInitialDbLoad(databaseAllocatedRegionGroupMap);
+
+ // 2. Build allowed candidate set for each region that needs to be
migrated.
+ // For each region in remainReplicasMap, the candidate destination nodes
are all nodes in
+ // availableDataNodeMap
+ // excluding those already in the remain replica set.
+ List<TConsensusGroupId> regionKeys = new
ArrayList<>(remainReplicasMap.keySet());
+ Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap = new
HashMap<>();
+ for (TConsensusGroupId regionId : regionKeys) {
+ TRegionReplicaSet remainReplicaSet = remainReplicasMap.get(regionId);
+ Set<Integer> notAllowedNodes = new HashSet<>();
+
+ // Exclude nodes already present in the remain replica set
+ for (TDataNodeLocation location :
remainReplicaSet.getDataNodeLocations()) {
+ notAllowedNodes.add(location.getDataNodeId());
+ }
+
+ // Allowed candidates are the nodes not in the exclusion set
+ List<Integer> candidates =
+ availableDataNodeMap.keySet().stream()
+ .filter(nodeId -> !notAllowedNodes.contains(nodeId))
+ .sorted(
+ (a, b) -> {
+ int cmp = Integer.compare(regionCounter[a],
regionCounter[b]);
+ if (cmp == 0) {
+ cmp = Integer.compare(databaseRegionCounter[a],
databaseRegionCounter[b]);
+ }
+ return cmp;
+ })
+ .collect(Collectors.toList());
+
+        // Candidates were sorted above in ascending order of global load
+        // (regionCounter), breaking ties by per-database load (databaseRegionCounter).
+ allowedCandidatesMap.put(regionId, candidates);
+ }
+
+ // Optionally, sort regionKeys by the size of its candidate list
(smaller candidate sets
+ // first)
+ regionKeys.sort(Comparator.comparingInt(id ->
allowedCandidatesMap.get(id).size()));
+
+ int n = regionKeys.size();
+ // Each element holds the candidate nodeId chosen for the region at that
index
+ int[] currentAssignment = new int[n];
+ // additionalLoad holds the number of extra regions assigned to each
node in this migration
+ // solution.
+ int[] additionalLoad = new int[regionCounter.length];
+
+ // 3. Create a buffer for candidate solutions
+ List<int[]> optimalAssignments = new ArrayList<>();
+      // bestMetrics holds the best found metrics:
+      // [maxGlobalLoad, databaseLoadSquaredSum, scatterValue].
+ // Initialize to high values.
+ int[] bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE,
Integer.MAX_VALUE};
+
+ dfsRemoveNodeReplica(
+ 0,
+ regionKeys,
+ allowedCandidatesMap,
+ currentAssignment,
+ additionalLoad,
+ optimalAssignments,
+ bestMetrics,
+ remainReplicasMap,
+ regionDatabaseMap);
+
+ // 4. Randomly select one solution from the candidate buffer
+ if (optimalAssignments.isEmpty()) {
+ // This should not happen if there is at least one valid assignment
+ return Collections.emptyMap();
+ }
+ Collections.shuffle(optimalAssignments);
+ int[] bestAssignment = optimalAssignments.get(0);
+
+ // 5. Build and return the result mapping: region -> chosen destination
TDataNodeConfiguration
+ Map<TConsensusGroupId, TDataNodeConfiguration> result = new HashMap<>();
+ for (int i = 0; i < n; i++) {
+ TConsensusGroupId regionId = regionKeys.get(i);
+ int chosenNodeId = bestAssignment[i];
+ result.put(regionId, availableDataNodeMap.get(chosenNodeId));
+ }
+ return result;
+ } finally {
+ // Clear any temporary state to avoid impacting subsequent calls
+ clear();
+ }
+ }
+
+ /**
+ * DFS method that searches for migration target assignments.
+ *
+ * <p>It enumerates all possible assignments (one candidate for each region)
and collects
+ * candidate solutions in the optimalAssignments buffer. The evaluation
metrics for each complete
+ * assignment (i.e. when index == regionKeys.size()) are:
+ *
+   * <p>1. Max global load: the maximum over nodes of (regionCounter[node] +
+   * additionalLoad[node]). 2. Database load squared sum: the sum over databases of
+   * the square of the maximum per-node load for that database (initial load plus
+   * additional migration load; see computeDatabaseLoadSquaredSum). 3. Scatter
+   * value: computed per region, summing the combinationCounter for every pair in
+   * the complete replica set. The complete replica set for a region includes nodes
+   * in its remain replica set plus the newly assigned node.
+ *
+ * <p>The candidates are compared lexicographically (first by global load,
then by database load,
+ * then by scatter). When a better candidate is found, the
optimalAssignments buffer is cleared
+ * and updated; if the new candidate matches the best found metrics, it is
added to the buffer.
+ *
+ * <p>DFS search is pruned if the optimalAssignments buffer reaches CAPACITY.
+ *
+ * @param index Current DFS level, corresponding to regionKeys.get(index)
+ * @param regionKeys A list of regions that need to be migrated.
+ * @param allowedCandidatesMap For each region, the allowed candidate
destination node IDs.
+ * @param currentAssignment Current partial assignment; its length equals
the number of regions.
+ * @param additionalLoad Extra load currently assigned to each node.
+ * @param optimalAssignments Buffer holding candidate assignment arrays.
+   * @param bestMetrics An int array holding the best metrics found so far:
+   *     [maxGlobalLoad, databaseLoadSquaredSum, scatterValue].
+   * @param remainReplicasMap Mapping from region to its current remain replica set.
+   * @param regionDatabaseMap Mapping from each region to the name of its database.
+ */
+ private void dfsRemoveNodeReplica(
+ int index,
+ List<TConsensusGroupId> regionKeys,
+ Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap,
+ int[] currentAssignment,
+ int[] additionalLoad,
+ List<int[]> optimalAssignments,
+ int[] bestMetrics,
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap,
+ Map<TConsensusGroupId, String> regionDatabaseMap) {
+ int n = regionKeys.size();
+ if (index == n) {
+ // A complete assignment has been generated.
+ // Compute metrics for this complete migration assignment.
+
+ // Compute the scatter value for the complete assignment.
+ int currentScatter = 0;
+ // For each region, calculate the scatter based on the
combinationCounter among all nodes
+ // in the full replica set (which includes the nodes in the remain
replica and the new
+ // candidate).
+ for (int r = 0; r < n; r++) {
+ TConsensusGroupId regionId = regionKeys.get(r);
+ for (TDataNodeLocation location :
remainReplicasMap.get(regionId).getDataNodeLocations()) {
+ int nodeA = currentAssignment[r];
+ int nodeB = location.getDataNodeId();
+ currentScatter += combinationCounter[nodeA][nodeB];
+ }
+ }
+
+ // Compute the maximum global load and maximum database load among all
nodes that received
+ // additional load.
+ int[] currentMetrics =
+ getCurrentMetrics(
+ additionalLoad, currentScatter, regionKeys, regionDatabaseMap,
currentAssignment);
+
+ // Lexicographically compare currentMetrics with bestMetrics.
+ // If currentMetrics is better than bestMetrics, update bestMetrics and
clear the candidate
+ // buffer.
+ boolean isBetter = false;
+ boolean isEqual = true;
+ for (int i = 0; i < 3; i++) {
+ if (currentMetrics[i] < bestMetrics[i]) {
+ isBetter = true;
+ isEqual = false;
+ break;
+ } else if (currentMetrics[i] > bestMetrics[i]) {
+ isEqual = false;
+ break;
+ }
+ }
+ if (isBetter) {
+ bestMetrics[0] = currentMetrics[0];
+ bestMetrics[1] = currentMetrics[1];
+ bestMetrics[2] = currentMetrics[2];
+ optimalAssignments.clear();
+ optimalAssignments.add(Arrays.copyOf(currentAssignment, n));
+ } else if (isEqual) {
+ optimalAssignments.add(Arrays.copyOf(currentAssignment, n));
+ // Prune search if we already have enough candidate solutions
+ if (optimalAssignments.size() >= GCR_MAX_OPTIMAL_PLAN_NUM) {
+ return;
+ }
+ }
+ return;
+ }
+
+ // Process the region at the current index.
+ TConsensusGroupId regionId = regionKeys.get(index);
+ List<Integer> candidates = allowedCandidatesMap.get(regionId);
+ for (Integer candidate : candidates) {
+ currentAssignment[index] = candidate;
+ additionalLoad[candidate]++;
+ dfsRemoveNodeReplica(
+ index + 1,
+ regionKeys,
+ allowedCandidatesMap,
+ currentAssignment,
+ additionalLoad,
+ optimalAssignments,
+ bestMetrics,
+ remainReplicasMap,
+ regionDatabaseMap);
+ // Backtrack
+ additionalLoad[candidate]--;
+ }
+ }
+
+ /**
+ * Computes the squared sum of the maximum load for each database.
+ *
+ * <p>For each database, this method calculates the maximum load on any data
node by summing the
+ * initial load (from {@code initialDbLoad}) with the additional load
assigned during migration
+ * (accumulated in {@code currentAssignment}), and then squares this maximum
load. Finally, it
+ * returns the sum of these squared maximum loads across all databases.
+ *
+ * @param currentAssignment an array where each element is the nodeId
assigned for the
+ * corresponding region in {@code regionKeys}.
+ * @param regionKeys a list of region identifiers (TConsensusGroupId)
representing the regions
+ * under migration.
+ * @param regionDatabaseMap a mapping from each region identifier to its
corresponding database
+ * name.
+ * @return the sum of the squares of the maximum loads computed for each
database.
+ */
+ private int computeDatabaseLoadSquaredSum(
+ int[] currentAssignment,
+ List<TConsensusGroupId> regionKeys,
+ Map<TConsensusGroupId, String> regionDatabaseMap) {
+ Map<String, int[]> extraLoadPerDb = new HashMap<>();
+ // Initialize extra load counters for each database using the number of
nodes from
+ // regionCounter.
+ for (String db : initialDbLoad.keySet()) {
+ extraLoadPerDb.put(db, new int[regionCounter.length]);
+ }
+ // Accumulate extra load per database based on the current assignment.
+ for (int i = 0; i < regionKeys.size(); i++) {
+ TConsensusGroupId regionId = regionKeys.get(i);
+ String db = regionDatabaseMap.get(regionId);
+ int nodeId = currentAssignment[i];
+ extraLoadPerDb.get(db)[nodeId]++;
+ }
+ int sumSquared = 0;
+ // For each database, compute the maximum load across nodes and add its
square to the sum.
+ for (String db : initialDbLoad.keySet()) {
+ int[] initLoads = initialDbLoad.get(db);
+ int[] extras = extraLoadPerDb.get(db);
+ int maxLoad = 0;
+ for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) {
+ int load = initLoads[nodeId] + extras[nodeId];
+ if (load > maxLoad) {
+ maxLoad = load;
+ }
+ }
+ sumSquared += maxLoad * maxLoad;
+ }
+ return sumSquared;
+ }
+
+ /**
+ * Computes the current migration metrics.
+ *
+ * <p>This method calculates three key metrics:
+ *
+ * <ol>
+ * <li><strong>Max Global Load:</strong> The maximum load among all nodes,
computed as the sum
+ * of the initial region load (from {@code regionCounter}) and the
additional load (from
+ * {@code additionalLoad}).
+ * <li><strong>Database Load Squared Sum:</strong> The squared sum of the
maximum load per
+ * database, which is computed by {@link
#computeDatabaseLoadSquaredSum(int[], List, Map)}.
+ * <li><strong>Scatter Value:</strong> A provided metric that reflects
additional balancing
+ * criteria.
+ * </ol>
+ *
+ * The metrics are returned as an array of three integers in the order:
[maxGlobalLoad,
+ * databaseLoadSquaredSum, scatterValue].
+ *
+ * @param additionalLoad an array representing the additional load assigned
to each node during
+ * migration.
+ * @param currentScatter the current scatter value metric.
+ * @param regionKeys a list of region identifiers (TConsensusGroupId) for
which migration is being
+ * computed.
+ * @param regionDatabaseMap a mapping from each region identifier to its
corresponding database
+ * name.
+ * @param currentAssignment an array where each element is the nodeId
assigned for the
+ * corresponding region in {@code regionKeys}.
+ * @return an integer array of size 3: [maxGlobalLoad,
databaseLoadSquaredSum, scatterValue].
+ */
+ private int[] getCurrentMetrics(
+ int[] additionalLoad,
+ int currentScatter,
+ List<TConsensusGroupId> regionKeys,
+ Map<TConsensusGroupId, String> regionDatabaseMap,
+ int[] currentAssignment) {
+ int currentMaxGlobalLoad = 0;
+ // Calculate the maximum global load across all data nodes.
+ for (int nodeId = 0; nodeId < additionalLoad.length; nodeId++) {
+ int globalLoad = regionCounter[nodeId] + additionalLoad[nodeId];
+ currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad, globalLoad);
+ }
+ // Compute the database load squared sum using the helper method.
+ int dbLoadSquaredSum =
+ computeDatabaseLoadSquaredSum(currentAssignment, regionKeys,
regionDatabaseMap);
+    // Build current metrics in order [maxGlobalLoad, databaseLoadSquaredSum,
+    // scatterValue]
+ return new int[] {currentMaxGlobalLoad, dbLoadSquaredSum, currentScatter};
+ }
+
+ /**
+ * Compute the initial load for each database on each data node.
+ *
+ * @param databaseAllocatedRegionGroupMap Mapping of each database to its
list of replica sets.
+ */
+ private void computeInitialDbLoad(
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap) {
+ initialDbLoad = new HashMap<>();
+
+ // Iterate over each database and count the number of regions on each data
node across all its
+ // replica sets.
+ for (String database : databaseAllocatedRegionGroupMap.keySet()) {
+ List<TRegionReplicaSet> replicaSets =
databaseAllocatedRegionGroupMap.get(database);
+ int[] load = new int[regionCounter.length];
+ for (TRegionReplicaSet replicaSet : replicaSets) {
+ for (TDataNodeLocation location : replicaSet.getDataNodeLocations()) {
+ int nodeId = location.getDataNodeId();
+ load[nodeId]++;
+ }
+ }
+ initialDbLoad.put(database, load);
+ }
+ }
+
/**
* Prepare some statistics before dfs.
*
- * @param replicationFactor replication factor in the cluster
* @param availableDataNodeMap currently available DataNodes, ensure size()
>= replicationFactor
* @param allocatedRegionGroups already allocated RegionGroups in the cluster
* @param databaseAllocatedRegionGroups already allocated RegionGroups in
the same Database
*/
private void prepare(
- int replicationFactor,
Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
List<TRegionReplicaSet> allocatedRegionGroups,
List<TRegionReplicaSet> databaseAllocatedRegionGroups) {
- this.replicationFactor = replicationFactor;
// Store the maximum DataNodeId
int maxDataNodeId =
Math.max(
@@ -225,7 +561,7 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
* current allocation plan
* @param regionSum the sum of Regions at the DataNodes in the current
allocation plan
*/
- private void dfs(
+ private void dfsAllocateReplica(
int lastIndex,
int currentReplica,
int[] currentReplicaSet,
@@ -274,7 +610,7 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
for (int i = lastIndex + 1; i < dataNodeIds.length; i++) {
// Decide the next DataNodeId in the allocation plan
currentReplicaSet[currentReplica] = dataNodeIds[i];
- dfs(
+ dfsAllocateReplica(
i,
currentReplica + 1,
currentReplicaSet,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
index 6535b426928..31f33143ede 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
@@ -82,6 +82,19 @@ public class GreedyRegionGroupAllocator implements
IRegionGroupAllocator {
weightList.stream().limit(replicationFactor).collect(Collectors.toList()));
}
+ @Override
+ public Map<TConsensusGroupId, TDataNodeConfiguration>
removeNodeReplicaSelect(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Double> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ Map<TConsensusGroupId, String> regionDatabaseMap,
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap) {
+ // TODO: Implement this method
+ throw new UnsupportedOperationException(
+ "The removeNodeReplicaSelect method of GreedyRegionGroupAllocator is
yet to be implemented.");
+ }
+
private List<TDataNodeLocation> buildWeightList(
Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
Map<Integer, Double> freeDiskSpaceMap,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
index 554168d8497..24200548163 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
@@ -47,4 +47,25 @@ public interface IRegionGroupAllocator {
List<TRegionReplicaSet> databaseAllocatedRegionGroups,
int replicationFactor,
TConsensusGroupId consensusGroupId);
+
+ /**
+   * For each region that loses a replica when DataNodes are removed, select the
+   * optimal destination DataNode to host the migrated replica, given the
+   * remaining replica sets.
+ *
+ * @param availableDataNodeMap DataNodes that can be used for allocation
+ * @param freeDiskSpaceMap The free disk space of the DataNodes
+ * @param allocatedRegionGroups Allocated RegionGroups
+ * @param regionDatabaseMap A mapping from each region identifier to its
corresponding database
+ * name
+ * @param databaseAllocatedRegionGroupMap Allocated RegionGroups within the
same Database with the
+ * replica set
+ * @param remainReplicasMap the remaining replica set excluding the removed
DataNodes
+   * @return a mapping from each affected region to the DataNode selected to host
+   *     its migrated replica
+ */
+ Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Double> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ Map<TConsensusGroupId, String> regionDatabaseMap,
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocator.java
index 77e9f15844e..6a997522783 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocator.java
@@ -114,6 +114,19 @@ public class PartiteGraphPlacementRegionGroupAllocator
implements IRegionGroupAl
return result;
}
+ @Override
+ public Map<TConsensusGroupId, TDataNodeConfiguration>
removeNodeReplicaSelect(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Double> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ Map<TConsensusGroupId, String> regionDatabaseMap,
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap) {
+ // TODO: Implement this method
+ throw new UnsupportedOperationException(
+ "The removeNodeReplicaSelect method of
PartiteGraphPlacementRegionGroupAllocator is yet to be implemented.");
+ }
+
private void prepare(
int replicationFactor,
Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
index eaa16f47907..a60d732c4d9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure.env;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -37,6 +38,8 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
+import
org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator;
+import
org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator;
+import
org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
+import
org.apache.iotdb.confignode.manager.load.balancer.region.PartiteGraphPlacementRegionGroupAllocator;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
@@ -51,10 +54,13 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
@@ -70,8 +76,22 @@ public class RemoveDataNodeHandler {
private final ConfigManager configManager;
+ private final IRegionGroupAllocator regionGroupAllocator;
+
public RemoveDataNodeHandler(ConfigManager configManager) {
    this.configManager = configManager;
+
+    // Pick the allocator that matches the configured region-group allocation
+    // policy. Note: only GreedyCopySetRegionGroupAllocator currently implements
+    // removeNodeReplicaSelect; the GREEDY and PGR allocators still throw
+    // UnsupportedOperationException when a DataNode removal is requested, so the
+    // GCR/default branch must construct the GreedyCopySet allocator for the
+    // remove-datanode flow to work at all.
+    switch
(ConfigNodeDescriptor.getInstance().getConf().getRegionGroupAllocatePolicy()) {
+      case GREEDY:
+        this.regionGroupAllocator = new GreedyRegionGroupAllocator();
+        break;
+      case PGR:
+        this.regionGroupAllocator = new PartiteGraphPlacementRegionGroupAllocator();
+        break;
+      case GCR:
+      default:
+        this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator();
+    }
  }
/**
@@ -193,6 +213,172 @@ public class RemoveDataNodeHandler {
return regionMigrationPlans;
}
+ /**
+ * Retrieves all region migration plans for the specified removed DataNodes
and selects the
+ * destination.
+ *
+ * @param removedDataNodes the list of DataNodes from which to obtain
migration plans
+ * @return a list of region migration plans associated with the removed
DataNodes
+ */
+ public List<RegionMigrationPlan> selectedRegionMigrationPlans(
+ List<TDataNodeLocation> removedDataNodes) {
+
+ Set<Integer> removedDataNodesSet = new HashSet<>();
+ for (TDataNodeLocation removedDataNode : removedDataNodes) {
+ removedDataNodesSet.add(removedDataNode.dataNodeId);
+ }
+
+ final List<TDataNodeConfiguration> availableDataNodes =
+ configManager
+ .getNodeManager()
+ .filterDataNodeThroughStatus(NodeStatus.Running,
NodeStatus.Unknown)
+ .stream()
+ .filter(node ->
!removedDataNodesSet.contains(node.getLocation().getDataNodeId()))
+ .collect(Collectors.toList());
+
+ List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
+
+ regionMigrationPlans.addAll(
+ selectMigrationPlans(availableDataNodes,
TConsensusGroupType.DataRegion, removedDataNodes));
+
+ regionMigrationPlans.addAll(
+ selectMigrationPlans(
+ availableDataNodes, TConsensusGroupType.SchemaRegion,
removedDataNodes));
+
+ return regionMigrationPlans;
+ }
+
+ public List<RegionMigrationPlan> selectMigrationPlans(
+ List<TDataNodeConfiguration> availableDataNodes,
+ TConsensusGroupType consensusGroupType,
+ List<TDataNodeLocation> removedDataNodes) {
+
+ // Retrieve all allocated replica sets for the given consensus group type
+ List<TRegionReplicaSet> allocatedReplicaSets =
+
configManager.getPartitionManager().getAllReplicaSets(consensusGroupType);
+
+ // Step 1: Identify affected replica sets and record the removed DataNode
for each replica set
+ Map<TConsensusGroupId, TDataNodeLocation> removedNodeMap = new HashMap<>();
+ Set<TRegionReplicaSet> affectedReplicaSets =
+ identifyAffectedReplicaSets(allocatedReplicaSets, removedDataNodes,
removedNodeMap);
+
+ // Step 2: Update affected replica sets by removing the removed DataNode
+ updateReplicaSets(allocatedReplicaSets, affectedReplicaSets,
removedNodeMap);
+
+ // Build a mapping of available DataNodes and their free disk space
(computed only once)
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap =
+ buildAvailableDataNodeMap(availableDataNodes);
+ Map<Integer, Double> freeDiskSpaceMap =
buildFreeDiskSpaceMap(availableDataNodes);
+
+ // Step 3: For each affected replica set, select a new destination
DataNode and create a
+ // migration plan
+ List<RegionMigrationPlan> migrationPlans = new ArrayList<>();
+
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap = new
HashMap<>();
+ Map<TConsensusGroupId, String> regionDatabaseMap = new HashMap<>();
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap = new
HashMap<>();
+
+ for (TRegionReplicaSet replicaSet : affectedReplicaSets) {
+ remainReplicasMap.put(replicaSet.getRegionId(), replicaSet);
+ String database =
+
configManager.getPartitionManager().getRegionDatabase(replicaSet.getRegionId());
+ List<TRegionReplicaSet> databaseAllocatedReplicaSets =
+ configManager.getPartitionManager().getAllReplicaSets(database,
consensusGroupType);
+ regionDatabaseMap.put(replicaSet.getRegionId(), database);
+ databaseAllocatedRegionGroupMap.put(database,
databaseAllocatedReplicaSets);
+ }
+
+ Map<TConsensusGroupId, TDataNodeConfiguration> result =
+ regionGroupAllocator.removeNodeReplicaSelect(
+ availableDataNodeMap,
+ freeDiskSpaceMap,
+ allocatedReplicaSets,
+ regionDatabaseMap,
+ databaseAllocatedRegionGroupMap,
+ remainReplicasMap);
+
+ for (TConsensusGroupId regionId : result.keySet()) {
+
+ TDataNodeConfiguration selectedNode = result.get(regionId);
+ LOGGER.info(
+ "Selected DataNode {} for Region {}",
+ selectedNode.getLocation().getDataNodeId(),
+ regionId);
+
+ // Create the migration plan
+ RegionMigrationPlan plan = RegionMigrationPlan.create(regionId,
removedNodeMap.get(regionId));
+ plan.setToDataNode(selectedNode.getLocation());
+ migrationPlans.add(plan);
+ }
+ return migrationPlans;
+ }
+
+ /**
+ * Identifies affected replica sets from allocatedReplicaSets that contain
any DataNode in
+ * removedDataNodes, and records the removed DataNode for each replica set.
+ */
+ private Set<TRegionReplicaSet> identifyAffectedReplicaSets(
+ List<TRegionReplicaSet> allocatedReplicaSets,
+ List<TDataNodeLocation> removedDataNodes,
+ Map<TConsensusGroupId, TDataNodeLocation> removedNodeMap) {
+
+ Set<TRegionReplicaSet> affectedReplicaSets = new HashSet<>();
+ // Create a copy of allocatedReplicaSets to avoid concurrent modifications
+ List<TRegionReplicaSet> allocatedCopy = new
ArrayList<>(allocatedReplicaSets);
+
+ for (TDataNodeLocation removedNode : removedDataNodes) {
+ allocatedCopy.stream()
+ .filter(replicaSet ->
replicaSet.getDataNodeLocations().contains(removedNode))
+ .forEach(
+ replicaSet -> {
+ removedNodeMap.put(replicaSet.getRegionId(), removedNode);
+ affectedReplicaSets.add(replicaSet);
+ });
+ }
+ return affectedReplicaSets;
+ }
+
+ /**
+ * Updates each affected replica set by removing the removed DataNode from
its list. The
+ * allocatedReplicaSets list is updated accordingly.
+ */
+ private void updateReplicaSets(
+ List<TRegionReplicaSet> allocatedReplicaSets,
+ Set<TRegionReplicaSet> affectedReplicaSets,
+ Map<TConsensusGroupId, TDataNodeLocation> removedNodeMap) {
+ for (TRegionReplicaSet replicaSet : affectedReplicaSets) {
+ // Remove the replica set, update its node list, then re-add it
+ allocatedReplicaSets.remove(replicaSet);
+
replicaSet.getDataNodeLocations().remove(removedNodeMap.get(replicaSet.getRegionId()));
+ allocatedReplicaSets.add(replicaSet);
+ }
+ }
+
+ /**
+ * Constructs a mapping from DataNodeId to TDataNodeConfiguration from the
available DataNodes.
+ */
+ private Map<Integer, TDataNodeConfiguration> buildAvailableDataNodeMap(
+ List<TDataNodeConfiguration> availableDataNodes) {
+ return availableDataNodes.stream()
+ .collect(
+ Collectors.toMap(
+ dataNode -> dataNode.getLocation().getDataNodeId(),
Function.identity()));
+ }
+
+ /** Constructs a mapping of free disk space for each DataNode. */
+ private Map<Integer, Double> buildFreeDiskSpaceMap(
+ List<TDataNodeConfiguration> availableDataNodes) {
+ Map<Integer, Double> freeDiskSpaceMap = new
HashMap<>(availableDataNodes.size());
+ availableDataNodes.forEach(
+ dataNode ->
+ freeDiskSpaceMap.put(
+ dataNode.getLocation().getDataNodeId(),
+ configManager
+ .getLoadManager()
+
.getFreeDiskSpace(dataNode.getLocation().getDataNodeId())));
+ return freeDiskSpaceMap;
+ }
+
/**
* Broadcasts DataNodes' status change, preventing disabled DataNodes from
accepting read or write
* requests.
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java
index 1340d968a7e..d34e27a2321 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java
@@ -121,7 +121,8 @@ public class RemoveDataNodesProcedure extends
AbstractNodeProcedure<RemoveDataNo
removedDataNodes.forEach(
dataNode -> removedNodeStatusMap.put(dataNode.getDataNodeId(),
NodeStatus.Removing));
removeDataNodeHandler.changeDataNodeStatus(removedDataNodes,
removedNodeStatusMap);
- regionMigrationPlans =
removeDataNodeHandler.getRegionMigrationPlans(removedDataNodes);
+ regionMigrationPlans =
+
removeDataNodeHandler.selectedRegionMigrationPlans(removedDataNodes);
LOG.info(
"{}, DataNode regions to be removed is {}",
REMOVE_DATANODE_PROCESS,
@@ -165,8 +166,7 @@ public class RemoveDataNodesProcedure extends
AbstractNodeProcedure<RemoveDataNo
regionMigrationPlan -> {
TConsensusGroupId regionId = regionMigrationPlan.getRegionId();
TDataNodeLocation removedDataNode =
regionMigrationPlan.getFromDataNode();
- TDataNodeLocation destDataNode =
- env.getRegionMaintainHandler().findDestDataNode(regionId);
+ TDataNodeLocation destDataNode = regionMigrationPlan.getToDataNode();
// TODO: need to improve the coordinator selection method here,
maybe through load
// balancing and other means.
final TDataNodeLocation coordinatorForAddPeer =
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
new file mode 100644
index 00000000000..557fba8ba7a
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class GreedyCopySetRemoveNodeReplicaSelectTest {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(GreedyCopySetRemoveNodeReplicaSelectTest.class);
+
+ private static final IRegionGroupAllocator GCR_ALLOCATOR =
+ new GreedyCopySetRegionGroupAllocator();
+
+ private static final TDataNodeLocation REMOVE_DATANODE_LOCATION =
+ new TDataNodeLocation().setDataNodeId(5);
+
+ private static final int TEST_DATA_NODE_NUM = 5;
+
+ private static final int DATA_REGION_PER_DATA_NODE = 4;
+
+ private static final int DATA_REPLICATION_FACTOR = 2;
+
+ private static final Map<Integer, TDataNodeConfiguration>
AVAILABLE_DATA_NODE_MAP =
+ new HashMap<>();
+
+ private static final Map<Integer, Double> FREE_SPACE_MAP = new HashMap<>();
+
+ @Before
+ public void setUp() {
+ // Construct TEST_DATA_NODE_NUM DataNodes
+ AVAILABLE_DATA_NODE_MAP.clear();
+ FREE_SPACE_MAP.clear();
+ for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
+ AVAILABLE_DATA_NODE_MAP.put(
+ i, new TDataNodeConfiguration().setLocation(new
TDataNodeLocation().setDataNodeId(i)));
+ FREE_SPACE_MAP.put(i, Math.random());
+ }
+ }
+
+ @Test
+ public void testSelectDestNode() {
+ final int dataRegionGroupNum =
+ DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM /
DATA_REPLICATION_FACTOR;
+
+ List<TRegionReplicaSet> allocateResult = new ArrayList<>();
+ for (int index = 0; index < dataRegionGroupNum; index++) {
+ allocateResult.add(
+ GCR_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+ AVAILABLE_DATA_NODE_MAP,
+ FREE_SPACE_MAP,
+ allocateResult,
+ allocateResult,
+ DATA_REPLICATION_FACTOR,
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, index)));
+ }
+
+ List<TRegionReplicaSet> migratedReplicas =
+ allocateResult.stream()
+ .filter(
+ replicaSet ->
replicaSet.getDataNodeLocations().contains(REMOVE_DATANODE_LOCATION))
+ .collect(Collectors.toList());
+
+ AVAILABLE_DATA_NODE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId());
+ FREE_SPACE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId());
+
+ List<TRegionReplicaSet> remainReplicas = new ArrayList<>();
+ for (TRegionReplicaSet replicaSet : migratedReplicas) {
+ List<TDataNodeLocation> dataNodeLocations =
replicaSet.getDataNodeLocations();
+ allocateResult.remove(replicaSet);
+ dataNodeLocations.remove(REMOVE_DATANODE_LOCATION);
+ allocateResult.add(replicaSet);
+ remainReplicas.add(replicaSet);
+ }
+
+ Map<Integer, Integer> randomRegionCounter = new HashMap<>();
+ Map<Integer, Integer> PGPRegionCounter = new HashMap<>();
+ Set<Integer> randomSelectedNodeIds = new HashSet<>();
+ Set<Integer> PGPSelectedNodeIds = new HashSet<>();
+
+ int randomMaxRegionCount = 0;
+ int randomMinRegionCount = Integer.MAX_VALUE;
+ int PGPMaxRegionCount = 0;
+ int PGPMinRegionCount = Integer.MAX_VALUE;
+
+ AVAILABLE_DATA_NODE_MAP
+ .keySet()
+ .forEach(
+ nodeId -> {
+ randomRegionCounter.put(nodeId, 0);
+ PGPRegionCounter.put(nodeId, 0);
+ });
+
+ for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
+ TDataNodeLocation selectedNode =
+
randomSelectNodeForRegion(remainReplicaSet.getDataNodeLocations()).get();
+ LOGGER.info(
+ "Random Selected DataNode {} for Region {}",
+ selectedNode.getDataNodeId(),
+ remainReplicaSet.regionId);
+ randomSelectedNodeIds.add(selectedNode.getDataNodeId());
+ randomRegionCounter.put(
+ selectedNode.getDataNodeId(),
randomRegionCounter.get(selectedNode.getDataNodeId()) + 1);
+ }
+
+ LOGGER.info("Remain Replicas... :");
+ for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
+ LOGGER.info("Region Group Id: {}", remainReplicaSet.regionId.id);
+ List<TDataNodeLocation> dataNodeLocations =
remainReplicaSet.getDataNodeLocations();
+ for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
+ LOGGER.info("DataNode: {}", dataNodeLocation.getDataNodeId());
+ }
+ }
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap = new
HashMap<>();
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap = new
HashMap<>();
+ databaseAllocatedRegionGroupMap.put("database", allocateResult);
+
+ for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
+ remainReplicasMap.put(remainReplicaSet.getRegionId(), remainReplicaSet);
+ }
+ Map<TConsensusGroupId, String> regionDatabaseMap = new HashMap<>();
+ for (TRegionReplicaSet replicaSet : allocateResult) {
+ regionDatabaseMap.put(replicaSet.getRegionId(), "database");
+ }
+ Map<TConsensusGroupId, TDataNodeConfiguration> result =
+ GCR_ALLOCATOR.removeNodeReplicaSelect(
+ AVAILABLE_DATA_NODE_MAP,
+ FREE_SPACE_MAP,
+ allocateResult,
+ regionDatabaseMap,
+ databaseAllocatedRegionGroupMap,
+ remainReplicasMap);
+
+ for (TConsensusGroupId regionId : result.keySet()) {
+ TDataNodeConfiguration selectedNode = result.get(regionId);
+
+ LOGGER.info(
+ "GCR Selected DataNode {} for Region {}",
+ selectedNode.getLocation().getDataNodeId(),
+ regionId);
+ PGPSelectedNodeIds.add(selectedNode.getLocation().getDataNodeId());
+ PGPRegionCounter.put(
+ selectedNode.getLocation().getDataNodeId(),
+ PGPRegionCounter.get(selectedNode.getLocation().getDataNodeId()) +
1);
+ }
+
+ for (Integer i : randomRegionCounter.keySet()) {
+ Integer value = randomRegionCounter.get(i);
+ randomMaxRegionCount = Math.max(randomMaxRegionCount, value);
+ randomMinRegionCount = Math.min(randomMinRegionCount, value);
+ }
+
+ for (Integer i : PGPRegionCounter.keySet()) {
+ Integer value = PGPRegionCounter.get(i);
+ PGPMaxRegionCount = Math.max(PGPMaxRegionCount, value);
+ PGPMinRegionCount = Math.min(PGPMinRegionCount, value);
+ }
+
+ Assert.assertEquals(TEST_DATA_NODE_NUM - 1, PGPSelectedNodeIds.size());
+ Assert.assertTrue(PGPSelectedNodeIds.size() >=
randomSelectedNodeIds.size());
+ Assert.assertTrue(randomMaxRegionCount >= PGPMaxRegionCount);
+ Assert.assertTrue(randomMinRegionCount <= PGPMinRegionCount);
+ }
+
+ @Test
+ public void testSelectDestNodeMultiDatabase() {
+ // Pre‑allocate RegionReplicaSets for multiple databases
+ final String[] DB_NAMES = {"db0", "db1", "db2"};
+ final int TOTAL_RG_NUM =
+ DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM /
DATA_REPLICATION_FACTOR;
+
+ int basePerDb = TOTAL_RG_NUM / DB_NAMES.length;
+ int remainder = TOTAL_RG_NUM % DB_NAMES.length; // first <remainder> DBs
get one extra
+
+ Map<String, List<TRegionReplicaSet>> dbAllocatedMap = new HashMap<>();
+ List<TRegionReplicaSet> globalAllocatedList = new ArrayList<>();
+ int globalIndex = 0;
+
+ for (int dbIdx = 0; dbIdx < DB_NAMES.length; dbIdx++) {
+ String db = DB_NAMES[dbIdx];
+ int rgToCreate = basePerDb + (dbIdx < remainder ? 1 : 0);
+ List<TRegionReplicaSet> perDbList = new ArrayList<>();
+ dbAllocatedMap.put(db, perDbList);
+
+ for (int i = 0; i < rgToCreate; i++) {
+ TRegionReplicaSet rs =
+ GCR_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+ AVAILABLE_DATA_NODE_MAP,
+ FREE_SPACE_MAP,
+ globalAllocatedList,
+ perDbList,
+ DATA_REPLICATION_FACTOR,
+ new TConsensusGroupId(TConsensusGroupType.DataRegion,
globalIndex++));
+ globalAllocatedList.add(rs);
+ perDbList.add(rs);
+ }
+ }
+
+ // Identify the replica‑sets that contain the node to be removed
+ List<TRegionReplicaSet> impactedReplicas =
+ globalAllocatedList.stream()
+ .filter(rs ->
rs.getDataNodeLocations().contains(REMOVE_DATANODE_LOCATION))
+ .collect(Collectors.toList());
+
+ // Simulate removing the faulty/offline node
+ AVAILABLE_DATA_NODE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId());
+ FREE_SPACE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId());
+
+ List<TRegionReplicaSet> remainReplicas = new ArrayList<>();
+ for (TRegionReplicaSet rs : impactedReplicas) {
+ globalAllocatedList.remove(rs);
+ rs.getDataNodeLocations().remove(REMOVE_DATANODE_LOCATION);
+ globalAllocatedList.add(rs);
+ remainReplicas.add(rs);
+ }
+
+ // Build helper maps for removeNodeReplicaSelect
+ Map<TConsensusGroupId, TRegionReplicaSet> remainMap = new HashMap<>();
+ remainReplicas.forEach(r -> remainMap.put(r.getRegionId(), r));
+
+ Map<TConsensusGroupId, String> regionDbMap = new HashMap<>();
+ dbAllocatedMap.forEach((db, list) -> list.forEach(r ->
regionDbMap.put(r.getRegionId(), db)));
+
+ // Baseline: random selection for comparison
+ Map<Integer, Integer> rndCount = new HashMap<>();
+ Map<Integer, Integer> planCount = new HashMap<>();
+ Set<Integer> rndNodes = new HashSet<>();
+ Set<Integer> planNodes = new HashSet<>();
+ int rndMax = 0, rndMin = Integer.MAX_VALUE;
+ int planMax = 0, planMin = Integer.MAX_VALUE;
+
+ AVAILABLE_DATA_NODE_MAP
+ .keySet()
+ .forEach(
+ n -> {
+ rndCount.put(n, 0);
+ planCount.put(n, 0);
+ });
+
+ for (TRegionReplicaSet r : remainReplicas) {
+ TDataNodeLocation pick =
randomSelectNodeForRegion(r.getDataNodeLocations()).get();
+ LOGGER.info("Random Selected DataNode {} for Region {}",
pick.getDataNodeId(), r.regionId);
+ rndNodes.add(pick.getDataNodeId());
+ rndCount.merge(pick.getDataNodeId(), 1, Integer::sum);
+ }
+
+ LOGGER.info("Remain Replicas... :");
+ for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
+ LOGGER.info("Region Group Id: {}", remainReplicaSet.regionId.id);
+ List<TDataNodeLocation> dataNodeLocations =
remainReplicaSet.getDataNodeLocations();
+ for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
+ LOGGER.info("DataNode: {}", dataNodeLocation.getDataNodeId());
+ }
+ }
+
+ // Call the method under test
+ Map<TConsensusGroupId, TDataNodeConfiguration> result =
+ GCR_ALLOCATOR.removeNodeReplicaSelect(
+ AVAILABLE_DATA_NODE_MAP,
+ FREE_SPACE_MAP,
+ globalAllocatedList,
+ regionDbMap,
+ dbAllocatedMap,
+ remainMap);
+
+ for (TConsensusGroupId regionId : result.keySet()) {
+ TDataNodeConfiguration selectedNode = result.get(regionId);
+
+ LOGGER.info(
+ "GCR Selected DataNode {} for Region {}",
+ selectedNode.getLocation().getDataNodeId(),
+ regionId);
+ planNodes.add(selectedNode.getLocation().getDataNodeId());
+ planCount.merge(selectedNode.getLocation().getDataNodeId(), 1,
Integer::sum);
+ }
+
+ // Calculate load distribution
+ for (int c : rndCount.values()) {
+ rndMax = Math.max(rndMax, c);
+ rndMin = Math.min(rndMin, c);
+ }
+ for (int c : planCount.values()) {
+ planMax = Math.max(planMax, c);
+ planMin = Math.min(planMin, c);
+ }
+
+ // Assertions
+ Assert.assertEquals(TEST_DATA_NODE_NUM - 1, planNodes.size());
+ Assert.assertTrue(planNodes.size() >= rndNodes.size());
+ Assert.assertTrue(rndMax >= planMax);
+ Assert.assertTrue(rndMin <= planMin);
+ }
+
+ private Optional<TDataNodeLocation> randomSelectNodeForRegion(
+ List<TDataNodeLocation> regionReplicaNodes) {
+ List<TDataNodeConfiguration> dataNodeConfigurations =
+ new ArrayList<>(AVAILABLE_DATA_NODE_MAP.values());
+ // Randomly selected to ensure a basic load balancing
+ Collections.shuffle(dataNodeConfigurations);
+ return dataNodeConfigurations.stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .filter(e -> !regionReplicaNodes.contains(e))
+ .findAny();
+ }
+}