This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch region-multi-database
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/region-multi-database by this 
push:
     new 4d5e1749706 multi-database leader balance
4d5e1749706 is described below

commit 4d5e1749706ece63e0941086bcf6e160a27208c3
Author: YongzaoDan <[email protected]>
AuthorDate: Wed Mar 6 17:09:53 2024 +0800

    multi-database leader balance
---
 .../manager/load/balancer/RouteBalancer.java       |   1 +
 .../region/GreedyRegionGroupAllocator.java         |  22 +--
 .../router/leader/GreedyLeaderBalancer.java        |   1 +
 .../balancer/router/leader/ILeaderBalancer.java    |   3 +
 .../router/leader/MinCostFlowLeaderBalancer.java   | 209 +++++++++++++--------
 .../manager/partition/PartitionManager.java        |  12 ++
 .../persistence/partition/PartitionInfo.java       |  21 +++
 .../GreedyCopySetRegionGroupAllocatorTest.java     |  47 +++--
 .../router/leader/GreedyLeaderBalancerTest.java    |   5 +-
 .../leader/LeaderBalancerComparisonTest.java       |   3 +-
 .../leader/MinCostFlowLeaderBalancerTest.java      |  53 +++---
 11 files changed, 240 insertions(+), 137 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index f8f1c92484f..deca1df1ec3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -140,6 +140,7 @@ public class RouteBalancer {
     Map<TConsensusGroupId, Integer> currentLeaderMap = 
getLoadManager().getRegionLeaderMap();
     Map<TConsensusGroupId, Integer> optimalLeaderMap =
         leaderBalancer.generateOptimalLeaderDistribution(
+          getPartitionManager().getAllRegionGroupIdMap(regionGroupType),
             getPartitionManager().getAllReplicaSetsMap(regionGroupType),
             currentLeaderMap,
             getNodeManager()
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
index a689c727467..2cd4855947c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
@@ -88,16 +88,16 @@ public class GreedyRegionGroupAllocator implements 
IRegionGroupAllocator {
                     freeDiskSpaceMap.getOrDefault(datanodeId, 0d))));
 
     // Sort weightList
-   return priorityMap.entrySet().stream()
-            .sorted(
-                comparingByValue(
-                    (o1, o2) ->
-                        !Objects.equals(o1.getLeft(), o2.getLeft())
-                            // Compare the first key(The number of Regions) by 
ascending order
-                            ? o1.getLeft() - o2.getLeft()
-                            // Compare the second key(The free disk space) by 
descending order
-                            : (int) (o2.getRight() - o1.getRight())))
-            .map(entry -> entry.getKey().deepCopy())
-            .collect(Collectors.toList());
+    return priorityMap.entrySet().stream()
+        .sorted(
+            comparingByValue(
+                (o1, o2) ->
+                    !Objects.equals(o1.getLeft(), o2.getLeft())
+                        // Compare the first key(The number of Regions) by 
ascending order
+                        ? o1.getLeft() - o2.getLeft()
+                        // Compare the second key(The free disk space) by 
descending order
+                        : (int) (o2.getRight() - o1.getRight())))
+        .map(entry -> entry.getKey().deepCopy())
+        .collect(Collectors.toList());
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 0d3f813a689..723c8462b7a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -47,6 +47,7 @@ public class GreedyLeaderBalancer implements ILeaderBalancer {
 
   @Override
   public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+    Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
       Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
       Map<TConsensusGroupId, Integer> regionLeaderMap,
       Set<Integer> disabledDataNodeSet) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
