This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch leader-balance-constraint in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3db04ac8f33d05c5cbeb7d3777e91c4317eb26a8 Author: YongzaoDan <[email protected]> AuthorDate: Thu Apr 18 22:13:46 2024 +0800 Finish --- .../iotdb/confignode/conf/ConfigNodeConfig.java | 4 +- .../confignode/conf/ConfigNodeDescriptor.java | 6 +- .../confignode/conf/ConfigNodeStartupCheck.java | 6 +- .../manager/load/balancer/RouteBalancer.java | 17 +-- .../router/leader/AbstractLeaderBalancer.java | 108 ++++++++++++++++++ .../router/leader/GreedyLeaderBalancer.java | 57 ++++------ .../balancer/router/leader/ILeaderBalancer.java | 49 -------- .../router/leader/MinCostFlowLeaderBalancer.java | 123 ++++++++------------- .../confignode/manager/load/cache/LoadCache.java | 59 +++++++++- .../manager/load/cache/node/NodeStatistics.java | 9 ++ .../load/cache/region/RegionStatistics.java | 7 ++ .../router/leader/CFDLeaderBalancerTest.java | 117 ++++++++++++++++++-- .../router/leader/GreedyLeaderBalancerTest.java | 118 +++++++++++++------- .../leader/LeaderBalancerComparisonTest.java | 75 +++++++++++-- 14 files changed, 509 insertions(+), 246 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index b20f1e87d2f..3875bf09080 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer; -import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer; +import org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer; import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer; import org.apache.iotdb.confignode.manager.partition.RegionGroupExtensionPolicy; import org.apache.iotdb.consensus.ConsensusFactory; @@ -187,7 +187,7 @@ public class ConfigNodeConfig { private long unknownDataNodeDetectInterval = heartbeatIntervalInMs; /** The policy of cluster RegionGroups' leader distribution. */ - private String leaderDistributionPolicy = ILeaderBalancer.CFD_POLICY; + private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY; /** Whether to enable auto leader balance for Ratis consensus protocol. */ private boolean enableAutoLeaderBalanceForRatisConsensus = true; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 1bad7b48c5c..33b8ec69f8e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -26,7 +26,7 @@ import org.apache.iotdb.commons.exception.BadNodeUrlException; import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer; -import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer; +import org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer; import org.apache.iotdb.confignode.manager.partition.RegionGroupExtensionPolicy; import org.apache.iotdb.metrics.config.MetricConfigDescriptor; @@ -354,8 +354,8 @@ public class ConfigNodeDescriptor { properties .getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy()) .trim(); - if 
(ILeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy) - || ILeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy)) { + if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy) + || AbstractLeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy)) { conf.setLeaderDistributionPolicy(leaderDistributionPolicy); } else { throw new IOException( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java index 1d8bf58259e..3e82e0bb4a6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java @@ -25,7 +25,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.ConfigurationException; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.StartupChecks; -import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer; +import org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer; import org.apache.iotdb.consensus.ConsensusFactory; @@ -144,8 +144,8 @@ public class ConfigNodeStartupCheck extends StartupChecks { } // The leader distribution policy is limited - if (!ILeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy()) - && !ILeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())) { + if (!AbstractLeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy()) + && !AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())) { throw new ConfigurationException( "leader_distribution_policy", CONF.getRoutePriorityPolicy(), diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index 7269defdbe6..4896885e956 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -33,8 +33,8 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.ProcedureManager; import org.apache.iotdb.confignode.manager.load.LoadManager; +import org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer; -import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer; @@ -97,7 +97,7 @@ public class RouteBalancer implements IClusterStatusSubscriber { private final IManager configManager; // For generating optimal Region leader distribution - private final ILeaderBalancer leaderBalancer; + private final AbstractLeaderBalancer leaderBalancer; // For generating optimal cluster Region routing priority private final IPriorityBalancer priorityRouter; @@ -118,10 +118,10 @@ public class RouteBalancer implements IClusterStatusSubscriber { this.lastFailedTimeForLeaderBalance = new TreeMap<>(); switch (CONF.getLeaderDistributionPolicy()) { - case ILeaderBalancer.GREEDY_POLICY: + case AbstractLeaderBalancer.GREEDY_POLICY: this.leaderBalancer = new GreedyLeaderBalancer(); 
break; - case ILeaderBalancer.CFD_POLICY: + case AbstractLeaderBalancer.CFD_POLICY: default: this.leaderBalancer = new MinCostFlowLeaderBalancer(); break; @@ -157,13 +157,8 @@ public class RouteBalancer implements IClusterStatusSubscriber { getPartitionManager().getAllRegionGroupIdMap(regionGroupType), getPartitionManager().getAllReplicaSetsMap(regionGroupType), currentLeaderMap, - getNodeManager() - .filterDataNodeThroughStatus( - NodeStatus.Unknown, NodeStatus.ReadOnly, NodeStatus.Removing) - .stream() - .map(TDataNodeConfiguration::getLocation) - .map(TDataNodeLocation::getDataNodeId) - .collect(Collectors.toSet())); + getLoadManager().getLoadCache().getCurrentDataNodeStatisticsMap(), + getLoadManager().getLoadCache().getCurrentRegionStatisticsMap(regionGroupType)); // Transfer leader to the optimal distribution long currentTime = System.nanoTime(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java new file mode 100644 index 00000000000..b82b3a0ca48 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.balancer.router.leader; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.cluster.RegionStatus; +import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; + +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +public abstract class AbstractLeaderBalancer { + + public static final String GREEDY_POLICY = "GREEDY"; + public static final String CFD_POLICY = "CFD"; + + // Map<Database, List<RegionGroup>> + protected final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap; + // Map<RegionGroupId, RegionGroup> + protected final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap; + // Map<RegionGroupId, leaderId> + protected final Map<TConsensusGroupId, Integer> regionLeaderMap; + // Map<DataNodeId, NodeStatistics> + protected final Map<Integer, NodeStatistics> dataNodeStatisticsMap; + // Map<RegionGroupId, Map<DataNodeId, RegionStatistics>> + protected final Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap; + + protected AbstractLeaderBalancer() { + this.databaseRegionGroupMap = new TreeMap<>(); + this.regionReplicaSetMap = new TreeMap<>(); + this.regionLeaderMap = new TreeMap<>(); + this.dataNodeStatisticsMap = new TreeMap<>(); + this.regionStatisticsMap = new TreeMap<>(); + 
} + + protected void initialize( + Map<String, List<TConsensusGroupId>> databaseRegionGroupMap, + Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, + Map<TConsensusGroupId, Integer> regionLeaderMap, + Map<Integer, NodeStatistics> dataNodeStatisticsMap, + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap) { + this.databaseRegionGroupMap.putAll(databaseRegionGroupMap); + this.regionReplicaSetMap.putAll(regionReplicaSetMap); + this.regionLeaderMap.putAll(regionLeaderMap); + this.dataNodeStatisticsMap.putAll(dataNodeStatisticsMap); + this.regionStatisticsMap.putAll(regionStatisticsMap); + } + + protected boolean isDataNodeAvailable(int dataNodeId) { + // RegionGroup-leader can only be placed on Running DataNode + return dataNodeStatisticsMap.containsKey(dataNodeId) + && NodeStatus.Running.equals(dataNodeStatisticsMap.get(dataNodeId).getStatus()); + } + + protected boolean isRegionAvailable(TConsensusGroupId regionGroupId, int dataNodeId) { + // Only Running Region can be selected as RegionGroup-leader + return regionStatisticsMap.containsKey(regionGroupId) + && regionStatisticsMap.get(regionGroupId).containsKey(dataNodeId) + && RegionStatus.Running.equals( + regionStatisticsMap.get(regionGroupId).get(dataNodeId).getRegionStatus()); + } + + protected void clear() { + this.databaseRegionGroupMap.clear(); + this.regionReplicaSetMap.clear(); + this.regionLeaderMap.clear(); + this.dataNodeStatisticsMap.clear(); + this.regionStatisticsMap.clear(); + } + + /** + * Generate an optimal leader distribution. 
+ * + * @param databaseRegionGroupMap RegionGroups held by each Database + * @param regionReplicaSetMap All RegionGroups the cluster currently has + * @param regionLeaderMap The current leader distribution of each RegionGroup + * @param dataNodeStatisticsMap The current statistics of each DataNode + * @param regionStatisticsMap The current statistics of each Region + * @return Map<TConsensusGroupId, Integer>, The optimal leader distribution + */ + public abstract Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution( + Map<String, List<TConsensusGroupId>> databaseRegionGroupMap, + Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, + Map<TConsensusGroupId, Integer> regionLeaderMap, + Map<Integer, NodeStatistics> dataNodeStatisticsMap, + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap); +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java index 4a28fb6ca31..5a6098b60f9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java @@ -22,26 +22,19 @@ package org.apache.iotdb.confignode.manager.load.balancer.router.leader; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap; /** Leader distribution balancer that uses greedy algorithm */ -public class GreedyLeaderBalancer implements ILeaderBalancer { - - private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap; - private final Map<TConsensusGroupId, Integer> regionLeaderMap; - private final Set<Integer> disabledDataNodeSet; +public class GreedyLeaderBalancer extends AbstractLeaderBalancer { public GreedyLeaderBalancer() { - this.regionReplicaSetMap = new HashMap<>(); - this.regionLeaderMap = new ConcurrentHashMap<>(); - this.disabledDataNodeSet = new HashSet<>(); + super(); } @Override @@ -49,30 +42,19 @@ public class GreedyLeaderBalancer implements ILeaderBalancer { Map<String, List<TConsensusGroupId>> databaseRegionGroupMap, Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, Map<TConsensusGroupId, Integer> regionLeaderMap, - Set<Integer> disabledDataNodeSet) { - initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet); - + Map<Integer, NodeStatistics> dataNodeStatisticsMap, + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap) { + initialize( + databaseRegionGroupMap, + regionReplicaSetMap, + regionLeaderMap, + dataNodeStatisticsMap, + regionStatisticsMap); Map<TConsensusGroupId, Integer> result = constructGreedyDistribution(); - clear(); return result; } - private void initialize( - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, - Map<TConsensusGroupId, Integer> regionLeaderMap, - Set<Integer> disabledDataNodeSet) { - this.regionReplicaSetMap.putAll(regionReplicaSetMap); - this.regionLeaderMap.putAll(regionLeaderMap); - this.disabledDataNodeSet.addAll(disabledDataNodeSet); - } - - private void clear() { - this.regionReplicaSetMap.clear(); - this.regionLeaderMap.clear(); - this.disabledDataNodeSet.clear(); - } - private Map<TConsensusGroupId, Integer> constructGreedyDistribution() { Map<Integer, Integer> leaderCounter = new TreeMap<>(); 
regionReplicaSetMap.forEach( @@ -81,14 +63,13 @@ public class GreedyLeaderBalancer implements ILeaderBalancer { leaderId = regionLeaderMap.getOrDefault(regionGroupId, -1); for (TDataNodeLocation dataNodeLocation : regionGroup.getDataNodeLocations()) { int dataNodeId = dataNodeLocation.getDataNodeId(); - if (disabledDataNodeSet.contains(dataNodeId)) { - continue; - } - // Select the DataNode with the minimal leader count as the new leader - int count = leaderCounter.getOrDefault(dataNodeId, 0); - if (count < minCount) { - minCount = count; - leaderId = dataNodeId; + if (isDataNodeAvailable(dataNodeId) && isRegionAvailable(regionGroupId, dataNodeId)) { + // Select the DataNode with the minimal leader count as the new leader + int count = leaderCounter.getOrDefault(dataNodeId, 0); + if (count < minCount) { + minCount = count; + leaderId = dataNodeId; + } } } regionLeaderMap.put(regionGroupId, leaderId); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java deleted file mode 100644 index 65c45aaebab..00000000000 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.manager.load.balancer.router.leader; - -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -public interface ILeaderBalancer { - - String GREEDY_POLICY = "GREEDY"; - String CFD_POLICY = "CFD"; - - /** - * Generate an optimal leader distribution. - * - * @param databaseRegionGroupMap RegionGroup held by each Database - * @param regionReplicaSetMap All RegionGroups the cluster currently have - * @param regionLeaderMap The current leader of each RegionGroup - * @param disabledDataNodeSet The DataNodes that currently unable to work(can't place - * RegionGroup-leader) - * @return Map<TConsensusGroupId, Integer>, The optimal leader distribution - */ - Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution( - Map<String, List<TConsensusGroupId>> databaseRegionGroupMap, - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, - Map<TConsensusGroupId, Integer> regionLeaderMap, - Set<Integer> disabledDataNodeSet); -} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java index 48aa8cc5547..daf1d15f27c 100644 --- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java @@ -23,6 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; import java.util.ArrayList; import java.util.Arrays; @@ -30,23 +32,14 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.Set; import java.util.TreeMap; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; /** Leader distribution balancer that uses minimum cost flow algorithm */ -public class MinCostFlowLeaderBalancer implements ILeaderBalancer { +public class MinCostFlowLeaderBalancer extends AbstractLeaderBalancer { private static final int INFINITY = Integer.MAX_VALUE; - /** Input parameters */ - private final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap; - - private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap; - private final Map<TConsensusGroupId, Integer> regionLeaderMap; - private final Set<Integer> disabledDataNodeSet; - /** Graph nodes */ // Super source node private static final int S_NODE = 0; @@ -78,10 +71,7 @@ public class MinCostFlowLeaderBalancer implements ILeaderBalancer { private int minimumCost = 0; public MinCostFlowLeaderBalancer() { - this.databaseRegionGroupMap = new TreeMap<>(); - this.regionReplicaSetMap = new TreeMap<>(); - this.regionLeaderMap = new TreeMap<>(); - this.disabledDataNodeSet = new TreeSet<>(); + super(); this.rNodeMap = new TreeMap<>(); 
this.sDNodeMap = new TreeMap<>(); this.sDNodeReflect = new TreeMap<>(); @@ -94,47 +84,34 @@ public class MinCostFlowLeaderBalancer implements ILeaderBalancer { Map<String, List<TConsensusGroupId>> databaseRegionGroupMap, Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, Map<TConsensusGroupId, Integer> regionLeaderMap, - Set<Integer> disabledDataNodeSet) { - - initialize(databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet); - + Map<Integer, NodeStatistics> dataNodeStatisticsMap, + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap) { + initialize( + databaseRegionGroupMap, + regionReplicaSetMap, + regionLeaderMap, + dataNodeStatisticsMap, + regionStatisticsMap); Map<TConsensusGroupId, Integer> result; constructMCFGraph(); dinicAlgorithm(); result = collectLeaderDistribution(); - clear(); return result; } - private void initialize( - Map<String, List<TConsensusGroupId>> databaseRegionGroupMap, - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, - Map<TConsensusGroupId, Integer> regionLeaderMap, - Set<Integer> disabledDataNodeSet) { - this.databaseRegionGroupMap.putAll(databaseRegionGroupMap); - this.regionReplicaSetMap.putAll(regionReplicaSetMap); - this.regionLeaderMap.putAll(regionLeaderMap); - this.disabledDataNodeSet.addAll(disabledDataNodeSet); - } - - private void clear() { - this.databaseRegionGroupMap.clear(); - this.regionReplicaSetMap.clear(); - this.regionLeaderMap.clear(); - this.disabledDataNodeSet.clear(); - + @Override + protected void clear() { + super.clear(); this.rNodeMap.clear(); this.sDNodeMap.clear(); this.sDNodeReflect.clear(); this.tDNodeMap.clear(); this.minCostFlowEdges.clear(); - this.nodeHeadEdge = null; this.nodeCurrentEdge = null; this.isNodeVisited = null; this.nodeMinimumCost = null; - this.maxNode = T_NODE + 1; this.maxEdge = 0; } @@ -155,18 +132,16 @@ public class MinCostFlowLeaderBalancer implements ILeaderBalancer { for (TDataNodeLocation 
dataNodeLocation : regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) { int dataNodeId = dataNodeLocation.getDataNodeId(); - if (disabledDataNodeSet.contains(dataNodeId)) { - // Skip disabled DataNode - continue; - } - if (!sDNodeMap.get(database).containsKey(dataNodeId)) { - sDNodeMap.get(database).put(dataNodeId, maxNode); - sDNodeReflect.get(database).put(maxNode, dataNodeId); - maxNode += 1; - } - if (!tDNodeMap.containsKey(dataNodeId)) { - tDNodeMap.put(dataNodeId, maxNode); - maxNode += 1; + if (isDataNodeAvailable(dataNodeId)) { + if (!sDNodeMap.get(database).containsKey(dataNodeId)) { + sDNodeMap.get(database).put(dataNodeId, maxNode); + sDNodeReflect.get(database).put(maxNode, dataNodeId); + maxNode += 1; + } + if (!tDNodeMap.containsKey(dataNodeId)) { + tDNodeMap.put(dataNodeId, maxNode); + maxNode += 1; + } } } } @@ -194,15 +169,13 @@ public class MinCostFlowLeaderBalancer implements ILeaderBalancer { for (TDataNodeLocation dataNodeLocation : regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) { int dataNodeId = dataNodeLocation.getDataNodeId(); - if (disabledDataNodeSet.contains(dataNodeId)) { - // Skip disabled DataNode - continue; + if (isDataNodeAvailable(dataNodeId) && isRegionAvailable(regionGroupId, dataNodeId)) { + int sDNode = sDNodeMap.get(database).get(dataNodeId); + // Capacity: 1, Cost: 0 if sDNode is the current leader of the rNode, 1 otherwise. + // Therefore, the RegionGroup will keep the leader as constant as possible. + int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) == dataNodeId ? 0 : 1; + addAdjacentEdges(rNode, sDNode, 1, cost); } - int sDNode = sDNodeMap.get(database).get(dataNodeId); - // Capacity: 1, Cost: 1 if sDNode is the current leader of the rNode, 0 otherwise. - // Therefore, the RegionGroup will keep the leader as constant as possible. - int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) == dataNodeId ?
0 : 1; - addAdjacentEdges(rNode, sDNode, 1, cost); } } } @@ -217,17 +190,15 @@ public class MinCostFlowLeaderBalancer implements ILeaderBalancer { for (TDataNodeLocation dataNodeLocation : regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) { int dataNodeId = dataNodeLocation.getDataNodeId(); - if (disabledDataNodeSet.contains(dataNodeId)) { - // Skip disabled DataNode - continue; + if (isDataNodeAvailable(dataNodeId)) { + int sDNode = sDNodeMap.get(database).get(dataNodeId); + int tDNode = tDNodeMap.get(dataNodeId); + int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum); + // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode. + // Thus, leaders will be as balanced as possible within each Database, + // based on Jensen's inequality. + addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount); } - int sDNode = sDNodeMap.get(database).get(dataNodeId); - int tDNode = tDNodeMap.get(dataNodeId); - int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum); - // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode. - // Thus, the leader distribution will be as balance as possible within each Database - // based on the Jensen's-Inequality. - addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount); } } } @@ -239,16 +210,14 @@ public class MinCostFlowLeaderBalancer implements ILeaderBalancer { for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) { for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) { int dataNodeId = dataNodeLocation.getDataNodeId(); - if (disabledDataNodeSet.contains(dataNodeId)) { - // Skip disabled DataNode - continue; + if (isDataNodeAvailable(dataNodeId)) { + int tDNode = tDNodeMap.get(dataNodeId); + int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, Integer::sum); + // Capacity: 1, Cost: x^2 for the x-th edge at the current tDNode.
+ // Thus, leaders will be as balanced as possible within the cluster, + // based on Jensen's inequality. + addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount); } - int tDNode = tDNodeMap.get(dataNodeId); - int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, Integer::sum); - // Cost: x^2 for the x-th edge at the current dNode. - // Thus, the leader distribution will be as balance as possible within the cluster - // Based on the Jensen's-Inequality. - addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount); } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 6fae8010e22..be561937b9b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -40,6 +40,7 @@ import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache; import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.slf4j.Logger; @@ -261,6 +262,22 @@ public class LoadCache { return nodeStatisticsMap; } + /** + * Get the NodeStatistics of all DataNodes.
+ * + * @return a map of all DataNodes' NodeStatistics + */ + public Map<Integer, NodeStatistics> getCurrentDataNodeStatisticsMap() { + Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>(); + nodeCacheMap.forEach( + (nodeId, nodeCache) -> { + if (nodeCache instanceof DataNodeHeartbeatCache) { + dataNodeStatisticsMap.put(nodeId, (NodeStatistics) nodeCache.getCurrentStatistics()); + } + }); + return dataNodeStatisticsMap; + } + /** * Get the RegionGroupStatistics of all RegionGroups. * @@ -274,6 +291,30 @@ public class LoadCache { return regionGroupStatisticsMap; } + /** + * Get the RegionStatistics of all Regions. + * + * @param type DataRegion or SchemaRegion + * @return a map of RegionStatistics + */ + public Map<TConsensusGroupId, Map<Integer, RegionStatistics>> getCurrentRegionStatisticsMap( + TConsensusGroupType type) { + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap = new TreeMap<>(); + regionGroupCacheMap.forEach( + (regionGroupId, regionGroupCache) -> { + if (type.equals(regionGroupId.getType())) { + regionStatisticsMap.put( + regionGroupId, regionGroupCache.getCurrentStatistics().getRegionStatisticsMap()); + } + }); + return regionStatisticsMap; + } + + /** + * Get the ConsensusGroupStatistics of all RegionGroups. + * + * @return a map of ConsensusGroupStatistics + */ public Map<TConsensusGroupId, ConsensusGroupStatistics> getCurrentConsensusGroupStatisticsMap() { Map<TConsensusGroupId, ConsensusGroupStatistics> consensusGroupStatisticsMap = new TreeMap<>(); consensusGroupCacheMap.forEach( @@ -294,6 +335,22 @@ public class LoadCache { return nodeCache == null ? 
NodeStatus.Unknown : nodeCache.getNodeStatus(); } + /** + * Get all DataNodes' NodeStatus + * + * @return Map<DataNodeId, NodeStatus> + */ + public Map<Integer, NodeStatus> getDataNodeStatus() { + Map<Integer, NodeStatus> nodeStatusMap = new TreeMap<>(); + nodeCacheMap.forEach( + (nodeId, nodeCache) -> { + if (nodeCache instanceof DataNodeHeartbeatCache) { + nodeStatusMap.put(nodeId, nodeCache.getNodeStatus()); + } + }); + return nodeStatusMap; + } + /** * Safely get the specified Node's current status with reason. * @@ -448,7 +505,7 @@ public class LoadCache { */ public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus( List<TConsensusGroupId> consensusGroupIds) { - Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = new ConcurrentHashMap<>(); + Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = new TreeMap<>(); for (TConsensusGroupId consensusGroupId : consensusGroupIds) { regionGroupStatusMap.put(consensusGroupId, getRegionGroupStatus(consensusGroupId)); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java index e3c8bd25dbb..24bee71ad0e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.load.cache.node; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.manager.load.cache.AbstractStatistics; import java.util.Objects; @@ -42,6 +43,14 @@ public class NodeStatistics extends AbstractStatistics { this.loadScore = loadScore; } + @TestOnly + public NodeStatistics(NodeStatus status) { + super(System.nanoTime()); + this.status = 
status; + this.statusReason = null; + this.loadScore = Long.MAX_VALUE; + } + public static NodeStatistics generateDefaultNodeStatistics() { return new NodeStatistics(Long.MIN_VALUE, NodeStatus.Unknown, null, Long.MAX_VALUE); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java index 1246f0ed435..84f22ef4ac5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.load.cache.region; import org.apache.iotdb.commons.cluster.RegionStatus; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.manager.load.cache.AbstractStatistics; import java.util.Objects; @@ -34,6 +35,12 @@ public class RegionStatistics extends AbstractStatistics { this.regionStatus = regionStatus; } + @TestOnly + public RegionStatistics(RegionStatus regionStatus) { + super(System.nanoTime()); + this.regionStatus = regionStatus; + } + public static RegionStatistics generateDefaultRegionStatistics() { return new RegionStatistics(0, RegionStatus.Unknown); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java index 716fa55a80f..7cc1f50c28c 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java @@ -23,6 +23,10 @@ import 
org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.cluster.RegionStatus; +import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; import org.junit.Assert; import org.junit.Test; @@ -35,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Random; -import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -63,21 +66,37 @@ public class CFDLeaderBalancerTest { // The result will be unbalanced if select DataNode-2 as leader for RegionGroup-0 // and select DataNode-3 as leader for RegionGroup-1 List<TRegionReplicaSet> regionReplicaSets = new ArrayList<>(); + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap = new TreeMap<>(); regionReplicaSets.add( new TRegionReplicaSet( regionGroupIds.get(0), Arrays.asList( dataNodeLocations.get(0), dataNodeLocations.get(1), dataNodeLocations.get(2)))); + Map<Integer, RegionStatistics> region0 = new TreeMap<>(); + region0.put(0, new RegionStatistics(RegionStatus.Unknown)); + region0.put(1, new RegionStatistics(RegionStatus.Running)); + region0.put(2, new RegionStatistics(RegionStatus.Running)); + regionStatisticsMap.put(regionGroupIds.get(0), region0); regionReplicaSets.add( new TRegionReplicaSet( regionGroupIds.get(1), Arrays.asList( dataNodeLocations.get(0), dataNodeLocations.get(1), dataNodeLocations.get(3)))); + Map<Integer, RegionStatistics> region1 = new TreeMap<>(); + region1.put(0, new RegionStatistics(RegionStatus.Unknown)); + region1.put(1, new RegionStatistics(RegionStatus.Running)); + region1.put(3, 
new RegionStatistics(RegionStatus.Running)); + regionStatisticsMap.put(regionGroupIds.get(1), region1); regionReplicaSets.add( new TRegionReplicaSet( regionGroupIds.get(2), Arrays.asList( dataNodeLocations.get(0), dataNodeLocations.get(2), dataNodeLocations.get(3)))); + Map<Integer, RegionStatistics> region2 = new TreeMap<>(); + region2.put(0, new RegionStatistics(RegionStatus.Unknown)); + region2.put(2, new RegionStatistics(RegionStatus.Running)); + region2.put(3, new RegionStatistics(RegionStatus.Running)); + regionStatisticsMap.put(regionGroupIds.get(2), region2); // Prepare input parameters Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new TreeMap<>(); @@ -89,13 +108,20 @@ public class CFDLeaderBalancerTest { Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>(); regionReplicaSets.forEach( regionReplicaSet -> regionLeaderMap.put(regionReplicaSet.getRegionId(), 0)); - Set<Integer> disabledDataNodeSet = new HashSet<>(); - disabledDataNodeSet.add(0); + Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>(); + dataNodeStatisticsMap.put(0, new NodeStatistics(NodeStatus.Unknown)); + dataNodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running)); + dataNodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Running)); + dataNodeStatisticsMap.put(3, new NodeStatistics(NodeStatus.Running)); // Do balancing Map<TConsensusGroupId, Integer> leaderDistribution = BALANCER.generateOptimalLeaderDistribution( - databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet); + databaseRegionGroupMap, + regionReplicaSetMap, + regionLeaderMap, + dataNodeStatisticsMap, + regionStatisticsMap); // All RegionGroup got a leader Assert.assertEquals(3, leaderDistribution.size()); // Each DataNode has exactly one leader @@ -125,15 +151,25 @@ public class CFDLeaderBalancerTest { regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet); Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>(); 
regionLeaderMap.put(regionReplicaSet.getRegionId(), 1); - Set<Integer> disabledDataNodeSet = new HashSet<>(); - disabledDataNodeSet.add(0); - disabledDataNodeSet.add(1); - disabledDataNodeSet.add(2); + Map<Integer, NodeStatistics> nodeStatisticsMap = new TreeMap<>(); + nodeStatisticsMap.put(0, new NodeStatistics(NodeStatus.Unknown)); + nodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.ReadOnly)); + nodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Removing)); + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap = new TreeMap<>(); + Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); + regionStatistics.put(0, new RegionStatistics(RegionStatus.Running)); + regionStatistics.put(1, new RegionStatistics(RegionStatus.Running)); + regionStatistics.put(2, new RegionStatistics(RegionStatus.Running)); + regionStatisticsMap.put(regionReplicaSet.getRegionId(), regionStatistics); // Do balancing Map<TConsensusGroupId, Integer> leaderDistribution = BALANCER.generateOptimalLeaderDistribution( - databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet); + databaseRegionGroupMap, + regionReplicaSetMap, + regionLeaderMap, + nodeStatisticsMap, + regionStatisticsMap); Assert.assertEquals(1, leaderDistribution.size()); Assert.assertEquals(1, new HashSet<>(leaderDistribution.values()).size()); // Leader remains the same @@ -146,6 +182,55 @@ public class CFDLeaderBalancerTest { Assert.assertEquals(0, BALANCER.getMinimumCost()); } + @Test + public void migrateTest() { + // All DataNodes are in Running status + Map<Integer, NodeStatistics> nodeStatisticsMap = new TreeMap<>(); + nodeStatisticsMap.put(0, new NodeStatistics(NodeStatus.Running)); + nodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running)); + // Prepare RegionGroups + Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new TreeMap<>(); + Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new TreeMap<>(); + 
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>(); + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap = new TreeMap<>(); + for (int i = 0; i < 5; i++) { + TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i); + databaseRegionGroupMap + .computeIfAbsent(DATABASE, empty -> new ArrayList<>()) + .add(regionGroupId); + TRegionReplicaSet regionReplicaSet = + new TRegionReplicaSet( + regionGroupId, + Arrays.asList( + new TDataNodeLocation().setDataNodeId(0), + new TDataNodeLocation().setDataNodeId(1))); + regionReplicaSetMap.put(regionGroupId, regionReplicaSet); + regionLeaderMap.put(regionGroupId, 0); + // Assuming all Regions are migrating from DataNode-1 to DataNode-2 + Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); + regionStatistics.put(0, new RegionStatistics(RegionStatus.Removing)); + regionStatistics.put(1, new RegionStatistics(RegionStatus.Running)); + regionStatisticsMap.put(regionGroupId, regionStatistics); + } + + // Do balancing + Map<TConsensusGroupId, Integer> leaderDistribution = + BALANCER.generateOptimalLeaderDistribution( + databaseRegionGroupMap, + regionReplicaSetMap, + regionLeaderMap, + nodeStatisticsMap, + regionStatisticsMap); + for (int i = 0; i < 5; i++) { + // All RegionGroups' leader should be DataNode-1 + Assert.assertEquals( + 1, + leaderDistribution + .get(new TConsensusGroupId(TConsensusGroupType.DataRegion, i)) + .intValue()); + } + } + /** * In this case shows the balance ability for big cluster. 
* @@ -156,6 +241,10 @@ public class CFDLeaderBalancerTest { final int regionGroupNum = 1500; final int dataNodeNum = 300; final int replicationFactor = 3; + Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>(); + for (int i = 0; i < dataNodeNum; i++) { + dataNodeStatisticsMap.put(i, new NodeStatistics(NodeStatus.Running)); + } // The loadCost for each DataNode are the same int x = regionGroupNum / dataNodeNum; @@ -168,16 +257,20 @@ public class CFDLeaderBalancerTest { databaseRegionGroupMap.put(DATABASE, new ArrayList<>()); Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new TreeMap<>(); Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>(); + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap = new TreeMap<>(); for (int i = 0; i < regionGroupNum; i++) { TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i); int leaderId = (dataNodeId + random.nextInt(replicationFactor)) % dataNodeNum; TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet(); regionReplicaSet.setRegionId(regionGroupId); + Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); for (int j = 0; j < 3; j++) { regionReplicaSet.addToDataNodeLocations(new TDataNodeLocation().setDataNodeId(dataNodeId)); + regionStatistics.put(dataNodeId, new RegionStatistics(RegionStatus.Running)); dataNodeId = (dataNodeId + 1) % dataNodeNum; } + regionStatisticsMap.put(regionGroupId, regionStatistics); databaseRegionGroupMap.get(DATABASE).add(regionGroupId); regionReplicaSetMap.put(regionGroupId, regionReplicaSet); @@ -187,7 +280,11 @@ public class CFDLeaderBalancerTest { // Do balancing Map<TConsensusGroupId, Integer> leaderDistribution = BALANCER.generateOptimalLeaderDistribution( - databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, new HashSet<>()); + databaseRegionGroupMap, + regionReplicaSetMap, + regionLeaderMap, + dataNodeStatisticsMap, + regionStatisticsMap); // All RegionGroup got a 
leader Assert.assertEquals(regionGroupNum, leaderDistribution.size()); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java index a443fe74c02..6b97e1b385b 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java @@ -23,16 +23,18 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.cluster.RegionStatus; +import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; import org.junit.Assert; import org.junit.Test; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Random; -import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -43,42 +45,56 @@ public class GreedyLeaderBalancerTest { @Test public void optimalLeaderDistributionTest() { - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>(); - Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>(); - Set<Integer> disabledDataNodeSet = new HashSet<>(); + Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new TreeMap<>(); + Map<TConsensusGroupId, Integer> 
regionLeaderMap = new TreeMap<>(); + Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>(); + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap = new TreeMap<>(); Random random = new Random(); + // Assuming all DataNodes are in Running status + for (int i = 0; i < 6; i++) { + dataNodeStatisticsMap.put(i, new NodeStatistics(NodeStatus.Running)); + } + // Build 9 RegionGroups in DataNodes 0~2 for (int i = 0; i < 9; i++) { TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i); - TRegionReplicaSet regionReplicaSet = - new TRegionReplicaSet( - regionGroupId, - Arrays.asList( - new TDataNodeLocation().setDataNodeId(0), - new TDataNodeLocation().setDataNodeId(1), - new TDataNodeLocation().setDataNodeId(2))); + List<TDataNodeLocation> dataNodeLocations = new ArrayList<>(); + Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); + for (int j = 0; j < 3; j++) { + dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(j)); + // Assuming all Regions are in Running status + regionStatistics.put(j, new RegionStatistics(RegionStatus.Running)); + } + TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet(regionGroupId, dataNodeLocations); regionReplicaSetMap.put(regionGroupId, regionReplicaSet); regionLeaderMap.put(regionGroupId, random.nextInt(3)); + regionStatisticsMap.put(regionGroupId, regionStatistics); } // Build 9 RegionGroups in DataNodes 3~5 for (int i = 9; i < 18; i++) { TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i); - TRegionReplicaSet regionReplicaSet = - new TRegionReplicaSet( - regionGroupId, - Arrays.asList( - new TDataNodeLocation().setDataNodeId(3), - new TDataNodeLocation().setDataNodeId(4), - new TDataNodeLocation().setDataNodeId(5))); + List<TDataNodeLocation> dataNodeLocations = new ArrayList<>(); + Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); + for (int j = 3; j < 6; j++) { + 
dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(j)); + // Assuming all Regions are in Running status + regionStatistics.put(j, new RegionStatistics(RegionStatus.Running)); + } + TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet(regionGroupId, dataNodeLocations); regionReplicaSetMap.put(regionGroupId, regionReplicaSet); regionLeaderMap.put(regionGroupId, 3 + random.nextInt(3)); + regionStatisticsMap.put(regionGroupId, regionStatistics); } Map<TConsensusGroupId, Integer> leaderDistribution = BALANCER.generateOptimalLeaderDistribution( - new TreeMap<>(), regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet); + new TreeMap<>(), + regionReplicaSetMap, + regionLeaderMap, + dataNodeStatisticsMap, + regionStatisticsMap); Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>(); leaderDistribution.forEach( (regionGroupId, leaderId) -> @@ -94,44 +110,60 @@ public class GreedyLeaderBalancerTest { @Test public void disableTest() { - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>(); - Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>(); - Set<Integer> disabledDataNodeSet = new HashSet<>(); - - disabledDataNodeSet.add(1); - disabledDataNodeSet.add(4); + Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new TreeMap<>(); + Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>(); + Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>(); + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap = new TreeMap<>(); + + // Assuming DataNode 1 and 4 are disabled + dataNodeStatisticsMap.put(0, new NodeStatistics(NodeStatus.Running)); + dataNodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Unknown)); + dataNodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Running)); + dataNodeStatisticsMap.put(3, new NodeStatistics(NodeStatus.Running)); + dataNodeStatisticsMap.put(4, new NodeStatistics(NodeStatus.ReadOnly)); + dataNodeStatisticsMap.put(5, 
new NodeStatistics(NodeStatus.Running)); // Build 10 RegionGroup whose leaders are all 1(Disabled) for (int i = 0; i < 10; i++) { TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i); - TRegionReplicaSet regionReplicaSet = - new TRegionReplicaSet( - regionGroupId, - Arrays.asList( - new TDataNodeLocation().setDataNodeId(0), - new TDataNodeLocation().setDataNodeId(1), - new TDataNodeLocation().setDataNodeId(2))); + List<TDataNodeLocation> dataNodeLocations = new ArrayList<>(); + Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); + for (int j = 0; j < 3; j++) { + dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(j)); + // Assuming all Regions in DataNode 1 are Unknown + regionStatistics.put( + j, new RegionStatistics(j == 1 ? RegionStatus.Unknown : RegionStatus.Running)); + } + TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet(regionGroupId, dataNodeLocations); regionReplicaSetMap.put(regionGroupId, regionReplicaSet); regionLeaderMap.put(regionGroupId, 1); + regionStatisticsMap.put(regionGroupId, regionStatistics); } // Build 10 RegionGroup whose leaders are all 4(Disabled) for (int i = 10; i < 20; i++) { TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i); - TRegionReplicaSet regionReplicaSet = - new TRegionReplicaSet( - regionGroupId, - Arrays.asList( - new TDataNodeLocation().setDataNodeId(3), - new TDataNodeLocation().setDataNodeId(4), - new TDataNodeLocation().setDataNodeId(5))); + List<TDataNodeLocation> dataNodeLocations = new ArrayList<>(); + Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); + for (int j = 3; j < 6; j++) { + dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(j)); + // Assuming all Regions in DataNode 4 are ReadOnly + regionStatistics.put( + j, new RegionStatistics(j == 4 ? 
RegionStatus.ReadOnly : RegionStatus.Running)); + } + TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet(regionGroupId, dataNodeLocations); regionReplicaSetMap.put(regionGroupId, regionReplicaSet); regionLeaderMap.put(regionGroupId, 4); + regionStatisticsMap.put(regionGroupId, regionStatistics); } Map<TConsensusGroupId, Integer> leaderDistribution = BALANCER.generateOptimalLeaderDistribution( - new TreeMap<>(), regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet); + new TreeMap<>(), + regionReplicaSetMap, + regionLeaderMap, + dataNodeStatisticsMap, + regionStatisticsMap); Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>(); leaderDistribution.forEach( (regionGroupId, leaderId) -> diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java index 2a1c7093906..182b5edbce9 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java @@ -23,6 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.cluster.RegionStatus; +import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; import org.apache.iotdb.tsfile.utils.Pair; import org.slf4j.Logger; @@ -88,6 +92,24 @@ public class LeaderBalancerComparisonTest { // Basic test 
Map<TConsensusGroupId, Integer> greedyLeaderDistribution = new ConcurrentHashMap<>(); + Map<Integer, NodeStatistics> allRunningDataNodeStatistics = new TreeMap<>(); + for (int i = 0; i < dataNodeNum; i++) { + allRunningDataNodeStatistics.put(i, new NodeStatistics(NodeStatus.Running)); + } + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> allRunningRegionStatistics = + new TreeMap<>(); + regionReplicaSetMap.forEach( + (regionGroupId, regionReplicaSet) -> { + Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); + regionReplicaSet + .getDataNodeLocations() + .forEach( + dataNodeLocation -> + regionStatistics.put( + dataNodeLocation.getDataNodeId(), + new RegionStatistics(RegionStatus.Running))); + allRunningRegionStatistics.put(regionGroupId, regionStatistics); + }); Statistics greedyStatistics = doBalancing( dataNodeNum, @@ -95,7 +117,8 @@ public class LeaderBalancerComparisonTest { GREEDY_LEADER_BALANCER, regionReplicaSetMap, regionLeaderMap, - new HashSet<>(), + allRunningDataNodeStatistics, + allRunningRegionStatistics, greedyLeaderDistribution); Map<TConsensusGroupId, Integer> mcfLeaderDistribution = new ConcurrentHashMap<>(); Statistics mcfStatistics = @@ -105,7 +128,8 @@ public class LeaderBalancerComparisonTest { MIN_COST_FLOW_LEADER_BALANCER, regionReplicaSetMap, regionLeaderMap, - new HashSet<>(), + allRunningDataNodeStatistics, + allRunningRegionStatistics, mcfLeaderDistribution); if (isCommandLineMode) { LOGGER.info("[Basic test]"); @@ -126,6 +150,30 @@ public class LeaderBalancerComparisonTest { } disabledDataNodeSet.add(dataNodeId); } + Map<Integer, NodeStatistics> disabledDataNodeStatistics = new TreeMap<>(); + for (int i = 0; i < dataNodeNum; i++) { + disabledDataNodeStatistics.put( + i, + disabledDataNodeSet.contains(i) + ? 
new NodeStatistics(NodeStatus.Unknown) + : new NodeStatistics(NodeStatus.Running)); + } + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> disabledRegionStatistics = + new TreeMap<>(); + regionReplicaSetMap.forEach( + (regionGroupId, regionReplicaSet) -> { + Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); + regionReplicaSet + .getDataNodeLocations() + .forEach( + dataNodeLocation -> + regionStatistics.put( + dataNodeLocation.getDataNodeId(), + disabledDataNodeSet.contains(dataNodeLocation.getDataNodeId()) + ? new RegionStatistics(RegionStatus.Unknown) + : new RegionStatistics(RegionStatus.Running))); + disabledRegionStatistics.put(regionGroupId, regionStatistics); + }); greedyStatistics = doBalancing( dataNodeNum, @@ -133,7 +181,8 @@ public class LeaderBalancerComparisonTest { GREEDY_LEADER_BALANCER, regionReplicaSetMap, greedyLeaderDistribution, - disabledDataNodeSet, + disabledDataNodeStatistics, + disabledRegionStatistics, greedyLeaderDistribution); mcfStatistics = doBalancing( @@ -142,7 +191,8 @@ public class LeaderBalancerComparisonTest { MIN_COST_FLOW_LEADER_BALANCER, regionReplicaSetMap, mcfLeaderDistribution, - disabledDataNodeSet, + disabledDataNodeStatistics, + disabledRegionStatistics, mcfLeaderDistribution); if (isCommandLineMode) { LOGGER.info("[Disaster test]"); @@ -161,7 +211,8 @@ public class LeaderBalancerComparisonTest { GREEDY_LEADER_BALANCER, regionReplicaSetMap, greedyLeaderDistribution, - new HashSet<>(), + allRunningDataNodeStatistics, + allRunningRegionStatistics, greedyLeaderDistribution); mcfStatistics = doBalancing( @@ -170,7 +221,8 @@ public class LeaderBalancerComparisonTest { MIN_COST_FLOW_LEADER_BALANCER, regionReplicaSetMap, mcfLeaderDistribution, - new HashSet<>(), + allRunningDataNodeStatistics, + allRunningRegionStatistics, mcfLeaderDistribution); if (isCommandLineMode) { LOGGER.info("[Recovery test]"); @@ -254,10 +306,11 @@ public class LeaderBalancerComparisonTest { private Statistics doBalancing( int 
dataNodeNum, int regionGroupNum, - ILeaderBalancer leaderBalancer, + AbstractLeaderBalancer leaderBalancer, Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, Map<TConsensusGroupId, Integer> regionLeaderMap, - Set<Integer> disabledDataNodeSet, + Map<Integer, NodeStatistics> nodeStatisticsMap, + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap, Map<TConsensusGroupId, Integer> stableLeaderDistribution) { Statistics result = new Statistics(); @@ -266,7 +319,11 @@ public class LeaderBalancerComparisonTest { for (int rounds = 0; rounds < 1000; rounds++) { Map<TConsensusGroupId, Integer> currentDistribution = leaderBalancer.generateOptimalLeaderDistribution( - new TreeMap<>(), regionReplicaSetMap, lastDistribution, disabledDataNodeSet); + new TreeMap<>(), + regionReplicaSetMap, + lastDistribution, + nodeStatisticsMap, + regionStatisticsMap); if (currentDistribution.equals(lastDistribution)) { // The leader distribution is stable result.rounds = rounds;
