This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 91abf72975b Optimize the input parameters for LeaderBalancer (#12374)
91abf72975b is described below
commit 91abf72975b4879f9deeef88afb15c6df96723d6
Author: Yongzao <[email protected]>
AuthorDate: Fri Apr 19 13:37:29 2024 +0800
Optimize the input parameters for LeaderBalancer (#12374)
* Finish
* decrease ratis waiting interval
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 4 +-
.../confignode/conf/ConfigNodeDescriptor.java | 6 +-
.../confignode/conf/ConfigNodeStartupCheck.java | 6 +-
.../manager/load/balancer/RouteBalancer.java | 19 ++--
.../router/leader/AbstractLeaderBalancer.java | 108 ++++++++++++++++++
.../router/leader/GreedyLeaderBalancer.java | 57 ++++------
.../balancer/router/leader/ILeaderBalancer.java | 49 --------
.../router/leader/MinCostFlowLeaderBalancer.java | 123 ++++++++-------------
.../confignode/manager/load/cache/LoadCache.java | 59 +++++++++-
.../manager/load/cache/node/NodeStatistics.java | 9 ++
.../load/cache/region/RegionStatistics.java | 7 ++
.../router/leader/CFDLeaderBalancerTest.java | 117 ++++++++++++++++++--
.../router/leader/GreedyLeaderBalancerTest.java | 118 +++++++++++++-------
.../leader/LeaderBalancerComparisonTest.java | 75 +++++++++++--
14 files changed, 510 insertions(+), 247 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index b20f1e87d2f..3875bf09080 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
-import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
import
org.apache.iotdb.confignode.manager.partition.RegionGroupExtensionPolicy;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -187,7 +187,7 @@ public class ConfigNodeConfig {
private long unknownDataNodeDetectInterval = heartbeatIntervalInMs;
/** The policy of cluster RegionGroups' leader distribution. */
- private String leaderDistributionPolicy = ILeaderBalancer.CFD_POLICY;
+ private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY;
/** Whether to enable auto leader balance for Ratis consensus protocol. */
private boolean enableAutoLeaderBalanceForRatisConsensus = true;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 1bad7b48c5c..33b8ec69f8e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
-import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
import
org.apache.iotdb.confignode.manager.partition.RegionGroupExtensionPolicy;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -354,8 +354,8 @@ public class ConfigNodeDescriptor {
properties
.getProperty("leader_distribution_policy",
conf.getLeaderDistributionPolicy())
.trim();
- if (ILeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
- || ILeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy)) {
+ if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
+ || AbstractLeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy))
{
conf.setLeaderDistributionPolicy(leaderDistributionPolicy);
} else {
throw new IOException(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 1d8bf58259e..3e82e0bb4a6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.StartupChecks;
-import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -144,8 +144,8 @@ public class ConfigNodeStartupCheck extends StartupChecks {
}
// The leader distribution policy is limited
- if
(!ILeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
- &&
!ILeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
+ if
(!AbstractLeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
+ &&
!AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
throw new ConfigurationException(
"leader_distribution_policy",
CONF.getRoutePriorityPolicy(),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 7269defdbe6..e42b333e794 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -33,8 +33,8 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
+import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
-import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
@@ -97,7 +97,7 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
private final IManager configManager;
// For generating optimal Region leader distribution
- private final ILeaderBalancer leaderBalancer;
+ private final AbstractLeaderBalancer leaderBalancer;
// For generating optimal cluster Region routing priority
private final IPriorityBalancer priorityRouter;
@@ -108,7 +108,7 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
private final Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap;
// The interval of retrying to balance ratis leader after the last failed
time
- private static final long BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS = 60 *
1000L * 1000L * 1000L;
+ private static final long BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS = 20 *
1000L * 1000L * 1000L;
private final Map<TConsensusGroupId, Long> lastFailedTimeForLeaderBalance;
public RouteBalancer(IManager configManager) {
@@ -118,10 +118,10 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
this.lastFailedTimeForLeaderBalance = new TreeMap<>();
switch (CONF.getLeaderDistributionPolicy()) {
- case ILeaderBalancer.GREEDY_POLICY:
+ case AbstractLeaderBalancer.GREEDY_POLICY:
this.leaderBalancer = new GreedyLeaderBalancer();
break;
- case ILeaderBalancer.CFD_POLICY:
+ case AbstractLeaderBalancer.CFD_POLICY:
default:
this.leaderBalancer = new MinCostFlowLeaderBalancer();
break;
@@ -157,13 +157,8 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
getPartitionManager().getAllRegionGroupIdMap(regionGroupType),
getPartitionManager().getAllReplicaSetsMap(regionGroupType),
currentLeaderMap,
- getNodeManager()
- .filterDataNodeThroughStatus(
- NodeStatus.Unknown, NodeStatus.ReadOnly,
NodeStatus.Removing)
- .stream()
- .map(TDataNodeConfiguration::getLocation)
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toSet()));
+ getLoadManager().getLoadCache().getCurrentDataNodeStatisticsMap(),
+
getLoadManager().getLoadCache().getCurrentRegionStatisticsMap(regionGroupType));
// Transfer leader to the optimal distribution
long currentTime = System.nanoTime();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
new file mode 100644
index 00000000000..b82b3a0ca48
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public abstract class AbstractLeaderBalancer {
+
+ public static final String GREEDY_POLICY = "GREEDY";
+ public static final String CFD_POLICY = "CFD";
+
+ // Map<Database, List<RegionGroup>>
+ protected final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap;
+ // Map<RegionGroupId, RegionGroup>
+ protected final Map<TConsensusGroupId, TRegionReplicaSet>
regionReplicaSetMap;
+ // Map<RegionGroupId, leaderId>
+ protected final Map<TConsensusGroupId, Integer> regionLeaderMap;
+ // Map<DataNodeId, NodeStatistics>
+ protected final Map<Integer, NodeStatistics> dataNodeStatisticsMap;
+ // Map<RegionGroupId, Map<DataNodeId, RegionStatistics>>
+ protected final Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap;
+
+ protected AbstractLeaderBalancer() {
+ this.databaseRegionGroupMap = new TreeMap<>();
+ this.regionReplicaSetMap = new TreeMap<>();
+ this.regionLeaderMap = new TreeMap<>();
+ this.dataNodeStatisticsMap = new TreeMap<>();
+ this.regionStatisticsMap = new TreeMap<>();
+ }
+
+ protected void initialize(
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+ Map<TConsensusGroupId, Integer> regionLeaderMap,
+ Map<Integer, NodeStatistics> dataNodeStatisticsMap,
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap) {
+ this.databaseRegionGroupMap.putAll(databaseRegionGroupMap);
+ this.regionReplicaSetMap.putAll(regionReplicaSetMap);
+ this.regionLeaderMap.putAll(regionLeaderMap);
+ this.dataNodeStatisticsMap.putAll(dataNodeStatisticsMap);
+ this.regionStatisticsMap.putAll(regionStatisticsMap);
+ }
+
+ protected boolean isDataNodeAvailable(int dataNodeId) {
+ // RegionGroup-leader can only be placed on Running DataNode
+ return dataNodeStatisticsMap.containsKey(dataNodeId)
+ &&
NodeStatus.Running.equals(dataNodeStatisticsMap.get(dataNodeId).getStatus());
+ }
+
+ protected boolean isRegionAvailable(TConsensusGroupId regionGroupId, int
dataNodeId) {
+ // Only Running Region can be selected as RegionGroup-leader
+ return regionStatisticsMap.containsKey(regionGroupId)
+ && regionStatisticsMap.get(regionGroupId).containsKey(dataNodeId)
+ && RegionStatus.Running.equals(
+
regionStatisticsMap.get(regionGroupId).get(dataNodeId).getRegionStatus());
+ }
+
+ protected void clear() {
+ this.databaseRegionGroupMap.clear();
+ this.regionReplicaSetMap.clear();
+ this.regionLeaderMap.clear();
+ this.dataNodeStatisticsMap.clear();
+ this.regionStatisticsMap.clear();
+ }
+
+ /**
+ * Generate an optimal leader distribution.
+ *
+ * @param databaseRegionGroupMap RegionGroup held by each Database
+ * @param regionReplicaSetMap All RegionGroups the cluster currently has
+ * @param regionLeaderMap The current leader distribution of each RegionGroup
+ * @param dataNodeStatisticsMap The current statistics of each DataNode
+ * @param regionStatisticsMap The current statistics of each Region
+ * @return Map<TConsensusGroupId, Integer>, The optimal leader distribution
+ */
+ public abstract Map<TConsensusGroupId, Integer>
generateOptimalLeaderDistribution(
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+ Map<TConsensusGroupId, Integer> regionLeaderMap,
+ Map<Integer, NodeStatistics> dataNodeStatisticsMap,
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap);
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 4a28fb6ca31..5a6098b60f9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -22,26 +22,19 @@ package
org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
/** Leader distribution balancer that uses greedy algorithm */
-public class GreedyLeaderBalancer implements ILeaderBalancer {
-
- private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
- private final Map<TConsensusGroupId, Integer> regionLeaderMap;
- private final Set<Integer> disabledDataNodeSet;
+public class GreedyLeaderBalancer extends AbstractLeaderBalancer {
public GreedyLeaderBalancer() {
- this.regionReplicaSetMap = new HashMap<>();
- this.regionLeaderMap = new ConcurrentHashMap<>();
- this.disabledDataNodeSet = new HashSet<>();
+ super();
}
@Override
@@ -49,30 +42,19 @@ public class GreedyLeaderBalancer implements
ILeaderBalancer {
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
- Set<Integer> disabledDataNodeSet) {
- initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
-
+ Map<Integer, NodeStatistics> dataNodeStatisticsMap,
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap) {
+ initialize(
+ databaseRegionGroupMap,
+ regionReplicaSetMap,
+ regionLeaderMap,
+ dataNodeStatisticsMap,
+ regionStatisticsMap);
Map<TConsensusGroupId, Integer> result = constructGreedyDistribution();
-
clear();
return result;
}
- private void initialize(
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
- Map<TConsensusGroupId, Integer> regionLeaderMap,
- Set<Integer> disabledDataNodeSet) {
- this.regionReplicaSetMap.putAll(regionReplicaSetMap);
- this.regionLeaderMap.putAll(regionLeaderMap);
- this.disabledDataNodeSet.addAll(disabledDataNodeSet);
- }
-
- private void clear() {
- this.regionReplicaSetMap.clear();
- this.regionLeaderMap.clear();
- this.disabledDataNodeSet.clear();
- }
-
private Map<TConsensusGroupId, Integer> constructGreedyDistribution() {
Map<Integer, Integer> leaderCounter = new TreeMap<>();
regionReplicaSetMap.forEach(
@@ -81,14 +63,13 @@ public class GreedyLeaderBalancer implements
ILeaderBalancer {
leaderId = regionLeaderMap.getOrDefault(regionGroupId, -1);
for (TDataNodeLocation dataNodeLocation :
regionGroup.getDataNodeLocations()) {
int dataNodeId = dataNodeLocation.getDataNodeId();
- if (disabledDataNodeSet.contains(dataNodeId)) {
- continue;
- }
- // Select the DataNode with the minimal leader count as the new
leader
- int count = leaderCounter.getOrDefault(dataNodeId, 0);
- if (count < minCount) {
- minCount = count;
- leaderId = dataNodeId;
+ if (isDataNodeAvailable(dataNodeId) &&
isRegionAvailable(regionGroupId, dataNodeId)) {
+ // Select the DataNode with the minimal leader count as the new
leader
+ int count = leaderCounter.getOrDefault(dataNodeId, 0);
+ if (count < minCount) {
+ minCount = count;
+ leaderId = dataNodeId;
+ }
}
}
regionLeaderMap.put(regionGroupId, leaderId);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
deleted file mode 100644
index 65c45aaebab..00000000000
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public interface ILeaderBalancer {
-
- String GREEDY_POLICY = "GREEDY";
- String CFD_POLICY = "CFD";
-
- /**
- * Generate an optimal leader distribution.
- *
- * @param databaseRegionGroupMap RegionGroup held by each Database
- * @param regionReplicaSetMap All RegionGroups the cluster currently have
- * @param regionLeaderMap The current leader of each RegionGroup
- * @param disabledDataNodeSet The DataNodes that currently unable to
work(can't place
- * RegionGroup-leader)
- * @return Map<TConsensusGroupId, Integer>, The optimal leader distribution
- */
- Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
- Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
- Map<TConsensusGroupId, Integer> regionLeaderMap,
- Set<Integer> disabledDataNodeSet);
-}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
index 48aa8cc5547..daf1d15f27c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
import java.util.ArrayList;
import java.util.Arrays;
@@ -30,23 +32,14 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.Set;
import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
/** Leader distribution balancer that uses minimum cost flow algorithm */
-public class MinCostFlowLeaderBalancer implements ILeaderBalancer {
+public class MinCostFlowLeaderBalancer extends AbstractLeaderBalancer {
private static final int INFINITY = Integer.MAX_VALUE;
- /** Input parameters */
- private final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap;
-
- private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
- private final Map<TConsensusGroupId, Integer> regionLeaderMap;
- private final Set<Integer> disabledDataNodeSet;
-
/** Graph nodes */
// Super source node
private static final int S_NODE = 0;
@@ -78,10 +71,7 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private int minimumCost = 0;
public MinCostFlowLeaderBalancer() {
- this.databaseRegionGroupMap = new TreeMap<>();
- this.regionReplicaSetMap = new TreeMap<>();
- this.regionLeaderMap = new TreeMap<>();
- this.disabledDataNodeSet = new TreeSet<>();
+ super();
this.rNodeMap = new TreeMap<>();
this.sDNodeMap = new TreeMap<>();
this.sDNodeReflect = new TreeMap<>();
@@ -94,47 +84,34 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
- Set<Integer> disabledDataNodeSet) {
-
- initialize(databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap,
disabledDataNodeSet);
-
+ Map<Integer, NodeStatistics> dataNodeStatisticsMap,
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap) {
+ initialize(
+ databaseRegionGroupMap,
+ regionReplicaSetMap,
+ regionLeaderMap,
+ dataNodeStatisticsMap,
+ regionStatisticsMap);
Map<TConsensusGroupId, Integer> result;
constructMCFGraph();
dinicAlgorithm();
result = collectLeaderDistribution();
-
clear();
return result;
}
- private void initialize(
- Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
- Map<TConsensusGroupId, Integer> regionLeaderMap,
- Set<Integer> disabledDataNodeSet) {
- this.databaseRegionGroupMap.putAll(databaseRegionGroupMap);
- this.regionReplicaSetMap.putAll(regionReplicaSetMap);
- this.regionLeaderMap.putAll(regionLeaderMap);
- this.disabledDataNodeSet.addAll(disabledDataNodeSet);
- }
-
- private void clear() {
- this.databaseRegionGroupMap.clear();
- this.regionReplicaSetMap.clear();
- this.regionLeaderMap.clear();
- this.disabledDataNodeSet.clear();
-
+ @Override
+ protected void clear() {
+ super.clear();
this.rNodeMap.clear();
this.sDNodeMap.clear();
this.sDNodeReflect.clear();
this.tDNodeMap.clear();
this.minCostFlowEdges.clear();
-
this.nodeHeadEdge = null;
this.nodeCurrentEdge = null;
this.isNodeVisited = null;
this.nodeMinimumCost = null;
-
this.maxNode = T_NODE + 1;
this.maxEdge = 0;
}
@@ -155,18 +132,16 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
for (TDataNodeLocation dataNodeLocation :
regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
int dataNodeId = dataNodeLocation.getDataNodeId();
- if (disabledDataNodeSet.contains(dataNodeId)) {
- // Skip disabled DataNode
- continue;
- }
- if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
- sDNodeMap.get(database).put(dataNodeId, maxNode);
- sDNodeReflect.get(database).put(maxNode, dataNodeId);
- maxNode += 1;
- }
- if (!tDNodeMap.containsKey(dataNodeId)) {
- tDNodeMap.put(dataNodeId, maxNode);
- maxNode += 1;
+ if (isDataNodeAvailable(dataNodeId)) {
+ if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
+ sDNodeMap.get(database).put(dataNodeId, maxNode);
+ sDNodeReflect.get(database).put(maxNode, dataNodeId);
+ maxNode += 1;
+ }
+ if (!tDNodeMap.containsKey(dataNodeId)) {
+ tDNodeMap.put(dataNodeId, maxNode);
+ maxNode += 1;
+ }
}
}
}
@@ -194,15 +169,13 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
for (TDataNodeLocation dataNodeLocation :
regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
int dataNodeId = dataNodeLocation.getDataNodeId();
- if (disabledDataNodeSet.contains(dataNodeId)) {
- // Skip disabled DataNode
- continue;
+ if (isDataNodeAvailable(dataNodeId) &&
isRegionAvailable(regionGroupId, dataNodeId)) {
+ int sDNode = sDNodeMap.get(database).get(dataNodeId);
+ // Capacity: 1, Cost: 1 if sDNode is the current leader of the
rNode, 0 otherwise.
+ // Therefore, the RegionGroup will keep the leader as constant as
possible.
+ int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) ==
dataNodeId ? 0 : 1;
+ addAdjacentEdges(rNode, sDNode, 1, cost);
}
- int sDNode = sDNodeMap.get(database).get(dataNodeId);
- // Capacity: 1, Cost: 1 if sDNode is the current leader of the
rNode, 0 otherwise.
- // Therefore, the RegionGroup will keep the leader as constant as
possible.
- int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) ==
dataNodeId ? 0 : 1;
- addAdjacentEdges(rNode, sDNode, 1, cost);
}
}
}
@@ -217,17 +190,15 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
for (TDataNodeLocation dataNodeLocation :
regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
int dataNodeId = dataNodeLocation.getDataNodeId();
- if (disabledDataNodeSet.contains(dataNodeId)) {
- // Skip disabled DataNode
- continue;
+ if (isDataNodeAvailable(dataNodeId)) {
+ int sDNode = sDNodeMap.get(database).get(dataNodeId);
+ int tDNode = tDNodeMap.get(dataNodeId);
+ int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
+ // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
+ // Thus, the leader distribution will be as balance as possible
within each Database
+ // based on the Jensen's-Inequality.
+ addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
}
- int sDNode = sDNodeMap.get(database).get(dataNodeId);
- int tDNode = tDNodeMap.get(dataNodeId);
- int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
- // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
- // Thus, the leader distribution will be as balance as possible
within each Database
- // based on the Jensen's-Inequality.
- addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
}
}
}
@@ -239,16 +210,14 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
int dataNodeId = dataNodeLocation.getDataNodeId();
- if (disabledDataNodeSet.contains(dataNodeId)) {
- // Skip disabled DataNode
- continue;
+ if (isDataNodeAvailable(dataNodeId)) {
+ int tDNode = tDNodeMap.get(dataNodeId);
+ int leaderCount = maxLeaderCounter.merge(dataNodeId, 1,
Integer::sum);
+ // Cost: x^2 for the x-th edge at the current dNode.
+ // Thus, the leader distribution will be as balance as possible
within the cluster
+ // Based on the Jensen's-Inequality.
+ addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount);
}
- int tDNode = tDNodeMap.get(dataNodeId);
- int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, Integer::sum);
- // Cost: x^2 for the x-th edge at the current dNode.
- // Thus, the leader distribution will be as balance as possible within
the cluster
- // Based on the Jensen's-Inequality.
- addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount);
}
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 6fae8010e22..be561937b9b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache;
import
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import org.slf4j.Logger;
@@ -261,6 +262,22 @@ public class LoadCache {
return nodeStatisticsMap;
}
+ /**
+ * Get the NodeStatistics of all DataNodes.
+ *
+ * @return a map of all DataNodes' NodeStatistics
+ */
+ public Map<Integer, NodeStatistics> getCurrentDataNodeStatisticsMap() {
+ Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
+ nodeCacheMap.forEach(
+ (nodeId, nodeCache) -> {
+ if (nodeCache instanceof DataNodeHeartbeatCache) {
+ dataNodeStatisticsMap.put(nodeId, (NodeStatistics)
nodeCache.getCurrentStatistics());
+ }
+ });
+ return dataNodeStatisticsMap;
+ }
+
/**
* Get the RegionGroupStatistics of all RegionGroups.
*
@@ -274,6 +291,30 @@ public class LoadCache {
return regionGroupStatisticsMap;
}
+ /**
+ * Get the RegionStatistics of all Regions.
+ *
+ * @param type DataRegion or SchemaRegion
+ * @return a map of RegionStatistics
+ */
+ public Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
getCurrentRegionStatisticsMap(
+ TConsensusGroupType type) {
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
+ regionGroupCacheMap.forEach(
+ (regionGroupId, regionGroupCache) -> {
+ if (type.equals(regionGroupId.getType())) {
+ regionStatisticsMap.put(
+ regionGroupId,
regionGroupCache.getCurrentStatistics().getRegionStatisticsMap());
+ }
+ });
+ return regionStatisticsMap;
+ }
+
+ /**
+ * Get the ConsensusGroupStatistics of all RegionGroups.
+ *
+ * @return a map of ConsensusGroupStatistics
+ */
public Map<TConsensusGroupId, ConsensusGroupStatistics>
getCurrentConsensusGroupStatisticsMap() {
Map<TConsensusGroupId, ConsensusGroupStatistics>
consensusGroupStatisticsMap = new TreeMap<>();
consensusGroupCacheMap.forEach(
@@ -294,6 +335,22 @@ public class LoadCache {
return nodeCache == null ? NodeStatus.Unknown : nodeCache.getNodeStatus();
}
+ /**
+ * Get the current NodeStatus of all DataNodes.
+ *
+ * @return a map from DataNodeId to NodeStatus, i.e. {@code Map<DataNodeId, NodeStatus>}
+ */
+ public Map<Integer, NodeStatus> getDataNodeStatus() {
+ Map<Integer, NodeStatus> nodeStatusMap = new TreeMap<>();
+ nodeCacheMap.forEach(
+ (nodeId, nodeCache) -> {
+ if (nodeCache instanceof DataNodeHeartbeatCache) {
+ nodeStatusMap.put(nodeId, nodeCache.getNodeStatus());
+ }
+ });
+ return nodeStatusMap;
+ }
+
/**
* Safely get the specified Node's current status with reason.
*
@@ -448,7 +505,7 @@ public class LoadCache {
*/
public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
List<TConsensusGroupId> consensusGroupIds) {
- Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = new
ConcurrentHashMap<>();
+ Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = new
TreeMap<>();
for (TConsensusGroupId consensusGroupId : consensusGroupIds) {
regionGroupStatusMap.put(consensusGroupId,
getRegionGroupStatus(consensusGroupId));
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
index e3c8bd25dbb..24bee71ad0e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.load.cache.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.load.cache.AbstractStatistics;
import java.util.Objects;
@@ -42,6 +43,14 @@ public class NodeStatistics extends AbstractStatistics {
this.loadScore = loadScore;
}
+ @TestOnly
+ public NodeStatistics(NodeStatus status) {
+ super(System.nanoTime());
+ this.status = status;
+ this.statusReason = null;
+ this.loadScore = Long.MAX_VALUE;
+ }
+
public static NodeStatistics generateDefaultNodeStatistics() {
return new NodeStatistics(Long.MIN_VALUE, NodeStatus.Unknown, null,
Long.MAX_VALUE);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
index 1246f0ed435..84f22ef4ac5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.load.cache.region;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.load.cache.AbstractStatistics;
import java.util.Objects;
@@ -34,6 +35,12 @@ public class RegionStatistics extends AbstractStatistics {
this.regionStatus = regionStatus;
}
+ @TestOnly
+ public RegionStatistics(RegionStatus regionStatus) {
+ super(System.nanoTime());
+ this.regionStatus = regionStatus;
+ }
+
public static RegionStatistics generateDefaultRegionStatistics() {
return new RegionStatistics(0, RegionStatus.Unknown);
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
index 716fa55a80f..7cc1f50c28c 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
@@ -23,6 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
import org.junit.Assert;
import org.junit.Test;
@@ -35,7 +39,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -63,21 +66,37 @@ public class CFDLeaderBalancerTest {
// The result will be unbalanced if select DataNode-2 as leader for
RegionGroup-0
// and select DataNode-3 as leader for RegionGroup-1
List<TRegionReplicaSet> regionReplicaSets = new ArrayList<>();
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
regionReplicaSets.add(
new TRegionReplicaSet(
regionGroupIds.get(0),
Arrays.asList(
dataNodeLocations.get(0), dataNodeLocations.get(1),
dataNodeLocations.get(2))));
+ Map<Integer, RegionStatistics> region0 = new TreeMap<>();
+ region0.put(0, new RegionStatistics(RegionStatus.Unknown));
+ region0.put(1, new RegionStatistics(RegionStatus.Running));
+ region0.put(2, new RegionStatistics(RegionStatus.Running));
+ regionStatisticsMap.put(regionGroupIds.get(0), region0);
regionReplicaSets.add(
new TRegionReplicaSet(
regionGroupIds.get(1),
Arrays.asList(
dataNodeLocations.get(0), dataNodeLocations.get(1),
dataNodeLocations.get(3))));
+ Map<Integer, RegionStatistics> region1 = new TreeMap<>();
+ region1.put(0, new RegionStatistics(RegionStatus.Unknown));
+ region1.put(1, new RegionStatistics(RegionStatus.Running));
+ region1.put(3, new RegionStatistics(RegionStatus.Running));
+ regionStatisticsMap.put(regionGroupIds.get(1), region1);
regionReplicaSets.add(
new TRegionReplicaSet(
regionGroupIds.get(2),
Arrays.asList(
dataNodeLocations.get(0), dataNodeLocations.get(2),
dataNodeLocations.get(3))));
+ Map<Integer, RegionStatistics> region2 = new TreeMap<>();
+ region2.put(0, new RegionStatistics(RegionStatus.Unknown));
+ region2.put(2, new RegionStatistics(RegionStatus.Running));
+ region2.put(3, new RegionStatistics(RegionStatus.Running));
+ regionStatisticsMap.put(regionGroupIds.get(2), region2);
// Prepare input parameters
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
@@ -89,13 +108,20 @@ public class CFDLeaderBalancerTest {
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
regionReplicaSets.forEach(
regionReplicaSet ->
regionLeaderMap.put(regionReplicaSet.getRegionId(), 0));
- Set<Integer> disabledDataNodeSet = new HashSet<>();
- disabledDataNodeSet.add(0);
+ Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
+ dataNodeStatisticsMap.put(0, new NodeStatistics(NodeStatus.Unknown));
+ dataNodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running));
+ dataNodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Running));
+ dataNodeStatisticsMap.put(3, new NodeStatistics(NodeStatus.Running));
// Do balancing
Map<TConsensusGroupId, Integer> leaderDistribution =
BALANCER.generateOptimalLeaderDistribution(
- databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap,
disabledDataNodeSet);
+ databaseRegionGroupMap,
+ regionReplicaSetMap,
+ regionLeaderMap,
+ dataNodeStatisticsMap,
+ regionStatisticsMap);
// All RegionGroup got a leader
Assert.assertEquals(3, leaderDistribution.size());
// Each DataNode has exactly one leader
@@ -125,15 +151,25 @@ public class CFDLeaderBalancerTest {
regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
regionLeaderMap.put(regionReplicaSet.getRegionId(), 1);
- Set<Integer> disabledDataNodeSet = new HashSet<>();
- disabledDataNodeSet.add(0);
- disabledDataNodeSet.add(1);
- disabledDataNodeSet.add(2);
+ Map<Integer, NodeStatistics> nodeStatisticsMap = new TreeMap<>();
+ nodeStatisticsMap.put(0, new NodeStatistics(NodeStatus.Unknown));
+ nodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.ReadOnly));
+ nodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Removing));
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
+ Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+ regionStatistics.put(0, new RegionStatistics(RegionStatus.Running));
+ regionStatistics.put(1, new RegionStatistics(RegionStatus.Running));
+ regionStatistics.put(2, new RegionStatistics(RegionStatus.Running));
+ regionStatisticsMap.put(regionReplicaSet.getRegionId(), regionStatistics);
// Do balancing
Map<TConsensusGroupId, Integer> leaderDistribution =
BALANCER.generateOptimalLeaderDistribution(
- databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap,
disabledDataNodeSet);
+ databaseRegionGroupMap,
+ regionReplicaSetMap,
+ regionLeaderMap,
+ nodeStatisticsMap,
+ regionStatisticsMap);
Assert.assertEquals(1, leaderDistribution.size());
Assert.assertEquals(1, new HashSet<>(leaderDistribution.values()).size());
// Leader remains the same
@@ -146,6 +182,55 @@ public class CFDLeaderBalancerTest {
Assert.assertEquals(0, BALANCER.getMinimumCost());
}
+ @Test
+ public void migrateTest() {
+ // All DataNodes are in Running status
+ Map<Integer, NodeStatistics> nodeStatisticsMap = new TreeMap<>();
+ nodeStatisticsMap.put(0, new NodeStatistics(NodeStatus.Running));
+ nodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running));
+ // Prepare RegionGroups
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
+ Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
+ for (int i = 0; i < 5; i++) {
+ TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
+ databaseRegionGroupMap
+ .computeIfAbsent(DATABASE, empty -> new ArrayList<>())
+ .add(regionGroupId);
+ TRegionReplicaSet regionReplicaSet =
+ new TRegionReplicaSet(
+ regionGroupId,
+ Arrays.asList(
+ new TDataNodeLocation().setDataNodeId(0),
+ new TDataNodeLocation().setDataNodeId(1)));
+ regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+ regionLeaderMap.put(regionGroupId, 0);
+ // Assuming all Regions are migrating from DataNode-0 to DataNode-1
+ Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+ regionStatistics.put(0, new RegionStatistics(RegionStatus.Removing));
+ regionStatistics.put(1, new RegionStatistics(RegionStatus.Running));
+ regionStatisticsMap.put(regionGroupId, regionStatistics);
+ }
+
+ // Do balancing
+ Map<TConsensusGroupId, Integer> leaderDistribution =
+ BALANCER.generateOptimalLeaderDistribution(
+ databaseRegionGroupMap,
+ regionReplicaSetMap,
+ regionLeaderMap,
+ nodeStatisticsMap,
+ regionStatisticsMap);
+ for (int i = 0; i < 5; i++) {
+ // All RegionGroups' leader should be DataNode-1
+ Assert.assertEquals(
+ 1,
+ leaderDistribution
+ .get(new TConsensusGroupId(TConsensusGroupType.DataRegion, i))
+ .intValue());
+ }
+ }
+
/**
* In this case shows the balance ability for big cluster.
*
@@ -156,6 +241,10 @@ public class CFDLeaderBalancerTest {
final int regionGroupNum = 1500;
final int dataNodeNum = 300;
final int replicationFactor = 3;
+ Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
+ for (int i = 0; i < dataNodeNum; i++) {
+ dataNodeStatisticsMap.put(i, new NodeStatistics(NodeStatus.Running));
+ }
// The loadCost for each DataNode are the same
int x = regionGroupNum / dataNodeNum;
@@ -168,16 +257,20 @@ public class CFDLeaderBalancerTest {
databaseRegionGroupMap.put(DATABASE, new ArrayList<>());
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
for (int i = 0; i < regionGroupNum; i++) {
TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
int leaderId = (dataNodeId + random.nextInt(replicationFactor)) %
dataNodeNum;
TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
regionReplicaSet.setRegionId(regionGroupId);
+ Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
for (int j = 0; j < 3; j++) {
regionReplicaSet.addToDataNodeLocations(new
TDataNodeLocation().setDataNodeId(dataNodeId));
+ regionStatistics.put(dataNodeId, new
RegionStatistics(RegionStatus.Running));
dataNodeId = (dataNodeId + 1) % dataNodeNum;
}
+ regionStatisticsMap.put(regionGroupId, regionStatistics);
databaseRegionGroupMap.get(DATABASE).add(regionGroupId);
regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
@@ -187,7 +280,11 @@ public class CFDLeaderBalancerTest {
// Do balancing
Map<TConsensusGroupId, Integer> leaderDistribution =
BALANCER.generateOptimalLeaderDistribution(
- databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, new
HashSet<>());
+ databaseRegionGroupMap,
+ regionReplicaSetMap,
+ regionLeaderMap,
+ dataNodeStatisticsMap,
+ regionStatisticsMap);
// All RegionGroup got a leader
Assert.assertEquals(regionGroupNum, leaderDistribution.size());
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
index a443fe74c02..6b97e1b385b 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
@@ -23,16 +23,18 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
import org.junit.Assert;
import org.junit.Test;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -43,42 +45,56 @@ public class GreedyLeaderBalancerTest {
@Test
public void optimalLeaderDistributionTest() {
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
HashMap<>();
- Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
- Set<Integer> disabledDataNodeSet = new HashSet<>();
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
+ Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
+ Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
Random random = new Random();
+ // Assuming all DataNodes are in Running status
+ for (int i = 0; i < 6; i++) {
+ dataNodeStatisticsMap.put(i, new NodeStatistics(NodeStatus.Running));
+ }
+
// Build 9 RegionGroups in DataNodes 0~2
for (int i = 0; i < 9; i++) {
TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
- TRegionReplicaSet regionReplicaSet =
- new TRegionReplicaSet(
- regionGroupId,
- Arrays.asList(
- new TDataNodeLocation().setDataNodeId(0),
- new TDataNodeLocation().setDataNodeId(1),
- new TDataNodeLocation().setDataNodeId(2)));
+ List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+ Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+ for (int j = 0; j < 3; j++) {
+ dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(j));
+ // Assuming all Regions are in Running status
+ regionStatistics.put(j, new RegionStatistics(RegionStatus.Running));
+ }
+ TRegionReplicaSet regionReplicaSet = new
TRegionReplicaSet(regionGroupId, dataNodeLocations);
regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionLeaderMap.put(regionGroupId, random.nextInt(3));
+ regionStatisticsMap.put(regionGroupId, regionStatistics);
}
// Build 9 RegionGroups in DataNodes 3~5
for (int i = 9; i < 18; i++) {
TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
- TRegionReplicaSet regionReplicaSet =
- new TRegionReplicaSet(
- regionGroupId,
- Arrays.asList(
- new TDataNodeLocation().setDataNodeId(3),
- new TDataNodeLocation().setDataNodeId(4),
- new TDataNodeLocation().setDataNodeId(5)));
+ List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+ Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+ for (int j = 3; j < 6; j++) {
+ dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(j));
+ // Assuming all Regions are in Running status
+ regionStatistics.put(j, new RegionStatistics(RegionStatus.Running));
+ }
+ TRegionReplicaSet regionReplicaSet = new
TRegionReplicaSet(regionGroupId, dataNodeLocations);
regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionLeaderMap.put(regionGroupId, 3 + random.nextInt(3));
+ regionStatisticsMap.put(regionGroupId, regionStatistics);
}
Map<TConsensusGroupId, Integer> leaderDistribution =
BALANCER.generateOptimalLeaderDistribution(
- new TreeMap<>(), regionReplicaSetMap, regionLeaderMap,
disabledDataNodeSet);
+ new TreeMap<>(),
+ regionReplicaSetMap,
+ regionLeaderMap,
+ dataNodeStatisticsMap,
+ regionStatisticsMap);
Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
leaderDistribution.forEach(
(regionGroupId, leaderId) ->
@@ -94,44 +110,60 @@ public class GreedyLeaderBalancerTest {
@Test
public void disableTest() {
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
HashMap<>();
- Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
- Set<Integer> disabledDataNodeSet = new HashSet<>();
-
- disabledDataNodeSet.add(1);
- disabledDataNodeSet.add(4);
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
+ Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
+ Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
+
+ // Assuming DataNode 1 and 4 are disabled
+ dataNodeStatisticsMap.put(0, new NodeStatistics(NodeStatus.Running));
+ dataNodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Unknown));
+ dataNodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Running));
+ dataNodeStatisticsMap.put(3, new NodeStatistics(NodeStatus.Running));
+ dataNodeStatisticsMap.put(4, new NodeStatistics(NodeStatus.ReadOnly));
+ dataNodeStatisticsMap.put(5, new NodeStatistics(NodeStatus.Running));
// Build 10 RegionGroup whose leaders are all 1(Disabled)
for (int i = 0; i < 10; i++) {
TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
- TRegionReplicaSet regionReplicaSet =
- new TRegionReplicaSet(
- regionGroupId,
- Arrays.asList(
- new TDataNodeLocation().setDataNodeId(0),
- new TDataNodeLocation().setDataNodeId(1),
- new TDataNodeLocation().setDataNodeId(2)));
+ List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+ Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+ for (int j = 0; j < 3; j++) {
+ dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(j));
+ // Assuming all Regions in DataNode 1 are Unknown
+ regionStatistics.put(
+ j, new RegionStatistics(j == 1 ? RegionStatus.Unknown :
RegionStatus.Running));
+ }
+ TRegionReplicaSet regionReplicaSet = new
TRegionReplicaSet(regionGroupId, dataNodeLocations);
regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionLeaderMap.put(regionGroupId, 1);
+ regionStatisticsMap.put(regionGroupId, regionStatistics);
}
// Build 10 RegionGroup whose leaders are all 4(Disabled)
for (int i = 10; i < 20; i++) {
TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
- TRegionReplicaSet regionReplicaSet =
- new TRegionReplicaSet(
- regionGroupId,
- Arrays.asList(
- new TDataNodeLocation().setDataNodeId(3),
- new TDataNodeLocation().setDataNodeId(4),
- new TDataNodeLocation().setDataNodeId(5)));
+ List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+ Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+ for (int j = 3; j < 6; j++) {
+ dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(j));
+ // Assuming all Regions in DataNode 4 are ReadOnly
+ regionStatistics.put(
+ j, new RegionStatistics(j == 4 ? RegionStatus.ReadOnly :
RegionStatus.Running));
+ }
+ TRegionReplicaSet regionReplicaSet = new
TRegionReplicaSet(regionGroupId, dataNodeLocations);
regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionLeaderMap.put(regionGroupId, 4);
+ regionStatisticsMap.put(regionGroupId, regionStatistics);
}
Map<TConsensusGroupId, Integer> leaderDistribution =
BALANCER.generateOptimalLeaderDistribution(
- new TreeMap<>(), regionReplicaSetMap, regionLeaderMap,
disabledDataNodeSet);
+ new TreeMap<>(),
+ regionReplicaSetMap,
+ regionLeaderMap,
+ dataNodeStatisticsMap,
+ regionStatisticsMap);
Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
leaderDistribution.forEach(
(regionGroupId, leaderId) ->
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
index 2a1c7093906..182b5edbce9 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
@@ -23,6 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -88,6 +92,24 @@ public class LeaderBalancerComparisonTest {
// Basic test
Map<TConsensusGroupId, Integer> greedyLeaderDistribution = new
ConcurrentHashMap<>();
+ Map<Integer, NodeStatistics> allRunningDataNodeStatistics = new
TreeMap<>();
+ for (int i = 0; i < dataNodeNum; i++) {
+ allRunningDataNodeStatistics.put(i, new
NodeStatistics(NodeStatus.Running));
+ }
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
allRunningRegionStatistics =
+ new TreeMap<>();
+ regionReplicaSetMap.forEach(
+ (regionGroupId, regionReplicaSet) -> {
+ Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ regionStatistics.put(
+ dataNodeLocation.getDataNodeId(),
+ new RegionStatistics(RegionStatus.Running)));
+ allRunningRegionStatistics.put(regionGroupId, regionStatistics);
+ });
Statistics greedyStatistics =
doBalancing(
dataNodeNum,
@@ -95,7 +117,8 @@ public class LeaderBalancerComparisonTest {
GREEDY_LEADER_BALANCER,
regionReplicaSetMap,
regionLeaderMap,
- new HashSet<>(),
+ allRunningDataNodeStatistics,
+ allRunningRegionStatistics,
greedyLeaderDistribution);
Map<TConsensusGroupId, Integer> mcfLeaderDistribution = new
ConcurrentHashMap<>();
Statistics mcfStatistics =
@@ -105,7 +128,8 @@ public class LeaderBalancerComparisonTest {
MIN_COST_FLOW_LEADER_BALANCER,
regionReplicaSetMap,
regionLeaderMap,
- new HashSet<>(),
+ allRunningDataNodeStatistics,
+ allRunningRegionStatistics,
mcfLeaderDistribution);
if (isCommandLineMode) {
LOGGER.info("[Basic test]");
@@ -126,6 +150,30 @@ public class LeaderBalancerComparisonTest {
}
disabledDataNodeSet.add(dataNodeId);
}
+ Map<Integer, NodeStatistics> disabledDataNodeStatistics = new
TreeMap<>();
+ for (int i = 0; i < dataNodeNum; i++) {
+ disabledDataNodeStatistics.put(
+ i,
+ disabledDataNodeSet.contains(i)
+ ? new NodeStatistics(NodeStatus.Unknown)
+ : new NodeStatistics(NodeStatus.Running));
+ }
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
disabledRegionStatistics =
+ new TreeMap<>();
+ regionReplicaSetMap.forEach(
+ (regionGroupId, regionReplicaSet) -> {
+ Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ regionStatistics.put(
+ dataNodeLocation.getDataNodeId(),
+
disabledDataNodeSet.contains(dataNodeLocation.getDataNodeId())
+ ? new RegionStatistics(RegionStatus.Unknown)
+ : new RegionStatistics(RegionStatus.Running)));
+ disabledRegionStatistics.put(regionGroupId, regionStatistics);
+ });
greedyStatistics =
doBalancing(
dataNodeNum,
@@ -133,7 +181,8 @@ public class LeaderBalancerComparisonTest {
GREEDY_LEADER_BALANCER,
regionReplicaSetMap,
greedyLeaderDistribution,
- disabledDataNodeSet,
+ disabledDataNodeStatistics,
+ disabledRegionStatistics,
greedyLeaderDistribution);
mcfStatistics =
doBalancing(
@@ -142,7 +191,8 @@ public class LeaderBalancerComparisonTest {
MIN_COST_FLOW_LEADER_BALANCER,
regionReplicaSetMap,
mcfLeaderDistribution,
- disabledDataNodeSet,
+ disabledDataNodeStatistics,
+ disabledRegionStatistics,
mcfLeaderDistribution);
if (isCommandLineMode) {
LOGGER.info("[Disaster test]");
@@ -161,7 +211,8 @@ public class LeaderBalancerComparisonTest {
GREEDY_LEADER_BALANCER,
regionReplicaSetMap,
greedyLeaderDistribution,
- new HashSet<>(),
+ allRunningDataNodeStatistics,
+ allRunningRegionStatistics,
greedyLeaderDistribution);
mcfStatistics =
doBalancing(
@@ -170,7 +221,8 @@ public class LeaderBalancerComparisonTest {
MIN_COST_FLOW_LEADER_BALANCER,
regionReplicaSetMap,
mcfLeaderDistribution,
- new HashSet<>(),
+ allRunningDataNodeStatistics,
+ allRunningRegionStatistics,
mcfLeaderDistribution);
if (isCommandLineMode) {
LOGGER.info("[Recovery test]");
@@ -254,10 +306,11 @@ public class LeaderBalancerComparisonTest {
private Statistics doBalancing(
int dataNodeNum,
int regionGroupNum,
- ILeaderBalancer leaderBalancer,
+ AbstractLeaderBalancer leaderBalancer,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
- Set<Integer> disabledDataNodeSet,
+ Map<Integer, NodeStatistics> nodeStatisticsMap,
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap,
Map<TConsensusGroupId, Integer> stableLeaderDistribution) {
Statistics result = new Statistics();
@@ -266,7 +319,11 @@ public class LeaderBalancerComparisonTest {
for (int rounds = 0; rounds < 1000; rounds++) {
Map<TConsensusGroupId, Integer> currentDistribution =
leaderBalancer.generateOptimalLeaderDistribution(
- new TreeMap<>(), regionReplicaSetMap, lastDistribution,
disabledDataNodeSet);
+ new TreeMap<>(),
+ regionReplicaSetMap,
+ lastDistribution,
+ nodeStatisticsMap,
+ regionStatisticsMap);
if (currentDistribution.equals(lastDistribution)) {
// The leader distribution is stable
result.rounds = rounds;