index a69ccc9491d..8cd7fae7020 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.confignode.manager.load.balancer.router.leader;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -32,6 +33,7 @@ public interface ILeaderBalancer {
   /**
    * Generate an optimal leader distribution.
    *
+   * @param databaseRegionGroupMap RegionGroup held by each Database
    * @param regionReplicaSetMap All RegionGroups the cluster currently have
    * @param regionLeaderMap The current leader of each RegionGroup
   * @param disabledDataNodeSet The DataNodes that are currently unable to 
work (can't place
@@ -39,6 +41,7 @@ public interface ILeaderBalancer {
    * @return Map<TConsensusGroupId, Integer>, The optimal leader distribution
    */
   Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+      Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
       Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
       Map<TConsensusGroupId, Integer> regionLeaderMap,
       Set<Integer> disabledDataNodeSet);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
index 9b775d71cd9..d69c50b9843 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -25,15 +25,14 @@ import org.apache.iotdb.commons.utils.TestOnly;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /** Leader distribution balancer that uses minimum cost flow algorithm */
 public class MinCostFlowLeaderBalancer implements ILeaderBalancer {
@@ -41,8 +40,9 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
   private static final int INFINITY = Integer.MAX_VALUE;
 
   /** Input parameters */
-  private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
+  private final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap;
 
+  private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
   private final Map<TConsensusGroupId, Integer> regionLeaderMap;
   private final Set<Integer> disabledDataNodeSet;
 
@@ -55,10 +55,12 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
   private int maxNode = T_NODE + 1;
   // Map<RegionGroupId, rNode>
   private final Map<TConsensusGroupId, Integer> rNodeMap;
-  // Map<DataNodeId, dNode>
-  private final Map<Integer, Integer> dNodeMap;
-  // Map<dNode, DataNodeId>
-  private final Map<Integer, Integer> dNodeReflect;
+  // Map<Database, Map<DataNodeId, sDNode>>
+  private final Map<String, Map<Integer, Integer>> sDNodeMap;
+  // Map<Database, Map<sDNode, DataNodeId>>
+  private final Map<String, Map<Integer, Integer>> sDNodeReflect;
+  // Map<DataNodeId, tDNode>
+  private final Map<Integer, Integer> tDNodeMap;
 
   /** Graph edges */
   // Maximum index of graph edges
@@ -75,22 +77,25 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
   private int minimumCost = 0;
 
   public MinCostFlowLeaderBalancer() {
-    this.regionReplicaSetMap = new HashMap<>();
-    this.regionLeaderMap = new HashMap<>();
-    this.disabledDataNodeSet = new HashSet<>();
-    this.rNodeMap = new HashMap<>();
-    this.dNodeMap = new HashMap<>();
-    this.dNodeReflect = new HashMap<>();
+    this.databaseRegionGroupMap = new TreeMap<>();
+    this.regionReplicaSetMap = new TreeMap<>();
+    this.regionLeaderMap = new TreeMap<>();
+    this.disabledDataNodeSet = new TreeSet<>();
+    this.rNodeMap = new TreeMap<>();
+    this.sDNodeMap = new TreeMap<>();
+    this.sDNodeReflect = new TreeMap<>();
+    this.tDNodeMap = new TreeMap<>();
     this.minCostFlowEdges = new ArrayList<>();
   }
 
   @Override
   public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+      Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
       Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
       Map<TConsensusGroupId, Integer> regionLeaderMap,
       Set<Integer> disabledDataNodeSet) {
 
-    initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+    initialize(databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, 
disabledDataNodeSet);
 
     Map<TConsensusGroupId, Integer> result;
     constructMCFGraph();
@@ -102,21 +107,26 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
   }
 
   private void initialize(
+      Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
       Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
       Map<TConsensusGroupId, Integer> regionLeaderMap,
       Set<Integer> disabledDataNodeSet) {
+    this.databaseRegionGroupMap.putAll(databaseRegionGroupMap);
     this.regionReplicaSetMap.putAll(regionReplicaSetMap);
     this.regionLeaderMap.putAll(regionLeaderMap);
     this.disabledDataNodeSet.addAll(disabledDataNodeSet);
   }
 
   private void clear() {
+    this.databaseRegionGroupMap.clear();
     this.regionReplicaSetMap.clear();
     this.regionLeaderMap.clear();
     this.disabledDataNodeSet.clear();
+
     this.rNodeMap.clear();
-    this.dNodeMap.clear();
-    this.dNodeReflect.clear();
+    this.sDNodeMap.clear();
+    this.sDNodeReflect.clear();
+    this.tDNodeMap.clear();
     this.minCostFlowEdges.clear();
 
     this.nodeHeadEdge = null;
@@ -133,13 +143,30 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
     this.minimumCost = 0;
 
     /* Indicate nodes in mcf */
-    for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
-      rNodeMap.put(regionReplicaSet.getRegionId(), maxNode++);
-      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
-        if (!dNodeMap.containsKey(dataNodeLocation.getDataNodeId())) {
-          dNodeMap.put(dataNodeLocation.getDataNodeId(), maxNode);
-          dNodeReflect.put(maxNode, dataNodeLocation.getDataNodeId());
-          maxNode += 1;
+    for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+        databaseRegionGroupMap.entrySet()) {
+      String database = databaseEntry.getKey();
+      sDNodeMap.put(database, new TreeMap<>());
+      sDNodeReflect.put(database, new TreeMap<>());
+      List<TConsensusGroupId> regionGroupIds = databaseEntry.getValue();
+      for (TConsensusGroupId regionGroupId : regionGroupIds) {
+        rNodeMap.put(regionGroupId, maxNode++);
+        for (TDataNodeLocation dataNodeLocation :
+            regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+          int dataNodeId = dataNodeLocation.getDataNodeId();
+          if (disabledDataNodeSet.contains(dataNodeId)) {
+            // Skip disabled DataNode
+            continue;
+          }
+          if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
+            sDNodeMap.get(database).put(dataNodeId, maxNode);
+            sDNodeReflect.get(database).put(maxNode, dataNodeId);
+            maxNode += 1;
+          }
+          if (!tDNodeMap.containsKey(dataNodeId)) {
+            tDNodeMap.put(dataNodeId, maxNode);
+            maxNode += 1;
+          }
         }
       }
     }
