This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch region-multi-database
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/region-multi-database by this
push:
new 4d5e1749706 multi-database leader balance
4d5e1749706 is described below
commit 4d5e1749706ece63e0941086bcf6e160a27208c3
Author: YongzaoDan <[email protected]>
AuthorDate: Wed Mar 6 17:09:53 2024 +0800
multi-database leader balance
---
.../manager/load/balancer/RouteBalancer.java | 1 +
.../region/GreedyRegionGroupAllocator.java | 22 +--
.../router/leader/GreedyLeaderBalancer.java | 1 +
.../balancer/router/leader/ILeaderBalancer.java | 3 +
.../router/leader/MinCostFlowLeaderBalancer.java | 209 +++++++++++++--------
.../manager/partition/PartitionManager.java | 12 ++
.../persistence/partition/PartitionInfo.java | 21 +++
.../GreedyCopySetRegionGroupAllocatorTest.java | 47 +++--
.../router/leader/GreedyLeaderBalancerTest.java | 5 +-
.../leader/LeaderBalancerComparisonTest.java | 3 +-
.../leader/MinCostFlowLeaderBalancerTest.java | 53 +++---
11 files changed, 240 insertions(+), 137 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index f8f1c92484f..deca1df1ec3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -140,6 +140,7 @@ public class RouteBalancer {
Map<TConsensusGroupId, Integer> currentLeaderMap =
getLoadManager().getRegionLeaderMap();
Map<TConsensusGroupId, Integer> optimalLeaderMap =
leaderBalancer.generateOptimalLeaderDistribution(
+ getPartitionManager().getAllRegionGroupIdMap(regionGroupType),
getPartitionManager().getAllReplicaSetsMap(regionGroupType),
currentLeaderMap,
getNodeManager()
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
index a689c727467..2cd4855947c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
@@ -88,16 +88,16 @@ public class GreedyRegionGroupAllocator implements
IRegionGroupAllocator {
freeDiskSpaceMap.getOrDefault(datanodeId, 0d))));
// Sort weightList
- return priorityMap.entrySet().stream()
- .sorted(
- comparingByValue(
- (o1, o2) ->
- !Objects.equals(o1.getLeft(), o2.getLeft())
- // Compare the first key(The number of Regions) by
ascending order
- ? o1.getLeft() - o2.getLeft()
- // Compare the second key(The free disk space) by
descending order
- : (int) (o2.getRight() - o1.getRight())))
- .map(entry -> entry.getKey().deepCopy())
- .collect(Collectors.toList());
+ return priorityMap.entrySet().stream()
+ .sorted(
+ comparingByValue(
+ (o1, o2) ->
+ !Objects.equals(o1.getLeft(), o2.getLeft())
+ // Compare the first key(The number of Regions) by
ascending order
+ ? o1.getLeft() - o2.getLeft()
+ // Compare the second key(The free disk space) by
descending order
+ : (int) (o2.getRight() - o1.getRight())))
+ .map(entry -> entry.getKey().deepCopy())
+ .collect(Collectors.toList());
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 0d3f813a689..723c8462b7a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -47,6 +47,7 @@ public class GreedyLeaderBalancer implements ILeaderBalancer {
@Override
public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Set<Integer> disabledDataNodeSet) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
index a69ccc9491d..8cd7fae7020 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,6 +33,7 @@ public interface ILeaderBalancer {
/**
* Generate an optimal leader distribution.
*
+ * @param databaseRegionGroupMap The RegionGroups held by each Database
* @param regionReplicaSetMap All RegionGroups the cluster currently have
* @param regionLeaderMap The current leader of each RegionGroup
* @param disabledDataNodeSet The DataNodes that currently unable to
work(can't place
@@ -39,6 +41,7 @@ public interface ILeaderBalancer {
* @return Map<TConsensusGroupId, Integer>, The optimal leader distribution
*/
Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Set<Integer> disabledDataNodeSet);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
index 9b775d71cd9..d69c50b9843 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -25,15 +25,14 @@ import org.apache.iotdb.commons.utils.TestOnly;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
/** Leader distribution balancer that uses minimum cost flow algorithm */
public class MinCostFlowLeaderBalancer implements ILeaderBalancer {
@@ -41,8 +40,9 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private static final int INFINITY = Integer.MAX_VALUE;
/** Input parameters */
- private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
+ private final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap;
+ private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
private final Map<TConsensusGroupId, Integer> regionLeaderMap;
private final Set<Integer> disabledDataNodeSet;
@@ -55,10 +55,12 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private int maxNode = T_NODE + 1;
// Map<RegionGroupId, rNode>
private final Map<TConsensusGroupId, Integer> rNodeMap;
- // Map<DataNodeId, dNode>
- private final Map<Integer, Integer> dNodeMap;
- // Map<dNode, DataNodeId>
- private final Map<Integer, Integer> dNodeReflect;
+ // Map<Database, Map<DataNodeId, sDNode>>
+ private final Map<String, Map<Integer, Integer>> sDNodeMap;
+ // Map<Database, Map<sDNode, DataNodeId>>
+ private final Map<String, Map<Integer, Integer>> sDNodeReflect;
+ // Map<DataNodeId, tDNode>
+ private final Map<Integer, Integer> tDNodeMap;
/** Graph edges */
// Maximum index of graph edges
@@ -75,22 +77,25 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private int minimumCost = 0;
public MinCostFlowLeaderBalancer() {
- this.regionReplicaSetMap = new HashMap<>();
- this.regionLeaderMap = new HashMap<>();
- this.disabledDataNodeSet = new HashSet<>();
- this.rNodeMap = new HashMap<>();
- this.dNodeMap = new HashMap<>();
- this.dNodeReflect = new HashMap<>();
+ this.databaseRegionGroupMap = new TreeMap<>();
+ this.regionReplicaSetMap = new TreeMap<>();
+ this.regionLeaderMap = new TreeMap<>();
+ this.disabledDataNodeSet = new TreeSet<>();
+ this.rNodeMap = new TreeMap<>();
+ this.sDNodeMap = new TreeMap<>();
+ this.sDNodeReflect = new TreeMap<>();
+ this.tDNodeMap = new TreeMap<>();
this.minCostFlowEdges = new ArrayList<>();
}
@Override
public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Set<Integer> disabledDataNodeSet) {
- initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+ initialize(databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap,
disabledDataNodeSet);
Map<TConsensusGroupId, Integer> result;
constructMCFGraph();
@@ -102,21 +107,26 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
}
private void initialize(
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Set<Integer> disabledDataNodeSet) {
+ this.databaseRegionGroupMap.putAll(databaseRegionGroupMap);
this.regionReplicaSetMap.putAll(regionReplicaSetMap);
this.regionLeaderMap.putAll(regionLeaderMap);
this.disabledDataNodeSet.addAll(disabledDataNodeSet);
}
private void clear() {
+ this.databaseRegionGroupMap.clear();
this.regionReplicaSetMap.clear();
this.regionLeaderMap.clear();
this.disabledDataNodeSet.clear();
+
this.rNodeMap.clear();
- this.dNodeMap.clear();
- this.dNodeReflect.clear();
+ this.sDNodeMap.clear();
+ this.sDNodeReflect.clear();
+ this.tDNodeMap.clear();
this.minCostFlowEdges.clear();
this.nodeHeadEdge = null;
@@ -133,13 +143,30 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
this.minimumCost = 0;
/* Indicate nodes in mcf */
- for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
- rNodeMap.put(regionReplicaSet.getRegionId(), maxNode++);
- for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
- if (!dNodeMap.containsKey(dataNodeLocation.getDataNodeId())) {
- dNodeMap.put(dataNodeLocation.getDataNodeId(), maxNode);
- dNodeReflect.put(maxNode, dataNodeLocation.getDataNodeId());
- maxNode += 1;
+ for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+ databaseRegionGroupMap.entrySet()) {
+ String database = databaseEntry.getKey();
+ sDNodeMap.put(database, new TreeMap<>());
+ sDNodeReflect.put(database, new TreeMap<>());
+ List<TConsensusGroupId> regionGroupIds = databaseEntry.getValue();
+ for (TConsensusGroupId regionGroupId : regionGroupIds) {
+ rNodeMap.put(regionGroupId, maxNode++);
+ for (TDataNodeLocation dataNodeLocation :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ // Skip disabled DataNode
+ continue;
+ }
+ if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
+ sDNodeMap.get(database).put(dataNodeId, maxNode);
+ sDNodeReflect.get(database).put(maxNode, dataNodeId);
+ maxNode += 1;
+ }
+ if (!tDNodeMap.containsKey(dataNodeId)) {
+ tDNodeMap.put(dataNodeId, maxNode);
+ maxNode += 1;
+ }
}
}
}
@@ -153,57 +180,74 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
/* Construct edges: sNode -> rNodes */
for (int rNode : rNodeMap.values()) {
- // Cost: 0
+ // Capacity: 1, Cost: 0, each RegionGroup should elect exactly 1 leader
addAdjacentEdges(S_NODE, rNode, 1, 0);
}
- /* Construct edges: rNodes -> dNodes */
- for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
- int rNode = rNodeMap.get(regionReplicaSet.getRegionId());
- for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
- int dNode = dNodeMap.get(dataNodeLocation.getDataNodeId());
- // Cost: 1 if the dNode is corresponded to the current leader of the
rNode,
- // 0 otherwise.
- // Therefore, the RegionGroup will keep the leader as constant as
possible.
- int cost =
- regionLeaderMap.getOrDefault(regionReplicaSet.getRegionId(), -1)
- == dataNodeLocation.getDataNodeId()
- ? 0
- : 1;
- addAdjacentEdges(rNode, dNode, 1, cost);
+ /* Construct edges: rNodes -> sdNodes */
+ for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+ databaseRegionGroupMap.entrySet()) {
+ String database = databaseEntry.getKey();
+ for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
+ int rNode = rNodeMap.get(regionGroupId);
+ for (TDataNodeLocation dataNodeLocation :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ // Skip disabled DataNode
+ continue;
+ }
+ int sDNode = sDNodeMap.get(database).get(dataNodeId);
+ // Capacity: 1, Cost: 0 if sDNode is the current leader of the
rNode, 1 otherwise.
+ // Therefore, the RegionGroup will keep the leader as constant as
possible.
+ int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) ==
dataNodeId ? 0 : 1;
+ addAdjacentEdges(rNode, sDNode, 1, cost);
+ }
}
}
- /* Construct edges: dNodes -> tNode */
- // Count the possible maximum number of leader in each DataNode
- Map<Integer, AtomicInteger> maxLeaderCounter = new ConcurrentHashMap<>();
- regionReplicaSetMap
- .values()
- .forEach(
- regionReplicaSet ->
- regionReplicaSet
- .getDataNodeLocations()
- .forEach(
- dataNodeLocation ->
- maxLeaderCounter
- .computeIfAbsent(
- dataNodeLocation.getDataNodeId(), empty ->
new AtomicInteger(0))
- .getAndIncrement()));
-
- for (Map.Entry<Integer, Integer> dNodeEntry : dNodeMap.entrySet()) {
- int dataNodeId = dNodeEntry.getKey();
- int dNode = dNodeEntry.getValue();
-
- if (disabledDataNodeSet.contains(dataNodeId)) {
- // Skip disabled DataNode
- continue;
+ /* Construct edges: sDNodes -> tDNodes */
+ for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+ databaseRegionGroupMap.entrySet()) {
+ String database = databaseEntry.getKey();
+ // Map<DataNodeId, leader number>
+ Map<Integer, Integer> leaderCounter = new TreeMap<>();
+ for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
+ for (TDataNodeLocation dataNodeLocation :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ // Skip disabled DataNode
+ continue;
+ }
+ int sDNode = sDNodeMap.get(database).get(dataNodeId);
+ int tDNode = tDNodeMap.get(dataNodeId);
+ int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
+ // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
+ // Thus, the leader distribution will be as balanced as possible
within each Database
+ // based on Jensen's inequality.
+ addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
+ }
}
+ }
- int maxLeaderCount = maxLeaderCounter.get(dataNodeId).get();
- for (int extraEdge = 1; extraEdge <= maxLeaderCount; extraEdge++) {
+ /* Construct edges: tDNodes -> tNode */
+ // Map<DataNodeId, possible max leader>: count the possible maximum number
of leaders in each
+ // DataNode
+ Map<Integer, Integer> maxLeaderCounter = new TreeMap<>();
+ for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
+ for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ // Skip disabled DataNode
+ continue;
+ }
+ int tDNode = tDNodeMap.get(dataNodeId);
+ int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, Integer::sum);
// Cost: x^2 for the x-th edge at the current dNode.
- // Thus, the leader distribution will be as balance as possible.
- addAdjacentEdges(dNode, T_NODE, 1, extraEdge * extraEdge);
+ // Thus, the leader distribution will be as balanced as possible within
the cluster
+ // based on Jensen's inequality.
+ addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount);
}
}
}
@@ -310,22 +354,23 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private Map<TConsensusGroupId, Integer> collectLeaderDistribution() {
Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
- rNodeMap.forEach(
- (regionGroupId, rNode) -> {
- boolean matchLeader = false;
- for (int currentEdge = nodeHeadEdge[rNode];
- currentEdge >= 0;
- currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
- MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
- if (edge.destNode != S_NODE && edge.capacity == 0) {
- matchLeader = true;
- result.put(regionGroupId, dNodeReflect.get(edge.destNode));
- }
- }
- if (!matchLeader) {
- result.put(regionGroupId,
regionLeaderMap.getOrDefault(regionGroupId, -1));
- }
- });
+ databaseRegionGroupMap.forEach(
+ (database, regionGroupIds) -> regionGroupIds.forEach(
+ regionGroupId -> {
+ boolean matchLeader = false;
+ for (int currentEdge = nodeHeadEdge[rNodeMap.get(regionGroupId)];
+ currentEdge >= 0;
+ currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
+ MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
+ if (edge.destNode != S_NODE && edge.capacity == 0) {
+ matchLeader = true;
+ result.put(regionGroupId,
sDNodeReflect.get(database).get(edge.destNode));
+ }
+ }
+ if (!matchLeader) {
+ result.put(regionGroupId,
regionLeaderMap.getOrDefault(regionGroupId, -1));
+ }
+ }));
return result;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index cf3bdc1c8d9..613b881cd0f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -788,6 +788,18 @@ public class PartitionManager {
return partitionInfo.getRegionGroupCount(database, type);
}
+ /**
+ * Only leader use this interface.
+ *
+ * <p>Get all RegionGroups currently in the cluster, grouped by Database
+ *
+ * @param type SchemaRegion or DataRegion
+ * @return Map<Database, List<RegionGroupIds>>
+ */
+ public Map<String, List<TConsensusGroupId>>
getAllRegionGroupIdMap(TConsensusGroupType type) {
+ return partitionInfo.getAllRegionGroupIdMap(type);
+ }
+
/**
* Only leader use this interface.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index cdb3e902df5..027cc23107d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -88,6 +88,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
@@ -801,6 +802,26 @@ public class PartitionInfo implements SnapshotProcessor {
return databasePartitionTables.get(database).getRegionGroupCount(type);
}
+ /**
+ * Only leader use this interface.
+ *
+ * <p>Get all RegionGroups currently in the cluster, grouped by Database (pre-deleted Databases are excluded)
+ *
+ * @param type SchemaRegion or DataRegion
+ * @return Map<Database, List<RegionGroupIds>>
+ */
+ public Map<String, List<TConsensusGroupId>>
getAllRegionGroupIdMap(TConsensusGroupType type) {
+ Map<String, List<TConsensusGroupId>> result = new TreeMap<>();
+ databasePartitionTables
+ .forEach(
+ (database, databasePartitionTable) -> {
+ if (databasePartitionTable.isNotPreDeleted()) {
+ result.put(database,
databasePartitionTable.getAllRegionGroupIds(type));
+ }
+ });
+ return result;
+ }
+
/**
* Only leader use this interface.
*
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java
index b3c7eb3c0a1..7bf89eeff98 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java
@@ -99,16 +99,20 @@ public class GreedyCopySetRegionGroupAllocatorTest {
greedyCopySetDatabaseResult.put(i, new ArrayList<>());
}
for (int index = 0; index < dataRegionGroupPerDatabase *
TEST_DATABASE_NUM; index++) {
- TRegionReplicaSet greedyRegionGroup =
GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution(
- AVAILABLE_DATA_NODE_MAP,
- FREE_SPACE_MAP,
- greedyResult,
- greedyResult,
- replicationFactor,
- new TConsensusGroupId(TConsensusGroupType.DataRegion, index));
+ TRegionReplicaSet greedyRegionGroup =
+ GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+ AVAILABLE_DATA_NODE_MAP,
+ FREE_SPACE_MAP,
+ greedyResult,
+ greedyResult,
+ replicationFactor,
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, index));
greedyResult.add(greedyRegionGroup);
- greedyRegionGroup.getDataNodeLocations().forEach(
- dataNodeLocation ->
greedyRegionCounter.merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum));
+ greedyRegionGroup
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ greedyRegionCounter.merge(dataNodeLocation.getDataNodeId(),
1, Integer::sum));
int databaseId = RANDOM.nextInt(TEST_DATABASE_NUM);
TRegionReplicaSet greedyCopySetRegionGroup =
GREEDY_COPY_SET_ALLOCATOR.generateOptimalRegionReplicasDistribution(
@@ -120,20 +124,23 @@ public class GreedyCopySetRegionGroupAllocatorTest {
new TConsensusGroupId(TConsensusGroupType.DataRegion, index));
greedyCopySetResult.add(greedyCopySetRegionGroup);
greedyCopySetDatabaseResult.get(databaseId).add(greedyCopySetRegionGroup);
- greedyCopySetRegionGroup.getDataNodeLocations().forEach(
- dataNodeLocation -> {
- greedyCopySetRegionCounter.merge(dataNodeLocation.getDataNodeId(),
1, Integer::sum);
- greedyCopySetDatabaseRegionCounter
- .computeIfAbsent(databaseId, empty -> new TreeMap<>())
- .merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum);
- });
+ greedyCopySetRegionGroup
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation -> {
+
greedyCopySetRegionCounter.merge(dataNodeLocation.getDataNodeId(), 1,
Integer::sum);
+ greedyCopySetDatabaseRegionCounter
+ .computeIfAbsent(databaseId, empty -> new TreeMap<>())
+ .merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum);
+ });
LOGGER.info("After allocate RegionGroup: {}", index);
for (int i = 0; i < TEST_DATABASE_NUM; i++) {
LOGGER.info("Database {}: {}", i,
greedyCopySetDatabaseRegionCounter.get(i));
}
LOGGER.info("Cluster : {}", greedyCopySetRegionCounter);
for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
- Assert.assertTrue(greedyCopySetRegionCounter.getOrDefault(i, 0) <=
DATA_REGION_PER_DATA_NODE);
+ Assert.assertTrue(
+ greedyCopySetRegionCounter.getOrDefault(i, 0) <=
DATA_REGION_PER_DATA_NODE);
}
}
@@ -208,9 +215,11 @@ public class GreedyCopySetRegionGroupAllocatorTest {
for (int j = 1; j <= TEST_DATA_NODE_NUM; j++) {
if (greedyCopySetDatabaseRegionCounter.get(i).containsKey(j)) {
greedyCopySetMinRegionCount =
- Math.min(greedyCopySetMinRegionCount,
greedyCopySetDatabaseRegionCounter.get(i).get(j));
+ Math.min(
+ greedyCopySetMinRegionCount,
greedyCopySetDatabaseRegionCounter.get(i).get(j));
greedyCopySetMaxRegionCount =
- Math.max(greedyCopySetMaxRegionCount,
greedyCopySetDatabaseRegionCounter.get(i).get(j));
+ Math.max(
+ greedyCopySetMaxRegionCount,
greedyCopySetDatabaseRegionCounter.get(i).get(j));
}
}
// The maximal Region count - minimal Region count should be less than
or equal to 1 for each
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
index b676a62ecfb..a7f4103345f 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -75,7 +76,7 @@ public class GreedyLeaderBalancerTest {
}
Map<TConsensusGroupId, Integer> leaderDistribution =
- BALANCER.generateOptimalLeaderDistribution(
+ BALANCER.generateOptimalLeaderDistribution(new TreeMap<>(),
regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
leaderDistribution.forEach(
@@ -128,7 +129,7 @@ public class GreedyLeaderBalancerTest {
}
Map<TConsensusGroupId, Integer> leaderDistribution =
- BALANCER.generateOptimalLeaderDistribution(
+ BALANCER.generateOptimalLeaderDistribution(new TreeMap<>(),
regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
leaderDistribution.forEach(
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
index 9c7bda4287f..f289d8746fa 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
@@ -38,6 +38,7 @@ import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -263,7 +264,7 @@ public class LeaderBalancerComparisonTest {
Map<TConsensusGroupId, Integer> lastDistribution = new
ConcurrentHashMap<>(regionLeaderMap);
for (int rounds = 0; rounds < 1000; rounds++) {
Map<TConsensusGroupId, Integer> currentDistribution =
- leaderBalancer.generateOptimalLeaderDistribution(
+ leaderBalancer.generateOptimalLeaderDistribution(new TreeMap<>(),
regionReplicaSetMap, lastDistribution, disabledDataNodeSet);
if (currentDistribution.equals(lastDistribution)) {
// The leader distribution is stable
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
index f676956780e..6807d8b314b 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -35,13 +36,17 @@ import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
public class MinCostFlowLeaderBalancerTest {
private static final MinCostFlowLeaderBalancer BALANCER = new
MinCostFlowLeaderBalancer();
+ private static final String DATABASE = "root.database";
+
/** This test shows a simple case that greedy algorithm might fail */
@Test
public void optimalLeaderDistributionTest() {
@@ -50,12 +55,10 @@ public class MinCostFlowLeaderBalancerTest {
for (int i = 0; i < 3; i++) {
regionGroupIds.add(new TConsensusGroupId(TConsensusGroupType.DataRegion,
i));
}
-
List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
for (int i = 0; i < 4; i++) {
dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(i));
}
-
List<TRegionReplicaSet> regionReplicaSets = new ArrayList<>();
regionReplicaSets.add(
new TRegionReplicaSet(
@@ -74,11 +77,13 @@ public class MinCostFlowLeaderBalancerTest {
dataNodeLocations.get(0), dataNodeLocations.get(2),
dataNodeLocations.get(3))));
// Prepare input parameters
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
HashMap<>();
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
+ databaseRegionGroupMap.put(DATABASE, regionGroupIds);
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
regionReplicaSets.forEach(
regionReplicaSet ->
regionReplicaSetMap.put(regionReplicaSet.getRegionId(),
regionReplicaSet));
- Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+ Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
regionReplicaSets.forEach(
regionReplicaSet ->
regionLeaderMap.put(regionReplicaSet.getRegionId(), 0));
Set<Integer> disabledDataNodeSet = new HashSet<>();
@@ -86,7 +91,7 @@ public class MinCostFlowLeaderBalancerTest {
// Do balancing
Map<TConsensusGroupId, Integer> leaderDistribution =
- BALANCER.generateOptimalLeaderDistribution(
+ BALANCER.generateOptimalLeaderDistribution(databaseRegionGroupMap,
regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
// All RegionGroup got a leader
Assert.assertEquals(3, leaderDistribution.size());
@@ -94,8 +99,9 @@ public class MinCostFlowLeaderBalancerTest {
Assert.assertEquals(3, new HashSet<>(leaderDistribution.values()).size());
// MaxFlow is 3
Assert.assertEquals(3, BALANCER.getMaximumFlow());
- // MinimumCost is 3(switch leader cost) + 3(load cost, 1 for each DataNode)
- Assert.assertEquals(3 + 3, BALANCER.getMinimumCost());
+ // MinimumCost is 3(switch leader cost, rNode -> sDNode) + 3(load cost, sDNode -> tDNode)
+ // + 3(load cost, tDNode -> tNode)
+ Assert.assertEquals(3 + 3 + 3, BALANCER.getMinimumCost());
}
/** The leader will remain the same if all DataNodes are disabled */
@@ -110,9 +116,11 @@ public class MinCostFlowLeaderBalancerTest {
new TDataNodeLocation().setDataNodeId(2)));
// Prepare input parameters
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
HashMap<>();
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
+ databaseRegionGroupMap.put(DATABASE,
Collections.singletonList(regionReplicaSet.getRegionId()));
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
- Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+ Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
regionLeaderMap.put(regionReplicaSet.getRegionId(), 1);
Set<Integer> disabledDataNodeSet = new HashSet<>();
disabledDataNodeSet.add(0);
@@ -121,7 +129,7 @@ public class MinCostFlowLeaderBalancerTest {
// Do balancing
Map<TConsensusGroupId, Integer> leaderDistribution =
- BALANCER.generateOptimalLeaderDistribution(
+ BALANCER.generateOptimalLeaderDistribution(databaseRegionGroupMap,
regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
Assert.assertEquals(1, leaderDistribution.size());
Assert.assertEquals(1, new HashSet<>(leaderDistribution.values()).size());
@@ -148,13 +156,15 @@ public class MinCostFlowLeaderBalancerTest {
// The loadCost for each DataNode are the same
int x = regionGroupNum / dataNodeNum;
- // i.e. formula of 1^2 + 2^2 + 3^2 + ...
- int loadCost = x * (x + 1) * (2 * x + 1) / 6;
+ // i.e. formula of (1^2 + 2^2 + ... + x^2) * 2, once for sDNode -> tDNode and once for tDNode -> tNode
+ int loadCost = x * (x + 1) * (2 * x + 1) / 3;
int dataNodeId = 0;
Random random = new Random();
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
HashMap<>();
- Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
+ databaseRegionGroupMap.put(DATABASE, new ArrayList<>());
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
+ Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
for (int i = 0; i < regionGroupNum; i++) {
TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
int leaderId = (dataNodeId + random.nextInt(replicationFactor)) %
dataNodeNum;
@@ -166,31 +176,30 @@ public class MinCostFlowLeaderBalancerTest {
dataNodeId = (dataNodeId + 1) % dataNodeNum;
}
+ databaseRegionGroupMap.get(DATABASE).add(regionGroupId);
regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionLeaderMap.put(regionGroupId, leaderId);
}
// Do balancing
Map<TConsensusGroupId, Integer> leaderDistribution =
- BALANCER.generateOptimalLeaderDistribution(
+ BALANCER.generateOptimalLeaderDistribution(databaseRegionGroupMap,
regionReplicaSetMap, regionLeaderMap, new HashSet<>());
// All RegionGroup got a leader
Assert.assertEquals(regionGroupNum, leaderDistribution.size());
- Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+ Map<Integer, Integer> leaderCounter = new ConcurrentHashMap<>();
leaderDistribution
.values()
.forEach(
leaderId ->
- leaderCounter
- .computeIfAbsent(leaderId, empty -> new AtomicInteger(0))
- .getAndIncrement());
+ leaderCounter.merge(leaderId, 1, Integer::sum));
// Every DataNode has leader
Assert.assertEquals(dataNodeNum, leaderCounter.size());
// Every DataNode has exactly regionGroupNum / dataNodeNum leaders
- leaderCounter
- .values()
- .forEach(leaderNum -> Assert.assertEquals(regionGroupNum /
dataNodeNum, leaderNum.get()));
+ for (int i = 0; i < dataNodeNum; i++) {
+ Assert.assertEquals(regionGroupNum / dataNodeNum,
leaderCounter.get(i).intValue());
+ }
// MaxFlow is regionGroupNum
Assert.assertEquals(regionGroupNum, BALANCER.getMaximumFlow());