This is an automated email from the ASF dual-hosted git repository.
hxpserein pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new b1ae6dc5cfb [To dev/1.3][remove datanode] cherry pick related commits (#16711)
b1ae6dc5cfb is described below
commit b1ae6dc5cfb2aa866d54daecf88c3d30c4ccc548
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Fri Nov 7 19:15:43 2025 +0800
[To dev/1.3][remove datanode] cherry pick related commits (#16711)
* [remove datanode] GCR load balancing implement for removing datanode (#15282)
(cherry picked from commit 7650b4793474b11405698bbe9290f814ef3afc06)
* [remove datanode] Fix IoTDBRemoveDataNodeNormalIT #15429
(cherry picked from commit 953780620df07eb6282d5391c0069fabd63cb40a)
* [remove datanode] Accelerate GCR load balancing implement (#15535)
(cherry picked from commit 51bad1ec88d5d20aa4e84f3ccd6841fbfba75390)
* [remove datanode] Fix ArrayIndexOutOfBoundsException in computeInitialDbLoad (#15718)
(cherry picked from commit 346ee720e7a4a561e28bc210f9ec6111c45a1a46)
---
.../region/GreedyCopySetRegionGroupAllocator.java | 361 +++++++++++++++++++-
.../region/GreedyRegionGroupAllocator.java | 13 +
.../balancer/region/IRegionGroupAllocator.java | 21 ++
...artiteGraphReplicationRegionGroupAllocator.java | 13 +
.../procedure/env/RemoveDataNodeHandler.java | 186 +++++++++++
.../impl/node/RemoveDataNodesProcedure.java | 6 +-
.../GreedyCopySetRemoveNodeReplicaSelectTest.java | 370 +++++++++++++++++++++
7 files changed, 951 insertions(+), 19 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
index 2577455d41c..ae5f1c3eb44 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
@@ -27,10 +27,12 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Map.Entry.comparingByValue;
@@ -39,7 +41,7 @@ import static java.util.Map.Entry.comparingByValue;
public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator {
private static final Random RANDOM = new Random();
- private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 100;
+ private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 10;
private int replicationFactor;
// Sorted available DataNodeIds
@@ -50,16 +52,35 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
private int[] databaseRegionCounter;
// The number of 2-Region combinations in current cluster
private int[][] combinationCounter;
+ // The initial load for each database on each datanode
+ private Map<String, int[]> initialDbLoad;
// First Key: the sum of Regions at the DataNodes in the allocation result is minimal
- int optimalRegionSum;
+ private int optimalRegionSum;
// Second Key: the sum of Regions at the DataNodes within the same Database
// in the allocation result is minimal
- int optimalDatabaseRegionSum;
+ private int optimalDatabaseRegionSum;
// Third Key: the sum of overlapped 2-Region combination Regions with
// other allocated RegionGroups is minimal
- int optimalCombinationSum;
- List<int[]> optimalReplicaSets;
+ private int optimalCombinationSum;
+ private List<int[]> optimalReplicaSets;
+
+ // Pre-calculation, scatterDelta[i][j] means the scatter increment between region i and the old
+ // replica set when region i is placed on node j
+ private int[][] scatterDelta;
+ // For each region, the allowed candidate destination node IDs.
+ private Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap;
+ // A list of regions that need to be migrated.
+ private List<TConsensusGroupId> dfsRegionKeys;
+ // A mapping from each region identifier to its corresponding database name.
+ private Map<TConsensusGroupId, String> regionDatabaseMap;
+ // Buffer holding best assignment arrays.
+ private int[] bestAssignment;
+ // An int array holding the best metrics found so far: [maxGlobalLoad, maxDatabaseLoad, scatterValue].
+ private int[] bestMetrics;
+ // dfsRemoveNodeReplica batch size
+ private static final int BATCH_SIZE = 12;
private static class DataNodeEntry {
@@ -103,12 +124,9 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
int replicationFactor,
TConsensusGroupId consensusGroupId) {
try {
- prepare(
- replicationFactor,
- availableDataNodeMap,
- allocatedRegionGroups,
- databaseAllocatedRegionGroups);
- dfs(-1, 0, new int[replicationFactor], 0, 0);
+ this.replicationFactor = replicationFactor;
+ prepare(availableDataNodeMap, allocatedRegionGroups, databaseAllocatedRegionGroups);
+ dfsAllocateReplica(-1, 0, new int[replicationFactor], 0, 0);
// Randomly pick one optimal plan as result
Collections.shuffle(optimalReplicaSets);
@@ -125,21 +143,332 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
}
}
+ @Override
+ public Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Double> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ Map<TConsensusGroupId, String> regionDatabaseMap,
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap) {
+ try {
+ // 1. prepare: compute regionCounter, databaseRegionCounter, and combinationCounter
+
+ prepare(availableDataNodeMap, allocatedRegionGroups, Collections.emptyList());
+ computeInitialDbLoad(availableDataNodeMap, databaseAllocatedRegionGroupMap);
+
+ // 2. Build allowed candidate set for each region that needs to be migrated.
+ // For each region in remainReplicasMap, the candidate destination nodes are all nodes in
+ // availableDataNodeMap excluding those already in the remain replica set.
+ List<TConsensusGroupId> regionKeys = new ArrayList<>(remainReplicasMap.keySet());
+ allowedCandidatesMap = new HashMap<>();
+ this.regionDatabaseMap = regionDatabaseMap;
+ for (TConsensusGroupId regionId : regionKeys) {
+ TRegionReplicaSet remainReplicaSet = remainReplicasMap.get(regionId);
+ Set<Integer> notAllowedNodes =
+ remainReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet());
+
+ // Allowed candidates are the nodes not in the exclusion set
+ List<Integer> candidates =
+ availableDataNodeMap.keySet().stream()
+ .filter(nodeId -> !notAllowedNodes.contains(nodeId))
+ .sorted(
+ (a, b) -> {
+ int cmp = Integer.compare(regionCounter[a], regionCounter[b]);
+ return (cmp != 0)
+ ? cmp
+ : Integer.compare(databaseRegionCounter[a], databaseRegionCounter[b]);
+ })
+ .collect(Collectors.toList());
+ Collections.shuffle(candidates);
+
+ // Sort candidates in ascending order of current global load (regionCounter)
+ allowedCandidatesMap.put(regionId, candidates);
+ }
+
+ // Optionally, sort regionKeys by the size of its candidate list (smaller candidate sets first)
+ regionKeys.sort(Comparator.comparingInt(id -> allowedCandidatesMap.get(id).size()));
+
+ // 3. Batch DFS
+ Map<TConsensusGroupId, TDataNodeConfiguration> result = new HashMap<>();
+
+ for (int start = 0; start < regionKeys.size(); start += BATCH_SIZE) {
+ dfsRegionKeys = regionKeys.subList(start, Math.min(start + BATCH_SIZE, regionKeys.size()));
+ int batchSize = dfsRegionKeys.size();
+
+ // Initialize buffer
+ bestAssignment = new int[batchSize];
+ // bestMetrics holds the best found metrics: [maxGlobalLoad, maxDatabaseLoad, scatterValue].
+ bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE};
+ // currentAssignment holds the candidate nodeId chosen for the region at that index
+ int[] currentAssignment = new int[batchSize];
+ // additionalLoad holds the number of extra regions assigned to each node in this migration solution.
+ int[] additionalLoad = new int[regionCounter.length];
+
+ scatterDelta = new int[batchSize][regionCounter.length];
+ for (int r = 0; r < batchSize; r++) {
+ TConsensusGroupId regionId = dfsRegionKeys.get(r);
+ for (int nodeId : allowedCandidatesMap.get(regionId)) {
+ int inc = 0;
+ for (TDataNodeLocation loc : remainReplicasMap.get(regionId).getDataNodeLocations()) {
+ inc += combinationCounter[nodeId][loc.getDataNodeId()];
+ }
+ scatterDelta[r][nodeId] = inc;
+ }
+ }
+
+ int currentMaxGlobalLoad = 0;
+ for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) {
+ currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad, regionCounter[nodeId]);
+ }
+
+ dfsRemoveNodeReplica(0, currentMaxGlobalLoad, 0, currentAssignment, additionalLoad);
+
+ if (bestMetrics[0] == Integer.MAX_VALUE) {
+ // This should not happen if there is at least one valid assignment
+ return Collections.emptyMap();
+ }
+
+ for (int i = 0; i < batchSize; i++) {
+ TConsensusGroupId regionId = dfsRegionKeys.get(i);
+ int chosenNodeId = bestAssignment[i];
+ result.put(regionId, availableDataNodeMap.get(chosenNodeId));
+
+ regionCounter[chosenNodeId]++;
+ String db = regionDatabaseMap.get(regionId);
+ if (db != null) {
+ int[] dbLoad = initialDbLoad.computeIfAbsent(db, k -> new int[regionCounter.length]);
+ dbLoad[chosenNodeId]++;
+ }
+ for (TDataNodeLocation loc : remainReplicasMap.get(regionId).getDataNodeLocations()) {
+ combinationCounter[chosenNodeId][loc.getDataNodeId()]++;
+ combinationCounter[loc.getDataNodeId()][chosenNodeId]++;
+ }
+ }
+ }
+ return result;
+ } finally {
+ // Clear any temporary state to avoid impacting subsequent calls
+ clear();
+ }
+ }
+
+ /**
+ * DFS method that searches for migration target assignments.
+ *
+ * <p>It enumerates all possible assignments (one candidate for each region) and collects
+ * candidate solutions in the optimalAssignments buffer. The evaluation metrics for each complete
+ * assignment (i.e. when index == regionKeys.size()) are:
+ *
+ * <p>1. Max global load: the maximum over nodes of (regionCounter[node] + additionalLoad[node])
+ * 2. Max database load: the maximum over nodes of (databaseRegionCounter[node] + additionalLoad[node])
+ * 3. Scatter value: computed per region, summing the combinationCounter for every pair in the
+ * complete replica set. The complete replica set for a region includes nodes in its remain
+ * replica set plus the newly assigned node.
+ *
+ * <p>The candidates are compared lexicographically (first by global load, then by database load,
+ * then by scatter). When a better candidate is found, the optimalAssignments buffer is cleared
+ * and updated; if the new candidate matches the best found metrics, it is added to the buffer.
+ *
+ * <p>DFS search is pruned if the optimalAssignments buffer reaches CAPACITY.
+ *
+ * @param index Current DFS level, corresponding to regionKeys.get(index)
+ * @param currentMaxGlobalLoad The maximum global load across all data nodes.
+ * @param currentScatter The scatter value for the complete assignment.
+ * @param currentAssignment Current partial assignment; its length equals the number of regions.
+ * @param additionalLoad Extra load currently assigned to each node.
+ */
+ private void dfsRemoveNodeReplica(
+ int index,
+ int currentMaxGlobalLoad,
+ int currentScatter,
+ int[] currentAssignment,
+ int[] additionalLoad) {
+ // Compute the maximum global load and maximum database load among all nodes that received
+ // additional load.
+ int[] currentMetrics = getCurrentMetrics(additionalLoad, currentScatter, currentAssignment);
+ // Lexicographically compare currentMetrics with bestMetrics.
+ // If currentMetrics is better than bestMetrics, update bestMetrics and clear the candidate
+ // buffer.
+ boolean isBetter = false;
+ boolean isEqual = true;
+ for (int i = 0; i < 3; i++) {
+ if (currentMetrics[i] < bestMetrics[i]) {
+ isBetter = true;
+ isEqual = false;
+ break;
+ } else if (currentMetrics[i] > bestMetrics[i]) {
+ isEqual = false;
+ break;
+ }
+ }
+ if (!isBetter && !isEqual) {
+ return;
+ }
+
+ if (index == dfsRegionKeys.size()) {
+ if (isBetter) {
+ bestMetrics[0] = currentMetrics[0];
+ bestMetrics[1] = currentMetrics[1];
+ bestMetrics[2] = currentMetrics[2];
+ System.arraycopy(currentAssignment, 0, bestAssignment, 0, dfsRegionKeys.size());
+ }
+ return;
+ }
+
+ // Process the region at the current index.
+ TConsensusGroupId regionId = dfsRegionKeys.get(index);
+ List<Integer> candidates = allowedCandidatesMap.get(regionId);
+ for (Integer candidate : candidates) {
+ currentAssignment[index] = candidate;
+ currentScatter += scatterDelta[index][currentAssignment[index]];
+ additionalLoad[candidate]++;
+ int nextMaxGlobalLoad =
+ Math.max(currentMaxGlobalLoad, regionCounter[candidate] + additionalLoad[candidate]);
+
+ dfsRemoveNodeReplica(
+ index + 1, nextMaxGlobalLoad, currentScatter, currentAssignment, additionalLoad);
+ // Backtrack
+ additionalLoad[candidate]--;
+ currentScatter -= scatterDelta[index][currentAssignment[index]];
+ }
+ }
+
+ /**
+ * Computes the squared sum of the maximum load for each database.
+ *
+ * <p>For each database, this method calculates the maximum load on any data node by summing the
+ * initial load (from {@code initialDbLoad}) with the additional load assigned during migration
+ * (accumulated in {@code currentAssignment}), and then squares this maximum load. Finally, it
+ * returns the sum of these squared maximum loads across all databases.
+ *
+ * @param currentAssignment an array where each element is the nodeId assigned for the
+ * corresponding region in {@code regionKeys}.
+ * @param regionKeys a list of region identifiers (TConsensusGroupId) representing the regions
+ * under migration.
+ * @param regionDatabaseMap a mapping from each region identifier to its corresponding database
+ * name.
+ * @return the sum of the squares of the maximum loads computed for each database.
+ */
+ private int computeDatabaseLoadSquaredSum(
+ int[] currentAssignment,
+ List<TConsensusGroupId> regionKeys,
+ Map<TConsensusGroupId, String> regionDatabaseMap) {
+ Map<String, int[]> extraLoadPerDb = new HashMap<>();
+ // Initialize extra load counters for each database using the number of nodes from
+ // regionCounter.
+ for (String db : initialDbLoad.keySet()) {
+ extraLoadPerDb.put(db, new int[regionCounter.length]);
+ }
+ // Accumulate extra load per database based on the current assignment.
+ for (int i = 0; i < regionKeys.size(); i++) {
+ TConsensusGroupId regionId = regionKeys.get(i);
+ String db = regionDatabaseMap.get(regionId);
+ int nodeId = currentAssignment[i];
+ extraLoadPerDb.get(db)[nodeId]++;
+ }
+ int sumSquared = 0;
+ // For each database, compute the maximum load across nodes and add its square to the sum.
+ for (String db : initialDbLoad.keySet()) {
+ int[] initLoads = initialDbLoad.get(db);
+ int[] extras = extraLoadPerDb.get(db);
+ int maxLoad = 0;
+ for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) {
+ int load = initLoads[nodeId] + extras[nodeId];
+ if (load > maxLoad) {
+ maxLoad = load;
+ }
+ }
+ sumSquared += maxLoad * maxLoad;
+ }
+ return sumSquared;
+ }
+
+ /**
+ * Computes the current migration metrics.
+ *
+ * <p>This method calculates three key metrics:
+ *
+ * <ol>
+ * <li><strong>Max Global Load:</strong> The maximum load among all nodes, computed as the sum
+ * of the initial region load (from {@code regionCounter}) and the additional load (from
+ * {@code additionalLoad}).
+ * <li><strong>Database Load Squared Sum:</strong> The squared sum of the maximum load per
+ * database, which is computed by {@link #computeDatabaseLoadSquaredSum(int[], List, Map)}.
+ * <li><strong>Scatter Value:</strong> A provided metric that reflects additional balancing
+ * criteria.
+ * </ol>
+ *
+ * The metrics are returned as an array of three integers in the order: [maxGlobalLoad,
+ * databaseLoadSquaredSum, scatterValue].
+ *
+ * @param additionalLoad an array representing the additional load assigned to each node during
+ * migration.
+ * @param currentScatter the current scatter value metric.
+ * @param currentAssignment an array where each element is the nodeId assigned for the
+ * corresponding region in {@code regionKeys}.
+ * @return an integer array of size 3: [maxGlobalLoad, databaseLoadSquaredSum, scatterValue].
+ */
+ private int[] getCurrentMetrics(
+ int[] additionalLoad, int currentScatter, int[] currentAssignment) {
+ int currentMaxGlobalLoad = 0;
+ // Calculate the maximum global load across all data nodes.
+ for (int nodeId = 0; nodeId < additionalLoad.length; nodeId++) {
+ int globalLoad = regionCounter[nodeId] + additionalLoad[nodeId];
+ currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad, globalLoad);
+ }
+ // Compute the database load squared sum using the helper method.
+ int dbLoadSquaredSum =
+ computeDatabaseLoadSquaredSum(currentAssignment, dfsRegionKeys, regionDatabaseMap);
+ // Build current metrics in order [maxGlobalLoad, maxDatabaseLoad, scatterValue]
+ return new int[] {currentMaxGlobalLoad, dbLoadSquaredSum, currentScatter};
+ }
+
+ /**
+ * Compute the initial load for each database on each data node.
+ *
+ * @param availableDataNodeMap currently available DataNodes, ensure size() >= replicationFactor
+ * @param databaseAllocatedRegionGroupMap Mapping of each database to its list of replica sets.
+ */
+ private void computeInitialDbLoad(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap) {
+ initialDbLoad = new HashMap<>();
+
+ // Iterate over each database and count the number of regions on each data node across all its
+ // replica sets.
+ for (String database : databaseAllocatedRegionGroupMap.keySet()) {
+ List<TRegionReplicaSet> replicaSets = databaseAllocatedRegionGroupMap.get(database);
+ int[] load = new int[regionCounter.length];
+ for (TRegionReplicaSet replicaSet : replicaSets) {
+ for (TDataNodeLocation location : replicaSet.getDataNodeLocations()) {
+ int nodeId = location.getDataNodeId();
+ if (availableDataNodeMap.containsKey(nodeId)) {
+ load[nodeId]++;
+ }
+ }
+ }
+ initialDbLoad.put(database, load);
+ }
+ }
+
/**
* Prepare some statistics before dfs.
*
- * @param replicationFactor replication factor in the cluster
* @param availableDataNodeMap currently available DataNodes, ensure size() >= replicationFactor
* @param allocatedRegionGroups already allocated RegionGroups in the cluster
* @param databaseAllocatedRegionGroups already allocated RegionGroups in the same Database
*/
private void prepare(
- int replicationFactor,
Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
List<TRegionReplicaSet> allocatedRegionGroups,
List<TRegionReplicaSet> databaseAllocatedRegionGroups) {
- this.replicationFactor = replicationFactor;
// Store the maximum DataNodeId
int maxDataNodeId =
Math.max(
@@ -225,7 +554,7 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
* current allocation plan
* @param regionSum the sum of Regions at the DataNodes in the current allocation plan
*/
- private void dfs(
+ private void dfsAllocateReplica(
int lastIndex,
int currentReplica,
int[] currentReplicaSet,
@@ -274,7 +603,7 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
for (int i = lastIndex + 1; i < dataNodeIds.length; i++) {
// Decide the next DataNodeId in the allocation plan
currentReplicaSet[currentReplica] = dataNodeIds[i];
- dfs(
+ dfsAllocateReplica(
i,
currentReplica + 1,
currentReplicaSet,
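For orientation, the heart of the change above is a depth-first search that scores each complete assignment with the triple [maxGlobalLoad, databaseLoadSquaredSum, scatterValue] and compares triples lexicographically, pruning branches that are already worse. A minimal standalone sketch of that comparison, not part of the commit (class name and sample values are hypothetical):

    import java.util.Arrays;

    public class MetricCompareSketch {
      // Negative if 'a' is strictly better (smaller) than 'b'; keys are compared left to right.
      static int compareMetrics(int[] a, int[] b) {
        for (int i = 0; i < a.length; i++) {
          if (a[i] != b[i]) {
            return Integer.compare(a[i], b[i]);
          }
        }
        return 0;
      }

      public static void main(String[] args) {
        int[] best = {Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE};
        int[] candidate = {12, 50, 7}; // [maxGlobalLoad, dbLoadSquaredSum, scatter]
        if (compareMetrics(candidate, best) < 0) {
          best = Arrays.copyOf(candidate, candidate.length); // candidate becomes the new optimum
        }
        System.out.println(Arrays.toString(best)); // prints [12, 50, 7]
      }
    }

Regions are processed in batches of BATCH_SIZE = 12, which keeps the exponential search bounded per batch.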
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
index d05a8accbec..9c8f032cee0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
@@ -57,6 +57,19 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
weightList.stream().limit(replicationFactor).collect(Collectors.toList()));
}
+ @Override
+ public Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Double> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ Map<TConsensusGroupId, String> regionDatabaseMap,
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap) {
+ // TODO: Implement this method
+ throw new UnsupportedOperationException(
+ "The removeNodeReplicaSelect method of GreedyRegionGroupAllocator is yet to be implemented.");
+ }
+
private List<TDataNodeLocation> buildWeightList(
Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
Map<Integer, Double> freeDiskSpaceMap,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
index 554168d8497..24200548163 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
@@ -47,4 +47,25 @@ public interface IRegionGroupAllocator {
List<TRegionReplicaSet> databaseAllocatedRegionGroups,
int replicationFactor,
TConsensusGroupId consensusGroupId);
+
+ /**
+ * Select the optimal DataNode to place the new replica on along with the remaining replica set.
+ *
+ * @param availableDataNodeMap DataNodes that can be used for allocation
+ * @param freeDiskSpaceMap The free disk space of the DataNodes
+ * @param allocatedRegionGroups Allocated RegionGroups
+ * @param regionDatabaseMap A mapping from each region identifier to its corresponding database
+ * name
+ * @param databaseAllocatedRegionGroupMap Allocated RegionGroups within the same Database with the
+ * replica set
+ * @param remainReplicasMap the remaining replica set excluding the removed DataNodes
+ * @return The optimal DataNode to place the new replica on along with the remaining replicas
+ */
+ Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Double> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ Map<TConsensusGroupId, String> regionDatabaseMap,
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap);
}
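The shape of a call to the new method mirrors the test added at the end of this commit; a trimmed sketch of the invocation (a fragment, assuming the surrounding ConfigNode types and input maps are prepared as in RemoveDataNodeHandler below):

    IRegionGroupAllocator allocator = new GreedyCopySetRegionGroupAllocator();
    Map<TConsensusGroupId, TDataNodeConfiguration> destinations =
        allocator.removeNodeReplicaSelect(
            availableDataNodeMap,            // DataNodes still usable after the removal
            freeDiskSpaceMap,                // free disk space per DataNode
            allocatedReplicaSets,            // all replica sets, already shrunk by the removed node
            regionDatabaseMap,               // region id -> database name
            databaseAllocatedRegionGroupMap, // database -> its allocated replica sets
            remainReplicasMap);              // region id -> remaining replica set
    // Each entry maps a migrating region to the DataNode selected to host its new replica.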
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java
index 1205e0abc06..4a2237805ad 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java
@@ -113,6 +113,19 @@ public class PartiteGraphReplicationRegionGroupAllocator implements IRegionGroup
return result;
}
+ @Override
+ public Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Double> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ Map<TConsensusGroupId, String> regionDatabaseMap,
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap) {
+ // TODO: Implement this method
+ throw new UnsupportedOperationException(
+ "The removeNodeReplicaSelect method of PartiteGraphReplicationRegionGroupAllocator is yet to be implemented.");
+ }
+
private void prepare(
int replicationFactor,
Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
index eaa16f47907..980dd712eb0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure.env;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -37,6 +38,8 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator;
+import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
@@ -51,10 +54,13 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
@@ -70,8 +76,22 @@ public class RemoveDataNodeHandler {
private final ConfigManager configManager;
+ private final IRegionGroupAllocator regionGroupAllocator;
+
public RemoveDataNodeHandler(ConfigManager configManager) {
this.configManager = configManager;
+
+ switch (ConfigNodeDescriptor.getInstance().getConf().getRegionGroupAllocatePolicy()) {
+ case GREEDY:
+ this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator();
+ break;
+ case PGR:
+ this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator();
+ break;
+ case GCR:
+ default:
+ this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator();
+ }
}
/**
@@ -193,6 +213,172 @@ public class RemoveDataNodeHandler {
return regionMigrationPlans;
}
+ /**
+ * Retrieves all region migration plans for the specified removed DataNodes and selects the
+ * destination.
+ *
+ * @param removedDataNodes the list of DataNodes from which to obtain migration plans
+ * @return a list of region migration plans associated with the removed DataNodes
+ */
+ public List<RegionMigrationPlan> selectedRegionMigrationPlans(
+ List<TDataNodeLocation> removedDataNodes) {
+
+ Set<Integer> removedDataNodesSet = new HashSet<>();
+ for (TDataNodeLocation removedDataNode : removedDataNodes) {
+ removedDataNodesSet.add(removedDataNode.dataNodeId);
+ }
+
+ final List<TDataNodeConfiguration> availableDataNodes =
+ configManager
+ .getNodeManager()
+ .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown)
+ .stream()
+ .filter(node -> !removedDataNodesSet.contains(node.getLocation().getDataNodeId()))
+ .collect(Collectors.toList());
+
+ List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
+
+ regionMigrationPlans.addAll(
+ selectMigrationPlans(availableDataNodes, TConsensusGroupType.DataRegion, removedDataNodes));
+
+ regionMigrationPlans.addAll(
+ selectMigrationPlans(
+ availableDataNodes, TConsensusGroupType.SchemaRegion, removedDataNodes));
+
+ return regionMigrationPlans;
+ }
+
+ public List<RegionMigrationPlan> selectMigrationPlans(
+ List<TDataNodeConfiguration> availableDataNodes,
+ TConsensusGroupType consensusGroupType,
+ List<TDataNodeLocation> removedDataNodes) {
+
+ // Retrieve all allocated replica sets for the given consensus group type
+ List<TRegionReplicaSet> allocatedReplicaSets =
+ configManager.getPartitionManager().getAllReplicaSets(consensusGroupType);
+
+ // Step 1: Identify affected replica sets and record the removed DataNode for each replica set
+ Map<TConsensusGroupId, TDataNodeLocation> removedNodeMap = new HashMap<>();
+ Set<TRegionReplicaSet> affectedReplicaSets =
+ identifyAffectedReplicaSets(allocatedReplicaSets, removedDataNodes, removedNodeMap);
+
+ // Step 2: Update affected replica sets by removing the removed DataNode
+ updateReplicaSets(allocatedReplicaSets, affectedReplicaSets, removedNodeMap);
+
+ // Build a mapping of available DataNodes and their free disk space (computed only once)
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap =
+ buildAvailableDataNodeMap(availableDataNodes);
+ Map<Integer, Double> freeDiskSpaceMap = buildFreeDiskSpaceMap(availableDataNodes);
+
+ // Step 3: For each affected replica set, select a new destination DataNode and create a
+ // migration plan
+ List<RegionMigrationPlan> migrationPlans = new ArrayList<>();
+
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap = new HashMap<>();
+ Map<TConsensusGroupId, String> regionDatabaseMap = new HashMap<>();
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap = new HashMap<>();
+
+ for (TRegionReplicaSet replicaSet : affectedReplicaSets) {
+ remainReplicasMap.put(replicaSet.getRegionId(), replicaSet);
+ String database =
+ configManager.getPartitionManager().getRegionDatabase(replicaSet.getRegionId());
+ List<TRegionReplicaSet> databaseAllocatedReplicaSets =
+ configManager.getPartitionManager().getAllReplicaSets(database, consensusGroupType);
+ regionDatabaseMap.put(replicaSet.getRegionId(), database);
+ databaseAllocatedRegionGroupMap.put(database, databaseAllocatedReplicaSets);
+ }
+
+ Map<TConsensusGroupId, TDataNodeConfiguration> result =
+ regionGroupAllocator.removeNodeReplicaSelect(
+ availableDataNodeMap,
+ freeDiskSpaceMap,
+ allocatedReplicaSets,
+ regionDatabaseMap,
+ databaseAllocatedRegionGroupMap,
+ remainReplicasMap);
+
+ for (TConsensusGroupId regionId : result.keySet()) {
+
+ TDataNodeConfiguration selectedNode = result.get(regionId);
+ LOGGER.info(
+ "Selected DataNode {} for Region {}",
+ selectedNode.getLocation().getDataNodeId(),
+ regionId);
+
+ // Create the migration plan
+ RegionMigrationPlan plan = RegionMigrationPlan.create(regionId, removedNodeMap.get(regionId));
+ plan.setToDataNode(selectedNode.getLocation());
+ migrationPlans.add(plan);
+ }
+ return migrationPlans;
+ }
+
+ /**
+ * Identifies affected replica sets from allocatedReplicaSets that contain any DataNode in
+ * removedDataNodes, and records the removed DataNode for each replica set.
+ */
+ private Set<TRegionReplicaSet> identifyAffectedReplicaSets(
+ List<TRegionReplicaSet> allocatedReplicaSets,
+ List<TDataNodeLocation> removedDataNodes,
+ Map<TConsensusGroupId, TDataNodeLocation> removedNodeMap) {
+
+ Set<TRegionReplicaSet> affectedReplicaSets = new HashSet<>();
+ // Create a copy of allocatedReplicaSets to avoid concurrent modifications
+ List<TRegionReplicaSet> allocatedCopy = new ArrayList<>(allocatedReplicaSets);
+
+ for (TDataNodeLocation removedNode : removedDataNodes) {
+ allocatedCopy.stream()
+ .filter(replicaSet -> replicaSet.getDataNodeLocations().contains(removedNode))
+ .forEach(
+ replicaSet -> {
+ removedNodeMap.put(replicaSet.getRegionId(), removedNode);
+ affectedReplicaSets.add(replicaSet);
+ });
+ }
+ return affectedReplicaSets;
+ }
+
+ /**
+ * Updates each affected replica set by removing the removed DataNode from its list. The
+ * allocatedReplicaSets list is updated accordingly.
+ */
+ private void updateReplicaSets(
+ List<TRegionReplicaSet> allocatedReplicaSets,
+ Set<TRegionReplicaSet> affectedReplicaSets,
+ Map<TConsensusGroupId, TDataNodeLocation> removedNodeMap) {
+ for (TRegionReplicaSet replicaSet : affectedReplicaSets) {
+ // Remove the replica set, update its node list, then re-add it
+ allocatedReplicaSets.remove(replicaSet);
+ replicaSet.getDataNodeLocations().remove(removedNodeMap.get(replicaSet.getRegionId()));
+ allocatedReplicaSets.add(replicaSet);
+ }
+ }
+
+ /**
+ * Constructs a mapping from DataNodeId to TDataNodeConfiguration from the available DataNodes.
+ */
+ private Map<Integer, TDataNodeConfiguration> buildAvailableDataNodeMap(
+ List<TDataNodeConfiguration> availableDataNodes) {
+ return availableDataNodes.stream()
+ .collect(
+ Collectors.toMap(
+ dataNode -> dataNode.getLocation().getDataNodeId(), Function.identity()));
+ }
+
+ /** Constructs a mapping of free disk space for each DataNode. */
+ private Map<Integer, Double> buildFreeDiskSpaceMap(
+ List<TDataNodeConfiguration> availableDataNodes) {
+ Map<Integer, Double> freeDiskSpaceMap = new HashMap<>(availableDataNodes.size());
+ availableDataNodes.forEach(
+ dataNode ->
+ freeDiskSpaceMap.put(
+ dataNode.getLocation().getDataNodeId(),
+ configManager
+ .getLoadManager()
+ .getFreeDiskSpace(dataNode.getLocation().getDataNodeId())));
+ return freeDiskSpaceMap;
+ }
+
/**
* Broadcasts DataNodes' status change, preventing disabled DataNodes from accepting read or write
* requests.
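Note that, as committed, the GREEDY, PGR, and GCR branches of the constructor switch above all instantiate GreedyCopySetRegionGroupAllocator, so GCR is effectively the only selector wired in for node removal. For intuition on the second ranking key it optimizes, here is a self-contained sketch of the per-database squared-sum load metric (class name and sample data hypothetical, not part of the commit):

    import java.util.HashMap;
    import java.util.Map;

    public class DbLoadSquaredSumSketch {
      // For each database: max (initial + extra) load over nodes, squared, summed over databases.
      static int squaredSum(Map<String, int[]> initial, Map<String, int[]> extra) {
        int sum = 0;
        for (Map.Entry<String, int[]> e : initial.entrySet()) {
          int[] init = e.getValue();
          int[] add = extra.getOrDefault(e.getKey(), new int[init.length]);
          int max = 0;
          for (int node = 0; node < init.length; node++) {
            max = Math.max(max, init[node] + add[node]);
          }
          sum += max * max; // squaring penalizes one hot node more than an even spread
        }
        return sum;
      }

      public static void main(String[] args) {
        Map<String, int[]> initial = new HashMap<>();
        initial.put("db0", new int[] {3, 2, 2}); // region counts on nodes 0..2
        initial.put("db1", new int[] {1, 1, 4});
        Map<String, int[]> extra = new HashMap<>();
        extra.put("db0", new int[] {0, 1, 0}); // one migrated region lands on node 1
        System.out.println(squaredSum(initial, extra)); // db0 max 3, db1 max 4 -> 9 + 16 = 25
      }
    }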
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java
index 68b0550e0df..a531d67955c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java
@@ -121,7 +121,8 @@ public class RemoveDataNodesProcedure extends AbstractNodeProcedure<RemoveDataNo
removedDataNodes.forEach(
dataNode -> removedNodeStatusMap.put(dataNode.getDataNodeId(), NodeStatus.Removing));
removeDataNodeHandler.changeDataNodeStatus(removedDataNodes, removedNodeStatusMap);
- regionMigrationPlans = removeDataNodeHandler.getRegionMigrationPlans(removedDataNodes);
+ regionMigrationPlans =
+ removeDataNodeHandler.selectedRegionMigrationPlans(removedDataNodes);
LOG.info(
"{}, DataNode regions to be removed is {}",
REMOVE_DATANODE_PROCESS,
@@ -165,8 +166,7 @@ public class RemoveDataNodesProcedure extends AbstractNodeProcedure<RemoveDataNo
regionMigrationPlan -> {
TConsensusGroupId regionId = regionMigrationPlan.getRegionId();
TDataNodeLocation removedDataNode = regionMigrationPlan.getFromDataNode();
- TDataNodeLocation destDataNode =
- env.getRegionMaintainHandler().findDestDataNode(regionId);
+ TDataNodeLocation destDataNode = regionMigrationPlan.getToDataNode();
// TODO: need to improve the coordinator selection method here, maybe through load
// balancing and other means.
final TDataNodeLocation coordinatorForAddPeer =
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
new file mode 100644
index 00000000000..e28b4dda18f
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class GreedyCopySetRemoveNodeReplicaSelectTest {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(GreedyCopySetRemoveNodeReplicaSelectTest.class);
+
+ private static final IRegionGroupAllocator GCR_ALLOCATOR =
+ new GreedyCopySetRegionGroupAllocator();
+
+ private static final TDataNodeLocation REMOVE_DATANODE_LOCATION =
+ new TDataNodeLocation().setDataNodeId(5);
+
+ private static final int TEST_DATA_NODE_NUM = 5;
+
+ private static final int DATA_REGION_PER_DATA_NODE = 30;
+
+ private static final int DATA_REPLICATION_FACTOR = 2;
+
+ private static final Map<Integer, TDataNodeConfiguration> AVAILABLE_DATA_NODE_MAP =
+ new HashMap<>();
+
+ private static final Map<Integer, Double> FREE_SPACE_MAP = new HashMap<>();
+
+ @Before
+ public void setUp() {
+ // Construct TEST_DATA_NODE_NUM DataNodes
+ AVAILABLE_DATA_NODE_MAP.clear();
+ FREE_SPACE_MAP.clear();
+ for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
+ AVAILABLE_DATA_NODE_MAP.put(
+ i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i)));
+ FREE_SPACE_MAP.put(i, Math.random());
+ }
+ }
+
+ @Test
+ public void testSelectDestNode() {
+ final int dataRegionGroupNum =
+ DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM / DATA_REPLICATION_FACTOR;
+
+ List<TRegionReplicaSet> allocateResult = new ArrayList<>();
+ List<TRegionReplicaSet> databaseAllocateResult = new ArrayList<>();
+ for (int index = 0; index < dataRegionGroupNum; index++) {
+ TRegionReplicaSet replicaSet =
+ GCR_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+ AVAILABLE_DATA_NODE_MAP,
+ FREE_SPACE_MAP,
+ allocateResult,
+ allocateResult,
+ DATA_REPLICATION_FACTOR,
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, index));
+ TRegionReplicaSet replicaSetCopy = new TRegionReplicaSet(replicaSet);
+
+ allocateResult.add(replicaSet);
+ databaseAllocateResult.add(replicaSetCopy);
+ }
+
+ List<TRegionReplicaSet> migratedReplicas =
+ allocateResult.stream()
+ .filter(
+ replicaSet -> replicaSet.getDataNodeLocations().contains(REMOVE_DATANODE_LOCATION))
+ .collect(Collectors.toList());
+
+ AVAILABLE_DATA_NODE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId());
+ FREE_SPACE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId());
+
+ List<TRegionReplicaSet> remainReplicas = new ArrayList<>();
+ for (TRegionReplicaSet replicaSet : migratedReplicas) {
+ List<TDataNodeLocation> dataNodeLocations = replicaSet.getDataNodeLocations();
+ allocateResult.remove(replicaSet);
+ dataNodeLocations.remove(REMOVE_DATANODE_LOCATION);
+ allocateResult.add(replicaSet);
+ remainReplicas.add(replicaSet);
+ }
+
+ Map<Integer, Integer> randomRegionCounter = new HashMap<>();
+ Map<Integer, Integer> PGPRegionCounter = new HashMap<>();
+ Set<Integer> randomSelectedNodeIds = new HashSet<>();
+ Set<Integer> PGPSelectedNodeIds = new HashSet<>();
+
+ int randomMaxRegionCount = 0;
+ int randomMinRegionCount = Integer.MAX_VALUE;
+ int PGPMaxRegionCount = 0;
+ int PGPMinRegionCount = Integer.MAX_VALUE;
+
+ AVAILABLE_DATA_NODE_MAP
+ .keySet()
+ .forEach(
+ nodeId -> {
+ randomRegionCounter.put(nodeId, 0);
+ PGPRegionCounter.put(nodeId, 0);
+ });
+
+ for (TRegionReplicaSet replicaSet : allocateResult) {
+ for (TDataNodeLocation loc : replicaSet.getDataNodeLocations()) {
+ randomRegionCounter.put(
+ loc.getDataNodeId(), randomRegionCounter.get(loc.getDataNodeId()) + 1);
+ PGPRegionCounter.put(loc.getDataNodeId(), PGPRegionCounter.get(loc.getDataNodeId()) + 1);
+ }
+ }
+
+ for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
+ TDataNodeLocation selectedNode =
+ randomSelectNodeForRegion(remainReplicaSet.getDataNodeLocations()).get();
+ LOGGER.info(
+ "Random Selected DataNode {} for Region {}",
+ selectedNode.getDataNodeId(),
+ remainReplicaSet.regionId);
+ randomSelectedNodeIds.add(selectedNode.getDataNodeId());
+ randomRegionCounter.put(
+ selectedNode.getDataNodeId(), randomRegionCounter.get(selectedNode.getDataNodeId()) + 1);
+ }
+
+ LOGGER.info("Remain Replicas... :");
+ for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
+ LOGGER.info("Region Group Id: {}", remainReplicaSet.regionId.id);
+ List<TDataNodeLocation> dataNodeLocations = remainReplicaSet.getDataNodeLocations();
+ for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
+ LOGGER.info("DataNode: {}", dataNodeLocation.getDataNodeId());
+ }
+ }
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap = new HashMap<>();
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap = new HashMap<>();
+ databaseAllocatedRegionGroupMap.put("database", databaseAllocateResult);
+ databaseAllocatedRegionGroupMap.put("database", databaseAllocateResult);
+
+ for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
+ remainReplicasMap.put(remainReplicaSet.getRegionId(), remainReplicaSet);
+ }
+ Map<TConsensusGroupId, String> regionDatabaseMap = new HashMap<>();
+ for (TRegionReplicaSet replicaSet : allocateResult) {
+ regionDatabaseMap.put(replicaSet.getRegionId(), "database");
+ }
+ Map<TConsensusGroupId, TDataNodeConfiguration> result =
+ GCR_ALLOCATOR.removeNodeReplicaSelect(
+ AVAILABLE_DATA_NODE_MAP,
+ FREE_SPACE_MAP,
+ allocateResult,
+ regionDatabaseMap,
+ databaseAllocatedRegionGroupMap,
+ remainReplicasMap);
+
+ for (TConsensusGroupId regionId : result.keySet()) {
+ TDataNodeConfiguration selectedNode = result.get(regionId);
+
+ LOGGER.info(
+ "GCR Selected DataNode {} for Region {}",
+ selectedNode.getLocation().getDataNodeId(),
+ regionId);
+ PGPSelectedNodeIds.add(selectedNode.getLocation().getDataNodeId());
+ PGPRegionCounter.put(
+ selectedNode.getLocation().getDataNodeId(),
+ PGPRegionCounter.get(selectedNode.getLocation().getDataNodeId()) + 1);
+ }
+
+ LOGGER.info("randomRegionCount:");
+
+ for (Integer i : randomRegionCounter.keySet()) {
+ Integer value = randomRegionCounter.get(i);
+ randomMaxRegionCount = Math.max(randomMaxRegionCount, value);
+ randomMinRegionCount = Math.min(randomMinRegionCount, value);
+ LOGGER.info("{} : {}", i, value);
+ }
+
+ LOGGER.info("PGPRegionCount:");
+
+ for (Integer i : PGPRegionCounter.keySet()) {
+ Integer value = PGPRegionCounter.get(i);
+ PGPMaxRegionCount = Math.max(PGPMaxRegionCount, value);
+ PGPMinRegionCount = Math.min(PGPMinRegionCount, value);
+ LOGGER.info("{} : {}", i, value);
+ }
+
+ LOGGER.info("PGPSelectedNodeIds size: {}", PGPSelectedNodeIds.size());
+ Assert.assertEquals(TEST_DATA_NODE_NUM - 1, PGPSelectedNodeIds.size());
+ LOGGER.info("randomSelectedNodeIds size: {}",
randomSelectedNodeIds.size());
+ Assert.assertTrue(PGPSelectedNodeIds.size() >=
randomSelectedNodeIds.size());
+ LOGGER.info(
+ "randomMaxRegionCount: {}, PGPMaxRegionCount: {}",
randomMaxRegionCount, PGPMaxRegionCount);
+ Assert.assertTrue(randomMaxRegionCount >= PGPMaxRegionCount);
+ }
+
+ @Test
+ public void testSelectDestNodeMultiDatabase() {
+ // Pre-allocate RegionReplicaSets for multiple databases
+ final String[] DB_NAMES = {"db0", "db1", "db2"};
+ final int TOTAL_RG_NUM =
+ DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM / DATA_REPLICATION_FACTOR;
+
+ int basePerDb = TOTAL_RG_NUM / DB_NAMES.length;
+ int remainder = TOTAL_RG_NUM % DB_NAMES.length; // first <remainder> DBs get one extra
+
+ Map<String, List<TRegionReplicaSet>> dbAllocatedMap = new HashMap<>();
+ List<TRegionReplicaSet> globalAllocatedList = new ArrayList<>();
+ int globalIndex = 0;
+
+ for (int dbIdx = 0; dbIdx < DB_NAMES.length; dbIdx++) {
+ String db = DB_NAMES[dbIdx];
+ int rgToCreate = basePerDb + (dbIdx < remainder ? 1 : 0);
+ List<TRegionReplicaSet> perDbList = new ArrayList<>();
+ dbAllocatedMap.put(db, perDbList);
+
+ for (int i = 0; i < rgToCreate; i++) {
+ TRegionReplicaSet rs =
+ GCR_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+ AVAILABLE_DATA_NODE_MAP,
+ FREE_SPACE_MAP,
+ globalAllocatedList,
+ perDbList,
+ DATA_REPLICATION_FACTOR,
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, globalIndex++));
+ globalAllocatedList.add(rs);
+ perDbList.add(rs);
+ }
+ }
+
+ // Identify the replica sets that contain the node to be removed
+ List<TRegionReplicaSet> impactedReplicas =
+ globalAllocatedList.stream()
+ .filter(rs -> rs.getDataNodeLocations().contains(REMOVE_DATANODE_LOCATION))
+ .collect(Collectors.toList());
+
+ // Simulate removing the faulty/offline node
+ AVAILABLE_DATA_NODE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId());
+ FREE_SPACE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId());
+
+ List<TRegionReplicaSet> remainReplicas = new ArrayList<>();
+ for (TRegionReplicaSet rs : impactedReplicas) {
+ globalAllocatedList.remove(rs);
+ rs.getDataNodeLocations().remove(REMOVE_DATANODE_LOCATION);
+ globalAllocatedList.add(rs);
+ remainReplicas.add(rs);
+ }
+
+ // Build helper maps for removeNodeReplicaSelect
+ Map<TConsensusGroupId, TRegionReplicaSet> remainMap = new HashMap<>();
+ remainReplicas.forEach(r -> remainMap.put(r.getRegionId(), r));
+
+ Map<TConsensusGroupId, String> regionDbMap = new HashMap<>();
+ dbAllocatedMap.forEach((db, list) -> list.forEach(r -> regionDbMap.put(r.getRegionId(), db)));
+
+ // Baseline: random selection for comparison
+ Map<Integer, Integer> rndCount = new HashMap<>();
+ Map<Integer, Integer> planCount = new HashMap<>();
+ Set<Integer> rndNodes = new HashSet<>();
+ Set<Integer> planNodes = new HashSet<>();
+ int rndMax = 0, rndMin = Integer.MAX_VALUE;
+ int planMax = 0, planMin = Integer.MAX_VALUE;
+
+ AVAILABLE_DATA_NODE_MAP
+ .keySet()
+ .forEach(
+ n -> {
+ rndCount.put(n, 0);
+ planCount.put(n, 0);
+ });
+
+ for (TRegionReplicaSet replicaSet : globalAllocatedList) {
+ for (TDataNodeLocation loc : replicaSet.getDataNodeLocations()) {
+ rndCount.merge(loc.getDataNodeId(), 1, Integer::sum);
+ planCount.merge(loc.getDataNodeId(), 1, Integer::sum);
+ }
+ }
+
+ for (TRegionReplicaSet r : remainReplicas) {
+ TDataNodeLocation pick = randomSelectNodeForRegion(r.getDataNodeLocations()).get();
+ LOGGER.info("Random Selected DataNode {} for Region {}", pick.getDataNodeId(), r.regionId);
+ rndNodes.add(pick.getDataNodeId());
+ rndCount.merge(pick.getDataNodeId(), 1, Integer::sum);
+ }
+
+ LOGGER.info("Remain Replicas... :");
+ for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
+ LOGGER.info("Region Group Id: {}", remainReplicaSet.regionId.id);
+ List<TDataNodeLocation> dataNodeLocations = remainReplicaSet.getDataNodeLocations();
+ for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
+ LOGGER.info("DataNode: {}", dataNodeLocation.getDataNodeId());
+ }
+ }
+
+ // Call the method under test
+ Map<TConsensusGroupId, TDataNodeConfiguration> result =
+ GCR_ALLOCATOR.removeNodeReplicaSelect(
+ AVAILABLE_DATA_NODE_MAP,
+ FREE_SPACE_MAP,
+ globalAllocatedList,
+ regionDbMap,
+ dbAllocatedMap,
+ remainMap);
+
+ for (TConsensusGroupId regionId : result.keySet()) {
+ TDataNodeConfiguration selectedNode = result.get(regionId);
+
+ LOGGER.info(
+ "GCR Selected DataNode {} for Region {}",
+ selectedNode.getLocation().getDataNodeId(),
+ regionId);
+ planNodes.add(selectedNode.getLocation().getDataNodeId());
+ planCount.merge(selectedNode.getLocation().getDataNodeId(), 1, Integer::sum);
+ }
+
+ // Calculate load distribution
+ for (int c : rndCount.values()) {
+ rndMax = Math.max(rndMax, c);
+ rndMin = Math.min(rndMin, c);
+ }
+ for (int c : planCount.values()) {
+ planMax = Math.max(planMax, c);
+ planMin = Math.min(planMin, c);
+ }
+
+ // Assertions
+ Assert.assertEquals(TEST_DATA_NODE_NUM - 1, planNodes.size());
+ Assert.assertTrue(planNodes.size() >= rndNodes.size());
+ Assert.assertTrue(rndMax >= planMax);
+ }
+
+ private Optional<TDataNodeLocation> randomSelectNodeForRegion(
+ List<TDataNodeLocation> regionReplicaNodes) {
+ List<TDataNodeConfiguration> dataNodeConfigurations =
+ new ArrayList<>(AVAILABLE_DATA_NODE_MAP.values());
+ // Randomly selected to ensure a basic load balancing
+ Collections.shuffle(dataNodeConfigurations);
+ return dataNodeConfigurations.stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .filter(e -> !regionReplicaNodes.contains(e))
+ .findAny();
+ }
+}
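To exercise the new test locally, the usual Surefire invocation should work (module path as in the diff; exact flags may vary with your build setup):

    mvn -pl iotdb-core/confignode test -Dtest=GreedyCopySetRemoveNodeReplicaSelectTest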