This is an automated email from the ASF dual-hosted git repository. bbeaudreault pushed a commit to branch hubspot-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 1b256f6a7adf74ac60c4d01b1caf8656a75e3b43 Author: clarax <clarax98...@gmail.com> AuthorDate: Tue Nov 2 19:27:46 2021 -0700 HBASE-26309 Balancer tends to move regions to the server at the end o… (#3812) Signed-off-by: Duo Zhang <zhang...@apache.org> --- .../hbase/master/balancer/BaseLoadBalancer.java | 7 +++ .../master/balancer/LoadCandidateGenerator.java | 54 ++++++++++++++++------ .../master/balancer/StochasticLoadBalancer.java | 7 +-- .../hbase/master/balancer/BalancerTestBase.java | 39 +++++++++++++++- ...ochasticLoadBalancerRegionReplicaWithRacks.java | 34 ++++++++++++-- 5 files changed, 120 insertions(+), 21 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index a5756a0663e..5dc30a5d805 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -338,6 +338,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { racks[entry.getValue()] = entry.getKey(); } + LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex); for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { int serverIndex = serversToIndex.get(entry.getKey().getAddress()); regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; @@ -365,6 +366,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { serversPerHost[i] = new int[serversPerHostList.get(i).size()]; for (int j = 0; j < serversPerHost[i].length; j++) { serversPerHost[i][j] = serversPerHostList.get(i).get(j); + LOG.debug("server {} is on host {}",serversPerHostList.get(i).get(j), i); } if (serversPerHost[i].length > 1) { multiServersPerHost = true; @@ -375,6 +377,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { serversPerRack[i] = new 
int[serversPerRackList.get(i).size()]; for (int j = 0; j < serversPerRack[i].length; j++) { serversPerRack[i][j] = serversPerRackList.get(i).get(j); + LOG.info("server {} is on rack {}",serversPerRackList.get(i).get(j), i); } } @@ -960,6 +963,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer { private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions); + public Comparator<Integer> getNumRegionsComparator() { + return numRegionsComparator; + } + int getLowestLocalityRegionOnServer(int serverIndex) { if (regionFinder != null) { float lowestLocality = 1.0f; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java index d60065feeb0..38114b9df80 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java @@ -18,13 +18,13 @@ package org.apache.hadoop.hbase.master.balancer; +import java.util.concurrent.ThreadLocalRandom; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private class LoadCandidateGenerator extends CandidateGenerator { - @Override - BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) { + @Override BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) { cluster.sortServersByRegionCount(); int thisServer = pickMostLoadedServer(cluster, -1); int otherServer = pickLeastLoadedServer(cluster, thisServer); @@ -34,27 +34,53 @@ class LoadCandidateGenerator extends CandidateGenerator { private int pickLeastLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) { Integer[] servers = cluster.serverIndicesSortedByRegionCount; - int index = 0; - while (servers[index] == null || servers[index] == thisServer) { - index++; - if (index == 
servers.length) { - return -1; + int selectedIndex = -1; + double currentLargestRandom = -1; + for (int i = 0; i < servers.length; i++) { + if (servers[i] == null || servers[i] == thisServer) { + continue; + } + if (selectedIndex != -1 && cluster.getNumRegionsComparator().compare(servers[i], + servers[selectedIndex]) != 0) { + // Exhausted servers of the same region count + break; + } + // we don't know how many servers have the same region count, we will randomly select one + // using a simplified inline reservoir sampling by assigning a random number to stream + // data and choose the greatest one. (http://gregable.com/2007/10/reservoir-sampling.html) + double currentRandom = ThreadLocalRandom.current().nextDouble(); + if (currentRandom > currentLargestRandom) { + selectedIndex = i; + currentLargestRandom = currentRandom; } } - return servers[index]; + return selectedIndex == -1 ? -1 : servers[selectedIndex]; } private int pickMostLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) { Integer[] servers = cluster.serverIndicesSortedByRegionCount; - int index = servers.length - 1; - while (servers[index] == null || servers[index] == thisServer) { - index--; - if (index < 0) { - return -1; + int selectedIndex = -1; + double currentLargestRandom = -1; + for (int i = servers.length - 1; i >= 0; i--) { + if (servers[i] == null || servers[i] == thisServer) { + continue; + } + if (selectedIndex != -1 + && cluster.getNumRegionsComparator().compare(servers[i], servers[selectedIndex]) != 0) { + // Exhausted servers of the same region count + break; + } + // we don't know how many servers have the same region count, we will randomly select one + // using a simplified inline reservoir sampling by assigning a random number to stream + // data and choose the greatest one. 
(http://gregable.com/2007/10/reservoir-sampling.html) + double currentRandom = ThreadLocalRandom.current().nextDouble(); + if (currentRandom > currentLargestRandom) { + selectedIndex = i; + currentLargestRandom = currentRandom; } } - return servers[index]; + return selectedIndex == -1 ? -1 : servers[selectedIndex]; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 4a0dc6fe7d9..ec912c00f72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -552,9 +552,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { plans = createRegionPlans(cluster); LOG.info("Finished computing new moving plan. Computation took {} ms" + " to try {} different iterations. Found a solution that moves " + - "{} regions; Going from a computed imbalance of {}" + " to a new imbalance of {}. ", - endTime - startTime, step, plans.size(), initCost / sumMultiplier, - currentCost / sumMultiplier); + "{} regions; Going from a computed imbalance of {}" + + " to a new imbalance of {}. 
funtionCost={}", + endTime - startTime, step, plans.size(), + initCost / sumMultiplier, currentCost / sumMultiplier, functionCost()); sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step); return plans; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java index 0b5bc65e267..0f92d86377f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java @@ -574,7 +574,7 @@ public class BalancerTestBase { List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap); // Print out the cluster loads to make debugging easier. - LOG.info("Mock Balance : " + printMock(balancedCluster)); + LOG.info("Mock after Balance : " + printMock(balancedCluster)); if (assertFullyBalanced) { assertClusterAsBalanced(balancedCluster); @@ -590,6 +590,43 @@ public class BalancerTestBase { } } + protected void testWithClusterWithIteration(Map<ServerName, List<RegionInfo>> serverMap, + RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) { + List<ServerAndLoad> list = convertToList(serverMap); + LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); + + loadBalancer.setRackManager(rackManager); + // Run the balancer. + Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable = + (Map) mockClusterServersWithTables(serverMap); + List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable); + assertNotNull("Initial cluster balance should produce plans.", plans); + + List<ServerAndLoad> balancedCluster = null; + // Run through iteration until done. Otherwise will be killed as test time out + while (plans != null && (assertFullyBalanced || assertFullyBalancedForReplicas)) { + // Apply the plan to the mock cluster. 
+ balancedCluster = reconcile(list, plans, serverMap); + + // Print out the cluster loads to make debugging easier. + LOG.info("Mock after balance: " + printMock(balancedCluster)); + + LoadOfAllTable = (Map) mockClusterServersWithTables(serverMap); + plans = loadBalancer.balanceCluster(LoadOfAllTable); + } + + // Print out the cluster loads to make debugging easier. + LOG.info("Mock Final balance: " + printMock(balancedCluster)); + + if (assertFullyBalanced) { + assertNull("Given a requirement to be fully balanced, second attempt at plans should " + + "produce none.", plans); + } + if (assertFullyBalancedForReplicas) { + assertRegionReplicaPlacement(serverMap, rackManager); + } + } + protected Map<ServerName, List<RegionInfo>> createServerMap(int numNodes, int numRegions, int numRegionsPerServer, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java index b7532ebaa95..648d860d72b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.master.balancer; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -38,6 +39,8 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends BalancerTe private static class ForTestRackManager extends RackManager { int numRacks; + Map<String, Integer> serverIndexes = new HashMap<String, Integer>(); + int numServers = 0; public ForTestRackManager(int numRacks) { this.numRacks = numRacks; @@ -45,13 +48,18 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends BalancerTe @Override 
public String getRack(ServerName server) { - return "rack_" + (server.hashCode() % numRacks); + String key = server.getServerName(); + if (!serverIndexes.containsKey(key)) { + serverIndexes.put(key, numServers++); + } + return "rack_" + serverIndexes.get(key) % numRacks; } } @Test public void testRegionReplicationOnMidClusterWithRacks() { - conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 10000000L); + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 100000000L); + conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true); conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec loadBalancer.setConf(conf); @@ -65,6 +73,26 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends BalancerTe createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables); RackManager rm = new ForTestRackManager(numRacks); - testWithCluster(serverMap, rm, false, true); + testWithClusterWithIteration(serverMap, rm, true, true); + } + + @Test + public void testRegionReplicationOnLargeClusterWithRacks() { + conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", false); + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 5000L); + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10 * 1000); // 10 sec + loadBalancer.onConfigurationChange(conf); + int numNodes = 100; + int numRegions = numNodes * 30; + int replication = 3; // 3 replicas per region + int numRegionsPerServer = 28; + int numTables = 1; + int numRacks = 4; // all replicas should be on a different rack + Map<ServerName, List<RegionInfo>> serverMap = + createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables); + RackManager rm = new ForTestRackManager(numRacks); + + testWithClusterWithIteration(serverMap, rm, true, true); } }