@@ -153,57 +180,74 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
 
     /* Construct edges: sNode -> rNodes */
     for (int rNode : rNodeMap.values()) {
-      // Cost: 0
+      // Capacity: 1, Cost: 0, each RegionGroup should elect exactly 1 leader
       addAdjacentEdges(S_NODE, rNode, 1, 0);
     }
 
-    /* Construct edges: rNodes -> dNodes */
-    for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
-      int rNode = rNodeMap.get(regionReplicaSet.getRegionId());
-      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
-        int dNode = dNodeMap.get(dataNodeLocation.getDataNodeId());
-        // Cost: 1 if the dNode is corresponded to the current leader of the 
rNode,
-        //       0 otherwise.
-        // Therefore, the RegionGroup will keep the leader as constant as 
possible.
-        int cost =
-            regionLeaderMap.getOrDefault(regionReplicaSet.getRegionId(), -1)
-                    == dataNodeLocation.getDataNodeId()
-                ? 0
-                : 1;
-        addAdjacentEdges(rNode, dNode, 1, cost);
+    /* Construct edges: rNodes -> sdNodes */
+    for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+        databaseRegionGroupMap.entrySet()) {
+      String database = databaseEntry.getKey();
+      for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
+        int rNode = rNodeMap.get(regionGroupId);
+        for (TDataNodeLocation dataNodeLocation :
+            regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+          int dataNodeId = dataNodeLocation.getDataNodeId();
+          if (disabledDataNodeSet.contains(dataNodeId)) {
+            // Skip disabled DataNode
+            continue;
+          }
+          int sDNode = sDNodeMap.get(database).get(dataNodeId);
+          // Capacity: 1, Cost: 0 if sDNode is the current leader of the 
rNode, 1 otherwise.
+          // Therefore, the RegionGroup will keep the leader as constant as 
possible.
+          int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) == 
dataNodeId ? 0 : 1;
+          addAdjacentEdges(rNode, sDNode, 1, cost);
+        }
       }
     }
 
