This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new fae6d3ea12f HBASE-29933: update_all_config hangs indefinitely when
balancing even… (#7900)
fae6d3ea12f is described below
commit fae6d3ea12fc383e09a4f9e87b9d575370cd438b
Author: Dev Hingu <[email protected]>
AuthorDate: Fri Mar 13 15:00:05 2026 +0530
HBASE-29933: update_all_config hangs indefinitely when balancing even…
(#7900)
Signed-off-by: Wellington Chevreuil <[email protected]>
Reviewed-by: Vaibhav Joshi <[email protected]>
---
.../apache/hadoop/hbase/master/LoadBalancer.java | 13 +
.../master/balancer/CacheAwareLoadBalancer.java | 11 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 123 +++---
.../hbase/rsgroup/RSGroupBasedLoadBalancer.java | 78 +++-
.../balancer/RSGroupableBalancerTestBase.java | 29 ++
.../balancer/TestCacheAwareLoadBalancer.java | 425 ---------------------
...ancerWithCacheAwareLoadBalancerAsInternal.java} | 298 +++++++++------
7 files changed, 376 insertions(+), 601 deletions(-)
diff --git
a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index 421e44c89cd..bc77e3eac7c 100644
---
a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++
b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -171,4 +171,17 @@ public interface LoadBalancer extends Stoppable,
ConfigurationObserver {
default void throttle(RegionPlan plan) throws Exception {
// noop
}
+
+ default long getThrottleDurationMs(RegionPlan plan) {
+ // By default, we do not throttle, so return 0 here.
+ return 0L;
+ }
+
+ default void onBalancingStart() {
+ // noop
+ }
+
+ default void onBalancingComplete() {
+ // noop
+ }
}
diff --git
a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
index 30e4b5212bf..4f45687097e 100644
---
a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
+++
b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
@@ -70,7 +70,7 @@ public class CacheAwareLoadBalancer extends
StochasticLoadBalancer {
}
@Override
- public synchronized void loadConf(Configuration configuration) {
+ public void loadConf(Configuration configuration) {
this.configuration = configuration;
this.costFunctions = new ArrayList<>();
super.loadConf(configuration);
@@ -173,7 +173,7 @@ public class CacheAwareLoadBalancer extends
StochasticLoadBalancer {
}
@Override
- public void throttle(RegionPlan plan) {
+ public long getThrottleDurationMs(RegionPlan plan) {
Pair<ServerName, Float> rsRatio =
this.regionCacheRatioOnOldServerMap.get(plan.getRegionName());
if (
rsRatio != null && plan.getDestination().equals(rsRatio.getFirst())
@@ -181,6 +181,7 @@ public class CacheAwareLoadBalancer extends
StochasticLoadBalancer {
) {
LOG.debug("Moving region {} to server {} with cache ratio {}. No
throttling needed.",
plan.getRegionInfo().getEncodedName(), plan.getDestination(),
rsRatio.getSecond());
+ return 0L;
} else {
if (rsRatio != null) {
LOG.debug("Moving region {} to server {} with cache ratio: {}.
Throttling move for {}ms.",
@@ -193,11 +194,7 @@ public class CacheAwareLoadBalancer extends
StochasticLoadBalancer {
+ "Throttling move for {}ms.",
plan.getRegionInfo().getEncodedName(), plan.getDestination(),
sleepTime);
}
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ return sleepTime;
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 9e7cfa4dba6..16fc353a1c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -2099,80 +2099,85 @@ public class HMaster extends
HBaseServerBase<MasterRpcServices> implements Maste
}
synchronized (this.balancer) {
- // Only allow one balance run at a time.
- if (this.assignmentManager.getRegionTransitScheduledCount() > 0) {
- List<RegionStateNode> regionsInTransition =
assignmentManager.getRegionsInTransition();
- // if hbase:meta region is in transition, result of assignment cannot
be recorded
- // ignore the force flag in that case
- boolean metaInTransition =
assignmentManager.isMetaRegionInTransition();
- List<RegionStateNode> toPrint = regionsInTransition;
- int max = 5;
- boolean truncated = false;
- if (regionsInTransition.size() > max) {
- toPrint = regionsInTransition.subList(0, max);
- truncated = true;
- }
+ try {
+ this.balancer.onBalancingStart();
+ // Only allow one balance run at a time.
+ if (this.assignmentManager.getRegionTransitScheduledCount() > 0) {
+ List<RegionStateNode> regionsInTransition =
assignmentManager.getRegionsInTransition();
+ // if hbase:meta region is in transition, result of assignment
cannot be recorded
+ // ignore the force flag in that case
+ boolean metaInTransition =
assignmentManager.isMetaRegionInTransition();
+ List<RegionStateNode> toPrint = regionsInTransition;
+ int max = 5;
+ boolean truncated = false;
+ if (regionsInTransition.size() > max) {
+ toPrint = regionsInTransition.subList(0, max);
+ truncated = true;
+ }
- if (!request.isIgnoreRegionsInTransition() || metaInTransition) {
- LOG.info("Not running balancer (ignoreRIT=false" + ", metaRIT=" +
metaInTransition
- + ") because " + assignmentManager.getRegionTransitScheduledCount()
- + " region(s) are scheduled to transit " + toPrint
- + (truncated ? "(truncated list)" : ""));
+ if (!request.isIgnoreRegionsInTransition() || metaInTransition) {
+ LOG.info("Not running balancer (ignoreRIT=false" + ", metaRIT=" +
metaInTransition
+ + ") because " +
assignmentManager.getRegionTransitScheduledCount()
+ + " region(s) are scheduled to transit " + toPrint
+ + (truncated ? "(truncated list)" : ""));
+ return responseBuilder.build();
+ }
+ }
+ if (this.serverManager.areDeadServersInProgress()) {
+ LOG.info("Not running balancer because processing dead
regionserver(s): "
+ + this.serverManager.getDeadServers());
return responseBuilder.build();
}
- }
- if (this.serverManager.areDeadServersInProgress()) {
- LOG.info("Not running balancer because processing dead
regionserver(s): "
- + this.serverManager.getDeadServers());
- return responseBuilder.build();
- }
- if (this.cpHost != null) {
- try {
- if (this.cpHost.preBalance(request)) {
- LOG.debug("Coprocessor bypassing balancer request");
+ if (this.cpHost != null) {
+ try {
+ if (this.cpHost.preBalance(request)) {
+ LOG.debug("Coprocessor bypassing balancer request");
+ return responseBuilder.build();
+ }
+ } catch (IOException ioe) {
+ LOG.error("Error invoking master coprocessor preBalance()", ioe);
return responseBuilder.build();
}
- } catch (IOException ioe) {
- LOG.error("Error invoking master coprocessor preBalance()", ioe);
- return responseBuilder.build();
}
- }
-
- Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
-
this.assignmentManager.getRegionStates().getAssignmentsForBalancer(tableStateManager,
- this.serverManager.getOnlineServersList());
- for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values())
{
-
serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
- }
- // Give the balancer the current cluster state.
-
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
+ Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
+
this.assignmentManager.getRegionStates().getAssignmentsForBalancer(tableStateManager,
+ this.serverManager.getOnlineServersList());
+ for (Map<ServerName, List<RegionInfo>> serverMap :
assignments.values()) {
+
serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
+ }
- List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
+ // Give the balancer the current cluster state.
+
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
- responseBuilder.setBalancerRan(true).setMovesCalculated(plans == null ?
0 : plans.size());
+ List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
- if (skipRegionManagementAction("balancer")) {
- // make one last check that the cluster isn't shutting down before
proceeding.
- return responseBuilder.build();
- }
+ responseBuilder.setBalancerRan(true).setMovesCalculated(plans == null
? 0 : plans.size());
- // For dry run we don't actually want to execute the moves, but we do
want
- // to execute the coprocessor below
- List<RegionPlan> sucRPs =
- request.isDryRun() ? Collections.emptyList() :
executeRegionPlansWithThrottling(plans);
+ if (skipRegionManagementAction("balancer")) {
+ // make one last check that the cluster isn't shutting down before
proceeding.
+ return responseBuilder.build();
+ }
- if (this.cpHost != null) {
- try {
- this.cpHost.postBalance(request, sucRPs);
- } catch (IOException ioe) {
- // balancing already succeeded so don't change the result
- LOG.error("Error invoking master coprocessor postBalance()", ioe);
+ // For dry run we don't actually want to execute the moves, but we do
want
+ // to execute the coprocessor below
+ List<RegionPlan> sucRPs =
+ request.isDryRun() ? Collections.emptyList() :
executeRegionPlansWithThrottling(plans);
+
+ if (this.cpHost != null) {
+ try {
+ this.cpHost.postBalance(request, sucRPs);
+ } catch (IOException ioe) {
+ // balancing already succeeded so don't change the result
+ LOG.error("Error invoking master coprocessor postBalance()", ioe);
+ }
}
- }
- responseBuilder.setMovesExecuted(sucRPs.size());
+ responseBuilder.setMovesExecuted(sucRPs.size());
+ } finally {
+ this.balancer.onBalancingComplete();
+ }
}
// If LoadBalancer did not generate any plans, it means the cluster is
already balanced.
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
index f539d1700ab..0b012eb9234 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
@@ -26,6 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
@@ -78,6 +80,16 @@ public class RSGroupBasedLoadBalancer implements
LoadBalancer {
private volatile RSGroupInfoManager rsGroupInfoManager;
private volatile LoadBalancer internalBalancer;
+ /**
+ * Tracks whether a balance run is currently in progress.
+ */
+ private final AtomicBoolean isBalancing = new AtomicBoolean(false);
+
+ /**
+ * Holds a configuration update that arrived while a balance run was in
progress.
+ */
+ private AtomicReference<Configuration> pendingConfiguration = new
AtomicReference<>();
+
/**
* Set this key to {@code true} to allow region fallback. Fallback to the
default rsgroup first,
* then fallback to any group if no online servers in default rsgroup.
Please keep balancer switch
@@ -396,7 +408,26 @@ public class RSGroupBasedLoadBalancer implements
LoadBalancer {
}
@Override
- public synchronized void onConfigurationChange(Configuration conf) {
+ public void onConfigurationChange(Configuration conf) {
+ // Refer to HBASE-29933
+ synchronized (this) {
+ // If balance is running, store configuration in pendingConfiguration
and return immediately.
+ // Defer the config update.
+ if (isBalancing.get()) {
+ LOG.debug(
+ "Balance is in progress, defer applying configuration change until
balance completed.");
+ pendingConfiguration.set(conf);
+ } else {
+ // Apply configuration change immediately.
+ updateConfiguration(conf);
+ }
+ }
+ }
+
+ /**
+ * Applies the given configuration.
+ */
+ private void updateConfiguration(Configuration conf) {
boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY,
false);
if (fallbackEnabled != newFallbackEnabled) {
LOG.info("Changing the value of {} from {} to {}",
FALLBACK_GROUP_ENABLE_KEY, fallbackEnabled,
@@ -407,6 +438,51 @@ public class RSGroupBasedLoadBalancer implements
LoadBalancer {
internalBalancer.onConfigurationChange(conf);
}
+ /**
+ * If a pending configuration was stored during a balance run, apply it and
clear the pending
+ * reference.
+ */
+ public void applyPendingConfiguration() {
+ Configuration toApply = pendingConfiguration.getAndSet(null);
+ if (toApply != null) {
+ LOG.info("Applying pending configuration after balance completed.");
+ updateConfiguration(toApply);
+ }
+ }
+
+ /**
+ * Sets {@link #isBalancing} to {@code true} before a balance run starts.
+ */
+ @Override
+ public void onBalancingStart() {
+ LOG.debug("setting isBalancing to true as balance is starting");
+ isBalancing.set(true);
+ }
+
+ /**
+ * Sets {@link #isBalancing} to {@code false} after a balance run completes
and applies any
+ * pending configuration that arrived during balancing.
+ */
+ @Override
+ public void onBalancingComplete() {
+ LOG.debug("setting isBalancing to false as balance is completed");
+ isBalancing.set(false);
+ applyPendingConfiguration();
+ }
+
+ @Override
+ public void throttle(RegionPlan plan) throws Exception {
+ long throttlingTime = internalBalancer.getThrottleDurationMs(plan);
+ if (throttlingTime > 0) {
+ try {
+ // Release the monitor while waiting to avoid blocking other threads.
+ wait(throttlingTime);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
@Override
public void stop(String why) {
internalBalancer.stop(why);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java
index 1ec08fbefe0..ab76f1c7add 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -36,7 +38,10 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -431,4 +436,28 @@ public class RSGroupableBalancerTestBase extends
BalancerTestBase {
}
return tableName;
}
+
+ protected ServerMetrics mockServerMetricsWithRegionCacheInfo(ServerName
server,
+ List<RegionInfo> regionsOnServer, float currentCacheRatio,
List<RegionInfo> oldRegionCacheInfo,
+ int oldRegionCachedSize, int regionSize) {
+ ServerMetrics serverMetrics = mock(ServerMetrics.class);
+ Map<byte[], RegionMetrics> regionLoadMap = new
TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (RegionInfo info : regionsOnServer) {
+ RegionMetrics rl = mock(RegionMetrics.class);
+ when(rl.getReadRequestCount()).thenReturn(0L);
+ when(rl.getWriteRequestCount()).thenReturn(0L);
+ when(rl.getMemStoreSize()).thenReturn(Size.ZERO);
+ when(rl.getStoreFileSize()).thenReturn(Size.ZERO);
+ when(rl.getCurrentRegionCachedRatio()).thenReturn(currentCacheRatio);
+ when(rl.getRegionSizeMB()).thenReturn(new Size(regionSize,
Size.Unit.MEGABYTE));
+ regionLoadMap.put(info.getRegionName(), rl);
+ }
+ when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap);
+ Map<String, Integer> oldCacheRatioMap = new HashMap<>();
+ for (RegionInfo info : oldRegionCacheInfo) {
+ oldCacheRatioMap.put(info.getEncodedName(), oldRegionCachedSize);
+ }
+ when(serverMetrics.getRegionCachedInfo()).thenReturn(oldCacheRatioMap);
+ return serverMetrics;
+ }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
index 916dbcf2947..ac62da5c356 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
@@ -17,21 +17,16 @@
*/
package org.apache.hadoop.hbase.master.balancer;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
@@ -50,8 +45,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -75,8 +68,6 @@ public class TestCacheAwareLoadBalancer extends
BalancerTestBase {
static List<TableDescriptor> tableDescs;
- static Map<TableName, String> tableMap = new HashMap<>();
-
static TableName[] tables = new TableName[] { TableName.valueOf("dt1"),
TableName.valueOf("dt2"),
TableName.valueOf("dt3"), TableName.valueOf("dt4") };
@@ -135,422 +126,6 @@ public class TestCacheAwareLoadBalancer extends
BalancerTestBase {
loadBalancer.loadConf(conf);
}
- @Test
- public void testRegionsNotCachedOnOldServerAndCurrentServer() throws
Exception {
- // The regions are not cached on old server as well as the current server.
This causes
- // skewness in the region allocation which should be fixed by the balancer
-
- Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
- ServerName server0 = servers.get(0);
- ServerName server1 = servers.get(1);
- ServerName server2 = servers.get(2);
-
- // Simulate that the regions previously hosted by server1 are now hosted
on server0
- List<RegionInfo> regionsOnServer0 = randomRegions(10);
- List<RegionInfo> regionsOnServer1 = randomRegions(0);
- List<RegionInfo> regionsOnServer2 = randomRegions(5);
-
- clusterState.put(server0, regionsOnServer0);
- clusterState.put(server1, regionsOnServer1);
- clusterState.put(server2, regionsOnServer2);
-
- // Mock cluster metrics
- Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
- serverMetricsMap.put(server0,
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
- 0.0f, new ArrayList<>(), 0, 10));
- serverMetricsMap.put(server1,
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
- 0.0f, new ArrayList<>(), 0, 10));
- serverMetricsMap.put(server2,
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
- 0.0f, new ArrayList<>(), 0, 10));
- ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
- when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
- loadBalancer.updateClusterMetrics(clusterMetrics);
-
- Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
- (Map) mockClusterServersWithTables(clusterState);
- List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
- Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
- Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
- for (RegionPlan plan : plans) {
- if (plan.getSource().equals(server0)) {
- regionsMovedFromServer0.add(plan.getRegionInfo());
- if (!targetServers.containsKey(plan.getDestination())) {
- targetServers.put(plan.getDestination(), new ArrayList<>());
- }
- targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
- }
- }
- // should move 5 regions from server0 to server 1
- assertEquals(5, regionsMovedFromServer0.size());
- assertEquals(5, targetServers.get(server1).size());
- }
-
- @Test
- public void
testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws
Exception {
- // The regions are partially cached on old server but not cached on the
current server
-
- Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
- ServerName server0 = servers.get(0);
- ServerName server1 = servers.get(1);
- ServerName server2 = servers.get(2);
-
- // Simulate that the regions previously hosted by server1 are now hosted
on server0
- List<RegionInfo> regionsOnServer0 = randomRegions(10);
- List<RegionInfo> regionsOnServer1 = randomRegions(0);
- List<RegionInfo> regionsOnServer2 = randomRegions(5);
-
- clusterState.put(server0, regionsOnServer0);
- clusterState.put(server1, regionsOnServer1);
- clusterState.put(server2, regionsOnServer2);
-
- // Mock cluster metrics
-
- // Mock 5 regions from server0 were previously hosted on server1
- List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5,
regionsOnServer0.size() - 1);
-
- Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
- serverMetricsMap.put(server0,
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
- 0.0f, new ArrayList<>(), 0, 10));
- serverMetricsMap.put(server1,
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
- 0.0f, oldCachedRegions, 6, 10));
- serverMetricsMap.put(server2,
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
- 0.0f, new ArrayList<>(), 0, 10));
- ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
- when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
- loadBalancer.updateClusterMetrics(clusterMetrics);
-
- Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
- (Map) mockClusterServersWithTables(clusterState);
- List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
- Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
- Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
- for (RegionPlan plan : plans) {
- if (plan.getSource().equals(server0)) {
- regionsMovedFromServer0.add(plan.getRegionInfo());
- if (!targetServers.containsKey(plan.getDestination())) {
- targetServers.put(plan.getDestination(), new ArrayList<>());
- }
- targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
- }
- }
- // should move 5 regions from server0 to server1
- assertEquals(5, regionsMovedFromServer0.size());
- assertEquals(5, targetServers.get(server1).size());
- assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
- }
-
- @Test
- public void testThrottlingRegionBeyondThreshold() throws Exception {
- Configuration conf = HBaseConfiguration.create();
- CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
- balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
- balancer.loadConf(conf);
- balancer.initialize();
- ServerName server0 = servers.get(0);
- ServerName server1 = servers.get(1);
- Pair<ServerName, Float> regionRatio = new Pair<>();
- regionRatio.setFirst(server0);
- regionRatio.setSecond(1.0f);
- balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
- RegionInfo mockedInfo = mock(RegionInfo.class);
- when(mockedInfo.getEncodedName()).thenReturn("region1");
- RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
- long startTime = EnvironmentEdgeManager.currentTime();
- balancer.throttle(plan);
- long endTime = EnvironmentEdgeManager.currentTime();
- assertTrue((endTime - startTime) < 10);
- }
-
- @Test
- public void testThrottlingRegionBelowThreshold() throws Exception {
- Configuration conf = HBaseConfiguration.create();
- conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
- CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
- balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
- balancer.loadConf(conf);
- balancer.initialize();
- ServerName server0 = servers.get(0);
- ServerName server1 = servers.get(1);
- Pair<ServerName, Float> regionRatio = new Pair<>();
- regionRatio.setFirst(server0);
- regionRatio.setSecond(0.1f);
- balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
- RegionInfo mockedInfo = mock(RegionInfo.class);
- when(mockedInfo.getEncodedName()).thenReturn("region1");
- RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
- long startTime = EnvironmentEdgeManager.currentTime();
- balancer.throttle(plan);
- long endTime = EnvironmentEdgeManager.currentTime();
- assertTrue((endTime - startTime) >= 100);
- }
-
- @Test
- public void testThrottlingCacheRatioUnknownOnTarget() throws Exception {
- Configuration conf = HBaseConfiguration.create();
- conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
- CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
- balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
- balancer.loadConf(conf);
- balancer.initialize();
- ServerName server0 = servers.get(0);
- ServerName server1 = servers.get(1);
- ServerName server3 = servers.get(2);
- // setting region cache ratio 100% on server 3, though this is not the
target in the region plan
- Pair<ServerName, Float> regionRatio = new Pair<>();
- regionRatio.setFirst(server3);
- regionRatio.setSecond(1.0f);
- balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
- RegionInfo mockedInfo = mock(RegionInfo.class);
- when(mockedInfo.getEncodedName()).thenReturn("region1");
- RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
- long startTime = EnvironmentEdgeManager.currentTime();
- balancer.throttle(plan);
- long endTime = EnvironmentEdgeManager.currentTime();
- assertTrue((endTime - startTime) >= 100);
- }
-
- @Test
- public void testThrottlingCacheRatioUnknownForRegion() throws Exception {
- Configuration conf = HBaseConfiguration.create();
- conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
- CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
- balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
- balancer.loadConf(conf);
- balancer.initialize();
- ServerName server0 = servers.get(0);
- ServerName server1 = servers.get(1);
- ServerName server3 = servers.get(2);
- // No cache ratio available for region1
- RegionInfo mockedInfo = mock(RegionInfo.class);
- when(mockedInfo.getEncodedName()).thenReturn("region1");
- RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
- long startTime = EnvironmentEdgeManager.currentTime();
- balancer.throttle(plan);
- long endTime = EnvironmentEdgeManager.currentTime();
- assertTrue((endTime - startTime) >= 100);
- }
-
- @Test
- public void testRegionPlansSortedByCacheRatioOnTarget() throws Exception {
- // The regions are fully cached on old server
-
- Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
- ServerName server0 = servers.get(0);
- ServerName server1 = servers.get(1);
- ServerName server2 = servers.get(2);
-
- // Simulate on RS with all regions, and two RSes with no regions
- List<RegionInfo> regionsOnServer0 = randomRegions(15);
- List<RegionInfo> regionsOnServer1 = randomRegions(0);
- List<RegionInfo> regionsOnServer2 = randomRegions(0);
-
- clusterState.put(server0, regionsOnServer0);
- clusterState.put(server1, regionsOnServer1);
- clusterState.put(server2, regionsOnServer2);
-
- // Mock cluster metrics
- // Mock 5 regions from server0 were previously hosted on server1
- List<RegionInfo> oldCachedRegions1 = regionsOnServer0.subList(5, 10);
- List<RegionInfo> oldCachedRegions2 = regionsOnServer0.subList(10,
regionsOnServer0.size());
- Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
- // mock server metrics to set cache ratio as 0 in the RS 0
- serverMetricsMap.put(server0,
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
- 0.0f, new ArrayList<>(), 0, 10));
- // mock server metrics to set cache ratio as 1 in the RS 1
- serverMetricsMap.put(server1,
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
- 0.0f, oldCachedRegions1, 10, 10));
- // mock server metrics to set cache ratio as .8 in the RS 2
- serverMetricsMap.put(server2,
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
- 0.0f, oldCachedRegions2, 8, 10));
- ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
- when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
- loadBalancer.updateClusterMetrics(clusterMetrics);
-
- Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
- (Map) mockClusterServersWithTables(clusterState);
- List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
- LOG.debug("plans size: {}", plans.size());
- LOG.debug("plans: {}", plans);
- LOG.debug("server1 name: {}", server1.getServerName());
- // assert the plans are in descending order from the most cached to the
least cached
- int highCacheCount = 0;
- for (RegionPlan plan : plans) {
- LOG.debug("plan region: {}, target server: {}",
plan.getRegionInfo().getEncodedName(),
- plan.getDestination().getServerName());
- if (highCacheCount < 5) {
- LOG.debug("Count: {}", highCacheCount);
- assertTrue(oldCachedRegions1.contains(plan.getRegionInfo()));
- assertFalse(oldCachedRegions2.contains(plan.getRegionInfo()));
- highCacheCount++;
- } else {
- assertTrue(oldCachedRegions2.contains(plan.getRegionInfo()));
- assertFalse(oldCachedRegions1.contains(plan.getRegionInfo()));
- }
- }
-
- }
-
- @Test
- public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers()
throws Exception {
- // The regions are fully cached on old server
-
- Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
- ServerName server0 = servers.get(0);
- ServerName server1 = servers.get(1);
- ServerName server2 = servers.get(2);
-
- // Simulate that the regions previously hosted by server1 are now hosted
on server0
- List<RegionInfo> regionsOnServer0 = randomRegions(10);
- List<RegionInfo> regionsOnServer1 = randomRegions(0);
- List<RegionInfo> regionsOnServer2 = randomRegions(5);
-
- clusterState.put(server0, regionsOnServer0);
- clusterState.put(server1, regionsOnServer1);
- clusterState.put(server2, regionsOnServer2);
-
- // Mock cluster metrics
-
- // Mock 5 regions from server0 were previously hosted on server1
- List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5,
regionsOnServer0.size() - 1);
-
- Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
- serverMetricsMap.put(server0,
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
- 0.0f, new ArrayList<>(), 0, 10));
- serverMetricsMap.put(server1,
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
- 0.0f, oldCachedRegions, 10, 10));
- serverMetricsMap.put(server2,
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
- 0.0f, new ArrayList<>(), 0, 10));
- ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
- when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
- loadBalancer.updateClusterMetrics(clusterMetrics);
-
- Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
- (Map) mockClusterServersWithTables(clusterState);
- List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
- Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
- Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
- for (RegionPlan plan : plans) {
- if (plan.getSource().equals(server0)) {
- regionsMovedFromServer0.add(plan.getRegionInfo());
- if (!targetServers.containsKey(plan.getDestination())) {
- targetServers.put(plan.getDestination(), new ArrayList<>());
- }
- targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
- }
- }
- // should move 5 regions from server0 to server1
- assertEquals(5, regionsMovedFromServer0.size());
- assertEquals(5, targetServers.get(server1).size());
- assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
- }
-
- @Test
- public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception {
- // The regions are fully cached on old server
-
- Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
- ServerName server0 = servers.get(0);
- ServerName server1 = servers.get(1);
- ServerName server2 = servers.get(2);
-
- // Simulate that the regions previously hosted by server1 are now hosted
on server0
- List<RegionInfo> regionsOnServer0 = randomRegions(10);
- List<RegionInfo> regionsOnServer1 = randomRegions(0);
- List<RegionInfo> regionsOnServer2 = randomRegions(5);
-
- clusterState.put(server0, regionsOnServer0);
- clusterState.put(server1, regionsOnServer1);
- clusterState.put(server2, regionsOnServer2);
-
- // Mock cluster metrics
-
- // Mock 5 regions from server0 were previously hosted on server1
- List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5,
regionsOnServer0.size() - 1);
-
- Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
- serverMetricsMap.put(server0,
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
- 1.0f, new ArrayList<>(), 0, 10));
- serverMetricsMap.put(server1,
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
- 1.0f, oldCachedRegions, 10, 10));
- serverMetricsMap.put(server2,
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
- 1.0f, new ArrayList<>(), 0, 10));
- ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
- when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
- loadBalancer.updateClusterMetrics(clusterMetrics);
-
- Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
- (Map) mockClusterServersWithTables(clusterState);
- List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
- Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
- Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
- for (RegionPlan plan : plans) {
- if (plan.getSource().equals(server0)) {
- regionsMovedFromServer0.add(plan.getRegionInfo());
- if (!targetServers.containsKey(plan.getDestination())) {
- targetServers.put(plan.getDestination(), new ArrayList<>());
- }
- targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
- }
- }
- // should move 5 regions from server0 to server1
- assertEquals(5, regionsMovedFromServer0.size());
- assertEquals(5, targetServers.get(server1).size());
- assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
- }
-
- @Test
- public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws
Exception {
- // The regions are partially cached on old server
-
- Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
- ServerName server0 = servers.get(0);
- ServerName server1 = servers.get(1);
- ServerName server2 = servers.get(2);
-
- // Simulate that the regions previously hosted by server1 are now hosted
on server0
- List<RegionInfo> regionsOnServer0 = randomRegions(10);
- List<RegionInfo> regionsOnServer1 = randomRegions(0);
- List<RegionInfo> regionsOnServer2 = randomRegions(5);
-
- clusterState.put(server0, regionsOnServer0);
- clusterState.put(server1, regionsOnServer1);
- clusterState.put(server2, regionsOnServer2);
-
- // Mock cluster metrics
-
- // Mock 5 regions from server0 were previously hosted on server1
- List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5,
regionsOnServer0.size() - 1);
-
- Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
- serverMetricsMap.put(server0,
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
- 0.2f, new ArrayList<>(), 0, 10));
- serverMetricsMap.put(server1,
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
- 0.0f, oldCachedRegions, 6, 10));
- serverMetricsMap.put(server2,
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
- 1.0f, new ArrayList<>(), 0, 10));
- ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
- when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
- loadBalancer.updateClusterMetrics(clusterMetrics);
-
- Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
- (Map) mockClusterServersWithTables(clusterState);
- List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
- Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
- Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
- for (RegionPlan plan : plans) {
- if (plan.getSource().equals(server0)) {
- regionsMovedFromServer0.add(plan.getRegionInfo());
- if (!targetServers.containsKey(plan.getDestination())) {
- targetServers.put(plan.getDestination(), new ArrayList<>());
- }
- targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
- }
- }
- assertEquals(5, regionsMovedFromServer0.size());
- assertEquals(5, targetServers.get(server1).size());
- assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
- }
-
@Test
public void testBalancerNotThrowNPEWhenBalancerPlansIsNull() throws
Exception {
Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.java
similarity index 71%
copy from
hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
copy to
hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.java
index 916dbcf2947..c92f1e5a205 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.java
@@ -19,9 +19,7 @@ package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -30,26 +28,25 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.BeforeClass;
@@ -59,80 +56,31 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-@Category({ LargeTests.class })
-public class TestCacheAwareLoadBalancer extends BalancerTestBase {
+@Category(LargeTests.class)
+public class TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal
+ extends RSGroupableBalancerTestBase {
@ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestCacheAwareLoadBalancer.class);
-
- private static final Logger LOG =
LoggerFactory.getLogger(TestCacheAwareLoadBalancer.class);
-
- private static CacheAwareLoadBalancer loadBalancer;
-
- static List<ServerName> servers;
-
- static List<TableDescriptor> tableDescs;
-
- static Map<TableName, String> tableMap = new HashMap<>();
-
- static TableName[] tables = new TableName[] { TableName.valueOf("dt1"),
TableName.valueOf("dt2"),
- TableName.valueOf("dt3"), TableName.valueOf("dt4") };
+ public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
+
.forClass(TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.class);
- private static List<ServerName> generateServers(int numServers) {
- List<ServerName> servers = new ArrayList<>(numServers);
- Random rand = ThreadLocalRandom.current();
- for (int i = 0; i < numServers; i++) {
- String host = "server" + rand.nextInt(100000);
- int port = rand.nextInt(60000);
- servers.add(ServerName.valueOf(host, port, -1));
- }
- return servers;
- }
+ private static final Logger LOG =
+
LoggerFactory.getLogger(TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.class);
- private static List<TableDescriptor> constructTableDesc(boolean
hasBogusTable) {
- List<TableDescriptor> tds = Lists.newArrayList();
- for (int i = 0; i < tables.length; i++) {
- TableDescriptor htd =
TableDescriptorBuilder.newBuilder(tables[i]).build();
- tds.add(htd);
- }
- return tds;
- }
-
- private ServerMetrics mockServerMetricsWithRegionCacheInfo(ServerName server,
- List<RegionInfo> regionsOnServer, float currentCacheRatio,
List<RegionInfo> oldRegionCacheInfo,
- int oldRegionCachedSize, int regionSize) {
- ServerMetrics serverMetrics = mock(ServerMetrics.class);
- Map<byte[], RegionMetrics> regionLoadMap = new
TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (RegionInfo info : regionsOnServer) {
- RegionMetrics rl = mock(RegionMetrics.class);
- when(rl.getReadRequestCount()).thenReturn(0L);
- when(rl.getWriteRequestCount()).thenReturn(0L);
- when(rl.getMemStoreSize()).thenReturn(Size.ZERO);
- when(rl.getStoreFileSize()).thenReturn(Size.ZERO);
- when(rl.getCurrentRegionCachedRatio()).thenReturn(currentCacheRatio);
- when(rl.getRegionSizeMB()).thenReturn(new Size(regionSize,
Size.Unit.MEGABYTE));
- regionLoadMap.put(info.getRegionName(), rl);
- }
- when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap);
- Map<String, Integer> oldCacheRatioMap = new HashMap<>();
- for (RegionInfo info : oldRegionCacheInfo) {
- oldCacheRatioMap.put(info.getEncodedName(), oldRegionCachedSize);
- }
- when(serverMetrics.getRegionCachedInfo()).thenReturn(oldCacheRatioMap);
- return serverMetrics;
- }
+ private static RSGroupBasedLoadBalancer loadBalancer;
@BeforeClass
public static void beforeAllTests() throws Exception {
+ groups = new String[] { RSGroupInfo.DEFAULT_GROUP };
servers = generateServers(3);
+ groupMap = constructGroupInfo(servers, groups);
tableDescs = constructTableDesc(false);
- Configuration conf = HBaseConfiguration.create();
+ conf = HBaseConfiguration.create();
conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY,
"prefetch_file_list");
- loadBalancer = new CacheAwareLoadBalancer();
- loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
- loadBalancer.loadConf(conf);
+ conf.set("hbase.rsgroup.grouploadbalancer.class",
+ CacheAwareLoadBalancer.class.getCanonicalName());
+ loadBalancer = new RSGroupBasedLoadBalancer();
+ loadBalancer.setMasterServices(getMockedMaster());
+ loadBalancer.initialize();
}
@Test
@@ -242,21 +190,28 @@ public class TestCacheAwareLoadBalancer extends
BalancerTestBase {
@Test
public void testThrottlingRegionBeyondThreshold() throws Exception {
Configuration conf = HBaseConfiguration.create();
- CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
- balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
- balancer.loadConf(conf);
+ conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY,
"prefetch_file_list");
+ conf.set("hbase.rsgroup.grouploadbalancer.class",
+ CacheAwareLoadBalancer.class.getCanonicalName());
+ RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
+ balancer.setMasterServices(getMockedMaster());
balancer.initialize();
+
ServerName server0 = servers.get(0);
ServerName server1 = servers.get(1);
Pair<ServerName, Float> regionRatio = new Pair<>();
regionRatio.setFirst(server0);
regionRatio.setSecond(1.0f);
- balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
+ CacheAwareLoadBalancer internalBalancer =
+ (CacheAwareLoadBalancer) balancer.getInternalBalancer();
+ internalBalancer.regionCacheRatioOnOldServerMap.put("region1",
regionRatio);
RegionInfo mockedInfo = mock(RegionInfo.class);
when(mockedInfo.getEncodedName()).thenReturn("region1");
RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
long startTime = EnvironmentEdgeManager.currentTime();
- balancer.throttle(plan);
+ synchronized (balancer) {
+ balancer.throttle(plan);
+ }
long endTime = EnvironmentEdgeManager.currentTime();
assertTrue((endTime - startTime) < 10);
}
@@ -264,22 +219,31 @@ public class TestCacheAwareLoadBalancer extends
BalancerTestBase {
@Test
public void testThrottlingRegionBelowThreshold() throws Exception {
Configuration conf = HBaseConfiguration.create();
+ conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY,
"prefetch_file_list");
conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
- CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
- balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
- balancer.loadConf(conf);
- balancer.initialize();
+ conf.set("hbase.rsgroup.grouploadbalancer.class",
+ CacheAwareLoadBalancer.class.getCanonicalName());
+ RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
+ loadBalancer.setMasterServices(getMockedMaster());
+ loadBalancer.initialize();
+ CacheAwareLoadBalancer internalBalancer =
+ (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
+ internalBalancer.loadConf(conf);
+
ServerName server0 = servers.get(0);
ServerName server1 = servers.get(1);
Pair<ServerName, Float> regionRatio = new Pair<>();
regionRatio.setFirst(server0);
regionRatio.setSecond(0.1f);
- balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
+ internalBalancer = (CacheAwareLoadBalancer)
loadBalancer.getInternalBalancer();
+ internalBalancer.regionCacheRatioOnOldServerMap.put("region1",
regionRatio);
RegionInfo mockedInfo = mock(RegionInfo.class);
when(mockedInfo.getEncodedName()).thenReturn("region1");
RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
long startTime = EnvironmentEdgeManager.currentTime();
- balancer.throttle(plan);
+ synchronized (loadBalancer) {
+ loadBalancer.throttle(plan);
+ }
long endTime = EnvironmentEdgeManager.currentTime();
assertTrue((endTime - startTime) >= 100);
}
@@ -287,11 +251,17 @@ public class TestCacheAwareLoadBalancer extends
BalancerTestBase {
@Test
public void testThrottlingCacheRatioUnknownOnTarget() throws Exception {
Configuration conf = HBaseConfiguration.create();
+ conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY,
"prefetch_file_list");
conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
- CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
- balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
- balancer.loadConf(conf);
- balancer.initialize();
+ conf.set("hbase.rsgroup.grouploadbalancer.class",
+ CacheAwareLoadBalancer.class.getCanonicalName());
+ RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
+ loadBalancer.setMasterServices(getMockedMaster());
+ loadBalancer.initialize();
+ CacheAwareLoadBalancer internalBalancer =
+ (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
+ internalBalancer.loadConf(conf);
+
ServerName server0 = servers.get(0);
ServerName server1 = servers.get(1);
ServerName server3 = servers.get(2);
@@ -299,12 +269,15 @@ public class TestCacheAwareLoadBalancer extends
BalancerTestBase {
Pair<ServerName, Float> regionRatio = new Pair<>();
regionRatio.setFirst(server3);
regionRatio.setSecond(1.0f);
- balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
+ internalBalancer = (CacheAwareLoadBalancer)
loadBalancer.getInternalBalancer();
+ internalBalancer.regionCacheRatioOnOldServerMap.put("region1",
regionRatio);
RegionInfo mockedInfo = mock(RegionInfo.class);
when(mockedInfo.getEncodedName()).thenReturn("region1");
RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
long startTime = EnvironmentEdgeManager.currentTime();
- balancer.throttle(plan);
+ synchronized (loadBalancer) {
+ loadBalancer.throttle(plan);
+ }
long endTime = EnvironmentEdgeManager.currentTime();
assertTrue((endTime - startTime) >= 100);
}
@@ -312,11 +285,17 @@ public class TestCacheAwareLoadBalancer extends
BalancerTestBase {
@Test
public void testThrottlingCacheRatioUnknownForRegion() throws Exception {
Configuration conf = HBaseConfiguration.create();
+ conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY,
"prefetch_file_list");
conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
- CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
- balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
- balancer.loadConf(conf);
- balancer.initialize();
+ conf.set("hbase.rsgroup.grouploadbalancer.class",
+ CacheAwareLoadBalancer.class.getCanonicalName());
+ RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
+ loadBalancer.setMasterServices(getMockedMaster());
+ loadBalancer.initialize();
+ CacheAwareLoadBalancer internalBalancer =
+ (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
+ internalBalancer.loadConf(conf);
+
ServerName server0 = servers.get(0);
ServerName server1 = servers.get(1);
ServerName server3 = servers.get(2);
@@ -325,7 +304,9 @@ public class TestCacheAwareLoadBalancer extends
BalancerTestBase {
when(mockedInfo.getEncodedName()).thenReturn("region1");
RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
long startTime = EnvironmentEdgeManager.currentTime();
- balancer.throttle(plan);
+ synchronized (loadBalancer) {
+ loadBalancer.throttle(plan);
+ }
long endTime = EnvironmentEdgeManager.currentTime();
assertTrue((endTime - startTime) >= 100);
}
@@ -387,7 +368,6 @@ public class TestCacheAwareLoadBalancer extends
BalancerTestBase {
assertFalse(oldCachedRegions1.contains(plan.getRegionInfo()));
}
}
-
}
@Test
@@ -551,22 +531,38 @@ public class TestCacheAwareLoadBalancer extends
BalancerTestBase {
assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
}
- @Test
- public void testBalancerNotThrowNPEWhenBalancerPlansIsNull() throws
Exception {
+ @Test(timeout = 60000)
+ public void testConfigUpdateDuringBalance() throws Exception {
+ float expectedOldRatioThreshold = 0.8f;
+ float expectedNewRatioThreshold = 0.95f;
+ long throttlingTimeMillis = 10000;
+
+ conf = HBaseConfiguration.create();
+ conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingTimeMillis);
+ conf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD,
expectedOldRatioThreshold);
+ conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY,
"prefetch_file_list");
+ conf.set("hbase.rsgroup.grouploadbalancer.class",
+ CacheAwareLoadBalancer.class.getCanonicalName());
+
+ RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
+ balancer.setMasterServices(getMockedMaster());
+ balancer.initialize();
+
Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
ServerName server0 = servers.get(0);
ServerName server1 = servers.get(1);
ServerName server2 = servers.get(2);
- List<RegionInfo> regionsOnServer0 = randomRegions(5);
- List<RegionInfo> regionsOnServer1 = randomRegions(5);
- List<RegionInfo> regionsOnServer2 = randomRegions(5);
+ // Setup cluster: all 3 regions on server0 (unbalanced)
+ List<RegionInfo> regionsOnServer0 = randomRegions(3);
+ List<RegionInfo> regionsOnServer1 = randomRegions(0);
+ List<RegionInfo> regionsOnServer2 = randomRegions(0);
clusterState.put(server0, regionsOnServer0);
clusterState.put(server1, regionsOnServer1);
clusterState.put(server2, regionsOnServer2);
- // Mock cluster metrics
+ // Mock metrics: NO cache info for any region = all will be throttled
Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
serverMetricsMap.put(server0,
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
0.0f, new ArrayList<>(), 0, 10));
@@ -577,15 +573,99 @@ public class TestCacheAwareLoadBalancer extends
BalancerTestBase {
ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
- loadBalancer.updateClusterMetrics(clusterMetrics);
+ balancer.updateClusterMetrics(clusterMetrics);
- Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
+ final Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
(Map) mockClusterServersWithTables(clusterState);
+
+ // Verify initial configuration is set correctly
+ CacheAwareLoadBalancer internalBalancer =
+ (CacheAwareLoadBalancer) balancer.getInternalBalancer();
+ assertEquals(expectedOldRatioThreshold, internalBalancer.ratioThreshold,
0.001f);
+
+ CountDownLatch balanceStarted = new CountDownLatch(1);
+ CountDownLatch configUpdateInitiated = new CountDownLatch(1);
+ long[] configUpdateDuration = new long[1];
+ long[] balanceDuration = new long[1];
+
+ // Actual old ratio threshold used during balance
+ float[] actualOldRatioThresholdDuringBalance = new float[1];
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+
try {
- List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
- assertNull(plans);
- } catch (NullPointerException npe) {
- fail("NPE should not be thrown");
+ // Thread 1: Simulate similar flow to HMaster.balance() which holds
synchronized(balancer) for
+ // the duration of balance
+ Future<Long> balanceFuture = executor.submit(() -> {
+ try {
+ long start = EnvironmentEdgeManager.currentTime();
+ synchronized (balancer) {
+ try {
+ // Simulate beginning of HMaster.balance(): mark balancing window
open
+ balancer.onBalancingStart();
+ balanceStarted.countDown();
+
+ List<RegionPlan> plans = balancer.balanceCluster(loadOfAllTable);
+ if (plans != null) {
+ for (RegionPlan plan : plans) {
+ balancer.throttle(plan);
+ }
+ }
+ // Wait until config update is initiated while balance is still
in progress
+ configUpdateInitiated.await();
+
+ // Old config should still be visible during current balance run
+ CacheAwareLoadBalancer currentInternal =
+ (CacheAwareLoadBalancer) balancer.getInternalBalancer();
+ actualOldRatioThresholdDuringBalance[0] =
currentInternal.ratioThreshold;
+
+ } finally {
+ balancer.onBalancingComplete();
+ }
+ }
+ return EnvironmentEdgeManager.currentTime() - start;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Thread 2: Simulate update_all_config / onConfigurationChange
+ Future<Long> configUpdateFuture = executor.submit(() -> {
+ try {
+ long startTime = System.currentTimeMillis();
+ // Wait for balance to start
+ balanceStarted.await();
+
+ // Call onConfigurationChange - should NOT hang
+ Configuration newConf = new Configuration(conf);
+ newConf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY,
"prefetch_file_list");
+ newConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 10000);
+ newConf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD,
expectedNewRatioThreshold);
+ balancer.onConfigurationChange(newConf);
+ configUpdateInitiated.countDown();
+
+ return System.currentTimeMillis() - startTime;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Wait for both threads to complete
+ configUpdateDuration[0] = configUpdateFuture.get();
+ balanceDuration[0] = balanceFuture.get();
+
+ // Verify that config update didn't hang/timeout waiting for balance
+ assertTrue(configUpdateDuration[0] < balanceDuration[0]);
+
+ // Verify that ratio threshold used during balance is still the old one
+ assertEquals(expectedOldRatioThreshold,
actualOldRatioThresholdDuringBalance[0], 0.001f);
+
+ // Verify that config updated successfully after balance completed
+ internalBalancer = (CacheAwareLoadBalancer)
balancer.getInternalBalancer();
+ assertEquals(expectedNewRatioThreshold, internalBalancer.ratioThreshold,
0.001f);
+
+ } finally {
+ executor.shutdownNow();
}
}
}