-    /* Construct edges: dNodes -> tNode */
-    // Count the possible maximum number of leader in each DataNode
-    Map<Integer, AtomicInteger> maxLeaderCounter = new ConcurrentHashMap<>();
-    regionReplicaSetMap
-        .values()
-        .forEach(
-            regionReplicaSet ->
-                regionReplicaSet
-                    .getDataNodeLocations()
-                    .forEach(
-                        dataNodeLocation ->
-                            maxLeaderCounter
-                                .computeIfAbsent(
-                                    dataNodeLocation.getDataNodeId(), empty -> 
new AtomicInteger(0))
-                                .getAndIncrement()));
-
-    for (Map.Entry<Integer, Integer> dNodeEntry : dNodeMap.entrySet()) {
-      int dataNodeId = dNodeEntry.getKey();
-      int dNode = dNodeEntry.getValue();
-
-      if (disabledDataNodeSet.contains(dataNodeId)) {
-        // Skip disabled DataNode
-        continue;
+    /* Construct edges: sDNodes -> tDNodes */
+    for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+        databaseRegionGroupMap.entrySet()) {
+      String database = databaseEntry.getKey();
+      // Map<DataNodeId, leader number>
+      Map<Integer, Integer> leaderCounter = new TreeMap<>();
+      for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
+        for (TDataNodeLocation dataNodeLocation :
+            regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+          int dataNodeId = dataNodeLocation.getDataNodeId();
+          if (disabledDataNodeSet.contains(dataNodeId)) {
+            // Skip disabled DataNode
+            continue;
+          }
+          int sDNode = sDNodeMap.get(database).get(dataNodeId);
+          int tDNode = tDNodeMap.get(dataNodeId);
+          int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
+          // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
+          // Thus, the leader distribution will be as balanced as possible 
within each Database
+          // based on Jensen's inequality.
+          addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
+        }
       }
+    }
 
-      int maxLeaderCount = maxLeaderCounter.get(dataNodeId).get();
-      for (int extraEdge = 1; extraEdge <= maxLeaderCount; extraEdge++) {
+    /* Construct edges: tDNodes -> tNode */
+    // Map<DataNodeId, possible max leader> Count the possible maximum number 
of leader in each
+    // DataNode
+    Map<Integer, Integer> maxLeaderCounter = new TreeMap<>();
+    for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
+      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
+        int dataNodeId = dataNodeLocation.getDataNodeId();
+        if (disabledDataNodeSet.contains(dataNodeId)) {
+          // Skip disabled DataNode
+          continue;
+        }
+        int tDNode = tDNodeMap.get(dataNodeId);
+        int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, Integer::sum);
         // Cost: x^2 for the x-th edge at the current dNode.
-        // Thus, the leader distribution will be as balance as possible.
-        addAdjacentEdges(dNode, T_NODE, 1, extraEdge * extraEdge);
+        // Thus, the leader distribution will be as balanced as possible within 
the cluster
+        // based on Jensen's inequality.
+        addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount);
       }
     }
   }
@@ -310,22 +354,23 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
   private Map<TConsensusGroupId, Integer> collectLeaderDistribution() {
     Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
 
-    rNodeMap.forEach(
-        (regionGroupId, rNode) -> {
-          boolean matchLeader = false;
-          for (int currentEdge = nodeHeadEdge[rNode];
-              currentEdge >= 0;
-              currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
-            MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
-            if (edge.destNode != S_NODE && edge.capacity == 0) {
-              matchLeader = true;
-              result.put(regionGroupId, dNodeReflect.get(edge.destNode));
-            }
-          }
-          if (!matchLeader) {
-            result.put(regionGroupId, 
regionLeaderMap.getOrDefault(regionGroupId, -1));
-          }
-        });
+    databaseRegionGroupMap.forEach(
+        (database, regionGroupIds) -> regionGroupIds.forEach(
+            regionGroupId -> {
+              boolean matchLeader = false;
+              for (int currentEdge = nodeHeadEdge[rNodeMap.get(regionGroupId)];
+                  currentEdge >= 0;
+                  currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
+                MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
+                if (edge.destNode != S_NODE && edge.capacity == 0) {
+                  matchLeader = true;
+                  result.put(regionGroupId, 
sDNodeReflect.get(database).get(edge.destNode));
+                }
+              }
+              if (!matchLeader) {
+                result.put(regionGroupId, 
regionLeaderMap.getOrDefault(regionGroupId, -1));
+              }
+            }));
 
     return result;
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index cf3bdc1c8d9..613b881cd0f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -788,6 +788,18 @@ public class PartitionManager {
     return partitionInfo.getRegionGroupCount(database, type);
   }
 
+  /**
+   * Only leader use this interface.
+   *
+   * <p>Get the RegionGroupIds of each Database currently in the cluster
+   *
+   * @param type SchemaRegion or DataRegion
+   * @return Map<Database, List<RegionGroupIds>>
+   */
+  public Map<String, List<TConsensusGroupId>> 
getAllRegionGroupIdMap(TConsensusGroupType type) {
+    return partitionInfo.getAllRegionGroupIdMap(type);
+  }
+
   /**
    * Only leader use this interface.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index cdb3e902df5..027cc23107d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -88,6 +88,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
@@ -801,6 +802,26 @@ public class PartitionInfo implements SnapshotProcessor {
     return databasePartitionTables.get(database).getRegionGroupCount(type);
   }
 
+  /**
+   * Only leader use this interface.
+   *
+   * <p>Get the RegionGroupIds of each Database currently in the cluster
+   *
+   * @param type SchemaRegion or DataRegion
+   * @return Map<Database, List<RegionGroupIds>>
+   */
+  public Map<String, List<TConsensusGroupId>> 
getAllRegionGroupIdMap(TConsensusGroupType type) {
+    Map<String, List<TConsensusGroupId>> result = new TreeMap<>();
+    databasePartitionTables
+        .forEach(
+            (database, databasePartitionTable) -> {
+              if (databasePartitionTable.isNotPreDeleted()) {
+                result.put(database, 
databasePartitionTable.getAllRegionGroupIds(type));
+              }
+            });
+    return result;
+  }
+
   /**
    * Only leader use this interface.
    *
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java
index b3c7eb3c0a1..7bf89eeff98 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java
@@ -99,16 +99,20 @@ public class GreedyCopySetRegionGroupAllocatorTest {
       greedyCopySetDatabaseResult.put(i, new ArrayList<>());
     }
     for (int index = 0; index < dataRegionGroupPerDatabase * 
TEST_DATABASE_NUM; index++) {
-      TRegionReplicaSet greedyRegionGroup = 
GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution(
-          AVAILABLE_DATA_NODE_MAP,
-          FREE_SPACE_MAP,
-          greedyResult,
-          greedyResult,
-          replicationFactor,
-          new TConsensusGroupId(TConsensusGroupType.DataRegion, index));
+      TRegionReplicaSet greedyRegionGroup =
+          GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+              AVAILABLE_DATA_NODE_MAP,
+              FREE_SPACE_MAP,
+              greedyResult,
+              greedyResult,
+              replicationFactor,
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, index));
       greedyResult.add(greedyRegionGroup);
-      greedyRegionGroup.getDataNodeLocations().forEach(
-          dataNodeLocation -> 
greedyRegionCounter.merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum));
+      greedyRegionGroup
+          .getDataNodeLocations()
+          .forEach(
+              dataNodeLocation ->
+                  greedyRegionCounter.merge(dataNodeLocation.getDataNodeId(), 
1, Integer::sum));
       int databaseId = RANDOM.nextInt(TEST_DATABASE_NUM);
       TRegionReplicaSet greedyCopySetRegionGroup =
           GREEDY_COPY_SET_ALLOCATOR.generateOptimalRegionReplicasDistribution(
@@ -120,20 +124,23 @@ public class GreedyCopySetRegionGroupAllocatorTest {
               new TConsensusGroupId(TConsensusGroupType.DataRegion, index));
       greedyCopySetResult.add(greedyCopySetRegionGroup);
       
greedyCopySetDatabaseResult.get(databaseId).add(greedyCopySetRegionGroup);
-      greedyCopySetRegionGroup.getDataNodeLocations().forEach(
-          dataNodeLocation -> {
-            greedyCopySetRegionCounter.merge(dataNodeLocation.getDataNodeId(), 
1, Integer::sum);
-            greedyCopySetDatabaseRegionCounter
-                .computeIfAbsent(databaseId, empty -> new TreeMap<>())
-                .merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum);
-          });
+      greedyCopySetRegionGroup
+          .getDataNodeLocations()
+          .forEach(
+              dataNodeLocation -> {
+                
greedyCopySetRegionCounter.merge(dataNodeLocation.getDataNodeId(), 1, 
Integer::sum);
+                greedyCopySetDatabaseRegionCounter
+                    .computeIfAbsent(databaseId, empty -> new TreeMap<>())
+                    .merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum);
+              });
       LOGGER.info("After allocate RegionGroup: {}", index);
       for (int i = 0; i < TEST_DATABASE_NUM; i++) {
         LOGGER.info("Database {}: {}", i, 
greedyCopySetDatabaseRegionCounter.get(i));
       }
       LOGGER.info("Cluster   : {}", greedyCopySetRegionCounter);
       for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
-        Assert.assertTrue(greedyCopySetRegionCounter.getOrDefault(i, 0) <= 
DATA_REGION_PER_DATA_NODE);
+        Assert.assertTrue(
+            greedyCopySetRegionCounter.getOrDefault(i, 0) <= 
DATA_REGION_PER_DATA_NODE);
       }
     }
 
@@ -208,9 +215,11 @@ public class GreedyCopySetRegionGroupAllocatorTest {
       for (int j = 1; j <= TEST_DATA_NODE_NUM; j++) {
         if (greedyCopySetDatabaseRegionCounter.get(i).containsKey(j)) {
           greedyCopySetMinRegionCount =
-            Math.min(greedyCopySetMinRegionCount, 
greedyCopySetDatabaseRegionCounter.get(i).get(j));
+              Math.min(
+                  greedyCopySetMinRegionCount, 
greedyCopySetDatabaseRegionCounter.get(i).get(j));
           greedyCopySetMaxRegionCount =
-            Math.max(greedyCopySetMaxRegionCount, 
greedyCopySetDatabaseRegionCounter.get(i).get(j));
+              Math.max(
+                  greedyCopySetMaxRegionCount, 
greedyCopySetDatabaseRegionCounter.get(i).get(j));
         }
       }
       // The maximal Region count - minimal Region count should be less than 
or equal to 1 for each
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
index b676a62ecfb..a7f4103345f 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -75,7 +76,7 @@ public class GreedyLeaderBalancerTest {
     }
 
     Map<TConsensusGroupId, Integer> leaderDistribution =
-        BALANCER.generateOptimalLeaderDistribution(
+        BALANCER.generateOptimalLeaderDistribution(new TreeMap<>(),
             regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
     Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
     leaderDistribution.forEach(
@@ -128,7 +129,7 @@ public class GreedyLeaderBalancerTest {
     }
 
     Map<TConsensusGroupId, Integer> leaderDistribution =
-        BALANCER.generateOptimalLeaderDistribution(
+        BALANCER.generateOptimalLeaderDistribution(new TreeMap<>(),
             regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
     Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
     leaderDistribution.forEach(
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
index 9c7bda4287f..f289d8746fa 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
@@ -38,6 +38,7 @@ import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -263,7 +264,7 @@ public class LeaderBalancerComparisonTest {
     Map<TConsensusGroupId, Integer> lastDistribution = new 
ConcurrentHashMap<>(regionLeaderMap);
     for (int rounds = 0; rounds < 1000; rounds++) {
       Map<TConsensusGroupId, Integer> currentDistribution =
-          leaderBalancer.generateOptimalLeaderDistribution(
+          leaderBalancer.generateOptimalLeaderDistribution(new TreeMap<>(),
               regionReplicaSetMap, lastDistribution, disabledDataNodeSet);
       if (currentDistribution.equals(lastDistribution)) {
         // The leader distribution is stable
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
index f676956780e..6807d8b314b 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -35,13 +36,17 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 public class MinCostFlowLeaderBalancerTest {
 
   private static final MinCostFlowLeaderBalancer BALANCER = new 
MinCostFlowLeaderBalancer();
 
+  private static final String DATABASE = "root.database";
+
   /** This test shows a simple case that greedy algorithm might fail */
   @Test
   public void optimalLeaderDistributionTest() {
@@ -50,12 +55,10 @@ public class MinCostFlowLeaderBalancerTest {
     for (int i = 0; i < 3; i++) {
       regionGroupIds.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
i));
     }
-
     List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
     for (int i = 0; i < 4; i++) {
       dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(i));
     }
-
     List<TRegionReplicaSet> regionReplicaSets = new ArrayList<>();
     regionReplicaSets.add(
         new TRegionReplicaSet(
@@ -74,11 +77,13 @@ public class MinCostFlowLeaderBalancerTest {
                 dataNodeLocations.get(0), dataNodeLocations.get(2), 
dataNodeLocations.get(3))));
 
     // Prepare input parameters
-    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new 
HashMap<>();
+    Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new 
TreeMap<>();
+    databaseRegionGroupMap.put(DATABASE, regionGroupIds);
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new 
TreeMap<>();
     regionReplicaSets.forEach(
         regionReplicaSet ->
             regionReplicaSetMap.put(regionReplicaSet.getRegionId(), 
regionReplicaSet));
-    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
     regionReplicaSets.forEach(
         regionReplicaSet -> 
regionLeaderMap.put(regionReplicaSet.getRegionId(), 0));
     Set<Integer> disabledDataNodeSet = new HashSet<>();
@@ -86,7 +91,7 @@ public class MinCostFlowLeaderBalancerTest {
 
     // Do balancing
     Map<TConsensusGroupId, Integer> leaderDistribution =
-        BALANCER.generateOptimalLeaderDistribution(
+        BALANCER.generateOptimalLeaderDistribution(databaseRegionGroupMap,
             regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
     // All RegionGroup got a leader
     Assert.assertEquals(3, leaderDistribution.size());
@@ -94,8 +99,9 @@ public class MinCostFlowLeaderBalancerTest {
     Assert.assertEquals(3, new HashSet<>(leaderDistribution.values()).size());
     // MaxFlow is 3
     Assert.assertEquals(3, BALANCER.getMaximumFlow());
-    // MinimumCost is 3(switch leader cost) + 3(load cost, 1 for each DataNode)
-    Assert.assertEquals(3 + 3, BALANCER.getMinimumCost());
+    // MinimumCost is 3(switch leader cost) + 3(load cost, rNode -> sDNode)
+    // + 3(load cost, sDNode -> tDNode)
+    Assert.assertEquals(3 + 3 + 3, BALANCER.getMinimumCost());
   }
 
   /** The leader will remain the same if all DataNodes are disabled */
@@ -110,9 +116,11 @@ public class MinCostFlowLeaderBalancerTest {
                 new TDataNodeLocation().setDataNodeId(2)));
 
     // Prepare input parameters
-    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new 
HashMap<>();
+    Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new 
TreeMap<>();
+    databaseRegionGroupMap.put(DATABASE, 
Collections.singletonList(regionReplicaSet.getRegionId()));
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new 
TreeMap<>();
     regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
-    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
     regionLeaderMap.put(regionReplicaSet.getRegionId(), 1);
     Set<Integer> disabledDataNodeSet = new HashSet<>();
     disabledDataNodeSet.add(0);
@@ -121,7 +129,7 @@ public class MinCostFlowLeaderBalancerTest {
 
     // Do balancing
     Map<TConsensusGroupId, Integer> leaderDistribution =
-        BALANCER.generateOptimalLeaderDistribution(
+        BALANCER.generateOptimalLeaderDistribution(databaseRegionGroupMap,
             regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
     Assert.assertEquals(1, leaderDistribution.size());
     Assert.assertEquals(1, new HashSet<>(leaderDistribution.values()).size());
@@ -148,13 +156,15 @@ public class MinCostFlowLeaderBalancerTest {
 
     // The loadCost for each DataNode are the same
     int x = regionGroupNum / dataNodeNum;
-    // i.e. formula of 1^2 + 2^2 + 3^2 + ...
-    int loadCost = x * (x + 1) * (2 * x + 1) / 6;
+    // i.e. formula of (1^2 + 2^2 + 3^2 + ...) * 2
+    int loadCost = x * (x + 1) * (2 * x + 1) / 3;
 
     int dataNodeId = 0;
     Random random = new Random();
-    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new 
HashMap<>();
-    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new 
TreeMap<>();
+    databaseRegionGroupMap.put(DATABASE, new ArrayList<>());
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new 
TreeMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
     for (int i = 0; i < regionGroupNum; i++) {
       TConsensusGroupId regionGroupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
       int leaderId = (dataNodeId + random.nextInt(replicationFactor)) % 
dataNodeNum;
@@ -166,31 +176,30 @@ public class MinCostFlowLeaderBalancerTest {
         dataNodeId = (dataNodeId + 1) % dataNodeNum;
       }
 
+      databaseRegionGroupMap.get(DATABASE).add(regionGroupId);
       regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
       regionLeaderMap.put(regionGroupId, leaderId);
     }
 
     // Do balancing
     Map<TConsensusGroupId, Integer> leaderDistribution =
-        BALANCER.generateOptimalLeaderDistribution(
+        BALANCER.generateOptimalLeaderDistribution(databaseRegionGroupMap,
             regionReplicaSetMap, regionLeaderMap, new HashSet<>());
     // All RegionGroup got a leader
     Assert.assertEquals(regionGroupNum, leaderDistribution.size());
 
-    Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+    Map<Integer, Integer> leaderCounter = new ConcurrentHashMap<>();
     leaderDistribution
         .values()
         .forEach(
             leaderId ->
-                leaderCounter
-                    .computeIfAbsent(leaderId, empty -> new AtomicInteger(0))
-                    .getAndIncrement());
+                leaderCounter.merge(leaderId, 1, Integer::sum));
     // Every DataNode has leader
     Assert.assertEquals(dataNodeNum, leaderCounter.size());
     // Every DataNode has exactly regionGroupNum / dataNodeNum leaders
-    leaderCounter
-        .values()
-        .forEach(leaderNum -> Assert.assertEquals(regionGroupNum / 
dataNodeNum, leaderNum.get()));
+    for (int i = 0; i < dataNodeNum; i++) {
+      Assert.assertEquals(regionGroupNum / dataNodeNum, 
leaderCounter.get(i).intValue());
+    }
 
     // MaxFlow is regionGroupNum
     Assert.assertEquals(regionGroupNum, BALANCER.getMaximumFlow());


Reply via email to