This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new b6e258e4959 HBASE-29933: update_all_config hangs indefinitely when balancing even… (#7900)
b6e258e4959 is described below

commit b6e258e49591caecc13c4ce31470012670f5fde9
Author: Dev Hingu <[email protected]>
AuthorDate: Fri Mar 13 15:00:05 2026 +0530

    HBASE-29933: update_all_config hangs indefinitely when balancing even… (#7900)
    
    Signed-off-by: Wellington Chevreuil <[email protected]>
    Reviewed-by: Vaibhav Joshi <[email protected]>
---
 .../apache/hadoop/hbase/master/LoadBalancer.java   |  13 +
 .../master/balancer/CacheAwareLoadBalancer.java    |  11 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    | 123 +++---
 .../hbase/rsgroup/RSGroupBasedLoadBalancer.java    |  78 +++-
 .../balancer/RSGroupableBalancerTestBase.java      |  29 ++
 .../balancer/TestCacheAwareLoadBalancer.java       | 425 ---------------------
 ...ancerWithCacheAwareLoadBalancerAsInternal.java} | 298 +++++++++------
 7 files changed, 376 insertions(+), 601 deletions(-)

diff --git 
a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java 
b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index 421e44c89cd..bc77e3eac7c 100644
--- 
a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ 
b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -171,4 +171,17 @@ public interface LoadBalancer extends Stoppable, 
ConfigurationObserver {
   default void throttle(RegionPlan plan) throws Exception {
     // noop
   }
+
+  default long getThrottleDurationMs(RegionPlan plan) {
+    // By default, we do not throttle, so return 0 here.
+    return 0L;
+  }
+
+  default void onBalancingStart() {
+    // noop
+  }
+
+  default void onBalancingComplete() {
+    // noop
+  }
 }
diff --git 
a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
 
b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
index 867c6f8aadb..b61ca31073e 100644
--- 
a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
+++ 
b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
@@ -69,7 +69,7 @@ public class CacheAwareLoadBalancer extends 
StochasticLoadBalancer {
   }
 
   @Override
-  public synchronized void loadConf(Configuration configuration) {
+  public void loadConf(Configuration configuration) {
     this.configuration = configuration;
     this.costFunctions = new ArrayList<>();
     super.loadConf(configuration);
@@ -172,7 +172,7 @@ public class CacheAwareLoadBalancer extends 
StochasticLoadBalancer {
   }
 
   @Override
-  public void throttle(RegionPlan plan) {
+  public long getThrottleDurationMs(RegionPlan plan) {
     Pair<ServerName, Float> rsRatio = 
this.regionCacheRatioOnOldServerMap.get(plan.getRegionName());
     if (
       rsRatio != null && plan.getDestination().equals(rsRatio.getFirst())
@@ -180,6 +180,7 @@ public class CacheAwareLoadBalancer extends 
StochasticLoadBalancer {
     ) {
       LOG.debug("Moving region {} to server {} with cache ratio {}. No 
throttling needed.",
         plan.getRegionInfo().getEncodedName(), plan.getDestination(), 
rsRatio.getSecond());
+      return 0L;
     } else {
       if (rsRatio != null) {
         LOG.debug("Moving region {} to server {} with cache ratio: {}. 
Throttling move for {}ms.",
@@ -192,11 +193,7 @@ public class CacheAwareLoadBalancer extends 
StochasticLoadBalancer {
             + "Throttling move for {}ms.",
           plan.getRegionInfo().getEncodedName(), plan.getDestination(), 
sleepTime);
       }
-      try {
-        Thread.sleep(sleepTime);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
+      return sleepTime;
     }
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index c997f1c6e82..a0f84c5672c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -2109,80 +2109,85 @@ public class HMaster extends 
HBaseServerBase<MasterRpcServices> implements Maste
     }
 
     synchronized (this.balancer) {
-      // Only allow one balance run at at time.
-      if (this.assignmentManager.getRegionTransitScheduledCount() > 0) {
-        List<RegionStateNode> regionsInTransition = 
assignmentManager.getRegionsInTransition();
-        // if hbase:meta region is in transition, result of assignment cannot 
be recorded
-        // ignore the force flag in that case
-        boolean metaInTransition = 
assignmentManager.isMetaRegionInTransition();
-        List<RegionStateNode> toPrint = regionsInTransition;
-        int max = 5;
-        boolean truncated = false;
-        if (regionsInTransition.size() > max) {
-          toPrint = regionsInTransition.subList(0, max);
-          truncated = true;
-        }
+      try {
+        this.balancer.onBalancingStart();
+        // Only allow one balance run at at time.
+        if (this.assignmentManager.getRegionTransitScheduledCount() > 0) {
+          List<RegionStateNode> regionsInTransition = 
assignmentManager.getRegionsInTransition();
+          // if hbase:meta region is in transition, result of assignment 
cannot be recorded
+          // ignore the force flag in that case
+          boolean metaInTransition = 
assignmentManager.isMetaRegionInTransition();
+          List<RegionStateNode> toPrint = regionsInTransition;
+          int max = 5;
+          boolean truncated = false;
+          if (regionsInTransition.size() > max) {
+            toPrint = regionsInTransition.subList(0, max);
+            truncated = true;
+          }
 
-        if (!request.isIgnoreRegionsInTransition() || metaInTransition) {
-          LOG.info("Not running balancer (ignoreRIT=false" + ", metaRIT=" + 
metaInTransition
-            + ") because " + assignmentManager.getRegionTransitScheduledCount()
-            + " region(s) are scheduled to transit " + toPrint
-            + (truncated ? "(truncated list)" : ""));
+          if (!request.isIgnoreRegionsInTransition() || metaInTransition) {
+            LOG.info("Not running balancer (ignoreRIT=false" + ", metaRIT=" + 
metaInTransition
+              + ") because " + 
assignmentManager.getRegionTransitScheduledCount()
+              + " region(s) are scheduled to transit " + toPrint
+              + (truncated ? "(truncated list)" : ""));
+            return responseBuilder.build();
+          }
+        }
+        if (this.serverManager.areDeadServersInProgress()) {
+          LOG.info("Not running balancer because processing dead 
regionserver(s): "
+            + this.serverManager.getDeadServers());
           return responseBuilder.build();
         }
-      }
-      if (this.serverManager.areDeadServersInProgress()) {
-        LOG.info("Not running balancer because processing dead 
regionserver(s): "
-          + this.serverManager.getDeadServers());
-        return responseBuilder.build();
-      }
 
-      if (this.cpHost != null) {
-        try {
-          if (this.cpHost.preBalance(request)) {
-            LOG.debug("Coprocessor bypassing balancer request");
+        if (this.cpHost != null) {
+          try {
+            if (this.cpHost.preBalance(request)) {
+              LOG.debug("Coprocessor bypassing balancer request");
+              return responseBuilder.build();
+            }
+          } catch (IOException ioe) {
+            LOG.error("Error invoking master coprocessor preBalance()", ioe);
             return responseBuilder.build();
           }
-        } catch (IOException ioe) {
-          LOG.error("Error invoking master coprocessor preBalance()", ioe);
-          return responseBuilder.build();
         }
-      }
-
-      Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
-        
this.assignmentManager.getRegionStates().getAssignmentsForBalancer(tableStateManager,
-          this.serverManager.getOnlineServersList());
-      for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) 
{
-        
serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
-      }
 
-      // Give the balancer the current cluster state.
-      
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
+        Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
+          
this.assignmentManager.getRegionStates().getAssignmentsForBalancer(tableStateManager,
+            this.serverManager.getOnlineServersList());
+        for (Map<ServerName, List<RegionInfo>> serverMap : 
assignments.values()) {
+          
serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
+        }
 
-      List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
+        // Give the balancer the current cluster state.
+        
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
 
-      responseBuilder.setBalancerRan(true).setMovesCalculated(plans == null ? 
0 : plans.size());
+        List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
 
-      if (skipRegionManagementAction("balancer")) {
-        // make one last check that the cluster isn't shutting down before 
proceeding.
-        return responseBuilder.build();
-      }
+        responseBuilder.setBalancerRan(true).setMovesCalculated(plans == null 
? 0 : plans.size());
 
-      // For dry run we don't actually want to execute the moves, but we do 
want
-      // to execute the coprocessor below
-      List<RegionPlan> sucRPs =
-        request.isDryRun() ? Collections.emptyList() : 
executeRegionPlansWithThrottling(plans);
+        if (skipRegionManagementAction("balancer")) {
+          // make one last check that the cluster isn't shutting down before 
proceeding.
+          return responseBuilder.build();
+        }
 
-      if (this.cpHost != null) {
-        try {
-          this.cpHost.postBalance(request, sucRPs);
-        } catch (IOException ioe) {
-          // balancing already succeeded so don't change the result
-          LOG.error("Error invoking master coprocessor postBalance()", ioe);
+        // For dry run we don't actually want to execute the moves, but we do 
want
+        // to execute the coprocessor below
+        List<RegionPlan> sucRPs =
+          request.isDryRun() ? Collections.emptyList() : 
executeRegionPlansWithThrottling(plans);
+
+        if (this.cpHost != null) {
+          try {
+            this.cpHost.postBalance(request, sucRPs);
+          } catch (IOException ioe) {
+            // balancing already succeeded so don't change the result
+            LOG.error("Error invoking master coprocessor postBalance()", ioe);
+          }
         }
-      }
 
-      responseBuilder.setMovesExecuted(sucRPs.size());
+        responseBuilder.setMovesExecuted(sucRPs.size());
+      } finally {
+        this.balancer.onBalancingComplete();
+      }
     }
 
     // If LoadBalancer did not generate any plans, it means the cluster is 
already balanced.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
index f539d1700ab..0b012eb9234 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterMetrics;
@@ -78,6 +80,16 @@ public class RSGroupBasedLoadBalancer implements 
LoadBalancer {
   private volatile RSGroupInfoManager rsGroupInfoManager;
   private volatile LoadBalancer internalBalancer;
 
+  /**
+   * Tracks whether a balance run is currently in progress.
+   */
+  private final AtomicBoolean isBalancing = new AtomicBoolean(false);
+
+  /**
+   * Holds a configuration update that arrived while a balance run was in 
progress.
+   */
+  private AtomicReference<Configuration> pendingConfiguration = new 
AtomicReference<>();
+
   /**
    * Set this key to {@code true} to allow region fallback. Fallback to the 
default rsgroup first,
    * then fallback to any group if no online servers in default rsgroup. 
Please keep balancer switch
@@ -396,7 +408,26 @@ public class RSGroupBasedLoadBalancer implements 
LoadBalancer {
   }
 
   @Override
-  public synchronized void onConfigurationChange(Configuration conf) {
+  public void onConfigurationChange(Configuration conf) {
+    // Refer to HBASE-29933
+    synchronized (this) {
+      // If balance is running, store configuration in pendingConfiguration 
and return immediately.
+      // Defer the config update.
+      if (isBalancing.get()) {
+        LOG.debug(
+          "Balance is in progress, defer applying configuration change until 
balance completed.");
+        pendingConfiguration.set(conf);
+      } else {
+        // Apply configuration change immediately.
+        updateConfiguration(conf);
+      }
+    }
+  }
+
+  /**
+   * Applies the given configuration.
+   */
+  private void updateConfiguration(Configuration conf) {
     boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, 
false);
     if (fallbackEnabled != newFallbackEnabled) {
       LOG.info("Changing the value of {} from {} to {}", 
FALLBACK_GROUP_ENABLE_KEY, fallbackEnabled,
@@ -407,6 +438,51 @@ public class RSGroupBasedLoadBalancer implements 
LoadBalancer {
     internalBalancer.onConfigurationChange(conf);
   }
 
+  /**
+   * If a pending configuration was stored during a balance run, apply it and 
clear the pending
+   * reference.
+   */
+  public void applyPendingConfiguration() {
+    Configuration toApply = pendingConfiguration.getAndSet(null);
+    if (toApply != null) {
+      LOG.info("Applying pending configuration after balance completed.");
+      updateConfiguration(toApply);
+    }
+  }
+
+  /**
+   * Sets {@link #isBalancing} to {@code true} before a balance run starts.
+   */
+  @Override
+  public void onBalancingStart() {
+    LOG.debug("setting isBalancing to true as balance is starting");
+    isBalancing.set(true);
+  }
+
+  /**
+   * Sets {@link #isBalancing} to {@code false} after a balance run completes 
and applies any
+   * pending configuration that arrived during balancing.
+   */
+  @Override
+  public void onBalancingComplete() {
+    LOG.debug("setting isBalancing to false as balance is completed");
+    isBalancing.set(false);
+    applyPendingConfiguration();
+  }
+
+  @Override
+  public void throttle(RegionPlan plan) throws Exception {
+    long throttlingTime = internalBalancer.getThrottleDurationMs(plan);
+    if (throttlingTime > 0) {
+      try {
+        // Release the monitor while waiting to avoid blocking other threads.
+        wait(throttlingTime);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   @Override
   public void stop(String why) {
     internalBalancer.stop(why);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java
index 1ec08fbefe0..ab76f1c7add 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master.balancer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -36,7 +38,10 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -431,4 +436,28 @@ public class RSGroupableBalancerTestBase extends 
BalancerTestBase {
     }
     return tableName;
   }
+
+  protected ServerMetrics mockServerMetricsWithRegionCacheInfo(ServerName 
server,
+    List<RegionInfo> regionsOnServer, float currentCacheRatio, 
List<RegionInfo> oldRegionCacheInfo,
+    int oldRegionCachedSize, int regionSize) {
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    Map<byte[], RegionMetrics> regionLoadMap = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (RegionInfo info : regionsOnServer) {
+      RegionMetrics rl = mock(RegionMetrics.class);
+      when(rl.getReadRequestCount()).thenReturn(0L);
+      when(rl.getWriteRequestCount()).thenReturn(0L);
+      when(rl.getMemStoreSize()).thenReturn(Size.ZERO);
+      when(rl.getStoreFileSize()).thenReturn(Size.ZERO);
+      when(rl.getCurrentRegionCachedRatio()).thenReturn(currentCacheRatio);
+      when(rl.getRegionSizeMB()).thenReturn(new Size(regionSize, 
Size.Unit.MEGABYTE));
+      regionLoadMap.put(info.getRegionName(), rl);
+    }
+    when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap);
+    Map<String, Integer> oldCacheRatioMap = new HashMap<>();
+    for (RegionInfo info : oldRegionCacheInfo) {
+      oldCacheRatioMap.put(info.getEncodedName(), oldRegionCachedSize);
+    }
+    when(serverMetrics.getRegionCachedInfo()).thenReturn(oldCacheRatioMap);
+    return serverMetrics;
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
index 916dbcf2947..ac62da5c356 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
@@ -17,21 +17,16 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
@@ -50,8 +45,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -75,8 +68,6 @@ public class TestCacheAwareLoadBalancer extends 
BalancerTestBase {
 
   static List<TableDescriptor> tableDescs;
 
-  static Map<TableName, String> tableMap = new HashMap<>();
-
   static TableName[] tables = new TableName[] { TableName.valueOf("dt1"), 
TableName.valueOf("dt2"),
     TableName.valueOf("dt3"), TableName.valueOf("dt4") };
 
@@ -135,422 +126,6 @@ public class TestCacheAwareLoadBalancer extends 
BalancerTestBase {
     loadBalancer.loadConf(conf);
   }
 
-  @Test
-  public void testRegionsNotCachedOnOldServerAndCurrentServer() throws 
Exception {
-    // The regions are not cached on old server as well as the current server. 
This causes
-    // skewness in the region allocation which should be fixed by the balancer
-
-    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
-    ServerName server0 = servers.get(0);
-    ServerName server1 = servers.get(1);
-    ServerName server2 = servers.get(2);
-
-    // Simulate that the regions previously hosted by server1 are now hosted 
on server0
-    List<RegionInfo> regionsOnServer0 = randomRegions(10);
-    List<RegionInfo> regionsOnServer1 = randomRegions(0);
-    List<RegionInfo> regionsOnServer2 = randomRegions(5);
-
-    clusterState.put(server0, regionsOnServer0);
-    clusterState.put(server1, regionsOnServer1);
-    clusterState.put(server2, regionsOnServer2);
-
-    // Mock cluster metrics
-    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
-    serverMetricsMap.put(server0, 
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
-      0.0f, new ArrayList<>(), 0, 10));
-    serverMetricsMap.put(server1, 
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
-      0.0f, new ArrayList<>(), 0, 10));
-    serverMetricsMap.put(server2, 
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
-      0.0f, new ArrayList<>(), 0, 10));
-    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
-    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
-    loadBalancer.updateClusterMetrics(clusterMetrics);
-
-    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
-      (Map) mockClusterServersWithTables(clusterState);
-    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
-    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
-    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
-    for (RegionPlan plan : plans) {
-      if (plan.getSource().equals(server0)) {
-        regionsMovedFromServer0.add(plan.getRegionInfo());
-        if (!targetServers.containsKey(plan.getDestination())) {
-          targetServers.put(plan.getDestination(), new ArrayList<>());
-        }
-        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
-      }
-    }
-    // should move 5 regions from server0 to server 1
-    assertEquals(5, regionsMovedFromServer0.size());
-    assertEquals(5, targetServers.get(server1).size());
-  }
-
-  @Test
-  public void 
testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws 
Exception {
-    // The regions are partially cached on old server but not cached on the 
current server
-
-    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
-    ServerName server0 = servers.get(0);
-    ServerName server1 = servers.get(1);
-    ServerName server2 = servers.get(2);
-
-    // Simulate that the regions previously hosted by server1 are now hosted 
on server0
-    List<RegionInfo> regionsOnServer0 = randomRegions(10);
-    List<RegionInfo> regionsOnServer1 = randomRegions(0);
-    List<RegionInfo> regionsOnServer2 = randomRegions(5);
-
-    clusterState.put(server0, regionsOnServer0);
-    clusterState.put(server1, regionsOnServer1);
-    clusterState.put(server2, regionsOnServer2);
-
-    // Mock cluster metrics
-
-    // Mock 5 regions from server0 were previously hosted on server1
-    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, 
regionsOnServer0.size() - 1);
-
-    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
-    serverMetricsMap.put(server0, 
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
-      0.0f, new ArrayList<>(), 0, 10));
-    serverMetricsMap.put(server1, 
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
-      0.0f, oldCachedRegions, 6, 10));
-    serverMetricsMap.put(server2, 
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
-      0.0f, new ArrayList<>(), 0, 10));
-    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
-    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
-    loadBalancer.updateClusterMetrics(clusterMetrics);
-
-    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
-      (Map) mockClusterServersWithTables(clusterState);
-    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
-    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
-    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
-    for (RegionPlan plan : plans) {
-      if (plan.getSource().equals(server0)) {
-        regionsMovedFromServer0.add(plan.getRegionInfo());
-        if (!targetServers.containsKey(plan.getDestination())) {
-          targetServers.put(plan.getDestination(), new ArrayList<>());
-        }
-        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
-      }
-    }
-    // should move 5 regions from server0 to server1
-    assertEquals(5, regionsMovedFromServer0.size());
-    assertEquals(5, targetServers.get(server1).size());
-    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
-  }
-
-  @Test
-  public void testThrottlingRegionBeyondThreshold() throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
-    balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
-    balancer.loadConf(conf);
-    balancer.initialize();
-    ServerName server0 = servers.get(0);
-    ServerName server1 = servers.get(1);
-    Pair<ServerName, Float> regionRatio = new Pair<>();
-    regionRatio.setFirst(server0);
-    regionRatio.setSecond(1.0f);
-    balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
-    RegionInfo mockedInfo = mock(RegionInfo.class);
-    when(mockedInfo.getEncodedName()).thenReturn("region1");
-    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
-    long startTime = EnvironmentEdgeManager.currentTime();
-    balancer.throttle(plan);
-    long endTime = EnvironmentEdgeManager.currentTime();
-    assertTrue((endTime - startTime) < 10);
-  }
-
-  @Test
-  public void testThrottlingRegionBelowThreshold() throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
-    CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
-    balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
-    balancer.loadConf(conf);
-    balancer.initialize();
-    ServerName server0 = servers.get(0);
-    ServerName server1 = servers.get(1);
-    Pair<ServerName, Float> regionRatio = new Pair<>();
-    regionRatio.setFirst(server0);
-    regionRatio.setSecond(0.1f);
-    balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
-    RegionInfo mockedInfo = mock(RegionInfo.class);
-    when(mockedInfo.getEncodedName()).thenReturn("region1");
-    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
-    long startTime = EnvironmentEdgeManager.currentTime();
-    balancer.throttle(plan);
-    long endTime = EnvironmentEdgeManager.currentTime();
-    assertTrue((endTime - startTime) >= 100);
-  }
-
-  @Test
-  public void testThrottlingCacheRatioUnknownOnTarget() throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
-    CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
-    balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
-    balancer.loadConf(conf);
-    balancer.initialize();
-    ServerName server0 = servers.get(0);
-    ServerName server1 = servers.get(1);
-    ServerName server3 = servers.get(2);
-    // setting region cache ratio 100% on server 3, though this is not the 
target in the region plan
-    Pair<ServerName, Float> regionRatio = new Pair<>();
-    regionRatio.setFirst(server3);
-    regionRatio.setSecond(1.0f);
-    balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
-    RegionInfo mockedInfo = mock(RegionInfo.class);
-    when(mockedInfo.getEncodedName()).thenReturn("region1");
-    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
-    long startTime = EnvironmentEdgeManager.currentTime();
-    balancer.throttle(plan);
-    long endTime = EnvironmentEdgeManager.currentTime();
-    assertTrue((endTime - startTime) >= 100);
-  }
-
-  @Test
-  public void testThrottlingCacheRatioUnknownForRegion() throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
-    CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
-    balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
-    balancer.loadConf(conf);
-    balancer.initialize();
-    ServerName server0 = servers.get(0);
-    ServerName server1 = servers.get(1);
-    ServerName server3 = servers.get(2);
-    // No cache ratio available for region1
-    RegionInfo mockedInfo = mock(RegionInfo.class);
-    when(mockedInfo.getEncodedName()).thenReturn("region1");
-    RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
-    long startTime = EnvironmentEdgeManager.currentTime();
-    balancer.throttle(plan);
-    long endTime = EnvironmentEdgeManager.currentTime();
-    assertTrue((endTime - startTime) >= 100);
-  }
-
-  @Test
-  public void testRegionPlansSortedByCacheRatioOnTarget() throws Exception {
-    // The regions are fully cached on old server
-
-    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
-    ServerName server0 = servers.get(0);
-    ServerName server1 = servers.get(1);
-    ServerName server2 = servers.get(2);
-
-    // Simulate on RS with all regions, and two RSes with no regions
-    List<RegionInfo> regionsOnServer0 = randomRegions(15);
-    List<RegionInfo> regionsOnServer1 = randomRegions(0);
-    List<RegionInfo> regionsOnServer2 = randomRegions(0);
-
-    clusterState.put(server0, regionsOnServer0);
-    clusterState.put(server1, regionsOnServer1);
-    clusterState.put(server2, regionsOnServer2);
-
-    // Mock cluster metrics
-    // Mock 5 regions from server0 were previously hosted on server1
-    List<RegionInfo> oldCachedRegions1 = regionsOnServer0.subList(5, 10);
-    List<RegionInfo> oldCachedRegions2 = regionsOnServer0.subList(10, 
regionsOnServer0.size());
-    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
-    // mock server metrics to set cache ratio as 0 in the RS 0
-    serverMetricsMap.put(server0, 
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
-      0.0f, new ArrayList<>(), 0, 10));
-    // mock server metrics to set cache ratio as 1 in the RS 1
-    serverMetricsMap.put(server1, 
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
-      0.0f, oldCachedRegions1, 10, 10));
-    // mock server metrics to set cache ratio as .8 in the RS 2
-    serverMetricsMap.put(server2, 
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
-      0.0f, oldCachedRegions2, 8, 10));
-    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
-    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
-    loadBalancer.updateClusterMetrics(clusterMetrics);
-
-    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
-      (Map) mockClusterServersWithTables(clusterState);
-    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
-    LOG.debug("plans size: {}", plans.size());
-    LOG.debug("plans: {}", plans);
-    LOG.debug("server1 name: {}", server1.getServerName());
-    // assert the plans are in descending order from the most cached to the 
least cached
-    int highCacheCount = 0;
-    for (RegionPlan plan : plans) {
-      LOG.debug("plan region: {}, target server: {}", 
plan.getRegionInfo().getEncodedName(),
-        plan.getDestination().getServerName());
-      if (highCacheCount < 5) {
-        LOG.debug("Count: {}", highCacheCount);
-        assertTrue(oldCachedRegions1.contains(plan.getRegionInfo()));
-        assertFalse(oldCachedRegions2.contains(plan.getRegionInfo()));
-        highCacheCount++;
-      } else {
-        assertTrue(oldCachedRegions2.contains(plan.getRegionInfo()));
-        assertFalse(oldCachedRegions1.contains(plan.getRegionInfo()));
-      }
-    }
-
-  }
-
-  @Test
-  public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers() 
throws Exception {
-    // The regions are fully cached on old server
-
-    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
-    ServerName server0 = servers.get(0);
-    ServerName server1 = servers.get(1);
-    ServerName server2 = servers.get(2);
-
-    // Simulate that the regions previously hosted by server1 are now hosted 
on server0
-    List<RegionInfo> regionsOnServer0 = randomRegions(10);
-    List<RegionInfo> regionsOnServer1 = randomRegions(0);
-    List<RegionInfo> regionsOnServer2 = randomRegions(5);
-
-    clusterState.put(server0, regionsOnServer0);
-    clusterState.put(server1, regionsOnServer1);
-    clusterState.put(server2, regionsOnServer2);
-
-    // Mock cluster metrics
-
-    // Mock 5 regions from server0 were previously hosted on server1
-    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, 
regionsOnServer0.size() - 1);
-
-    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
-    serverMetricsMap.put(server0, 
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
-      0.0f, new ArrayList<>(), 0, 10));
-    serverMetricsMap.put(server1, 
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
-      0.0f, oldCachedRegions, 10, 10));
-    serverMetricsMap.put(server2, 
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
-      0.0f, new ArrayList<>(), 0, 10));
-    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
-    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
-    loadBalancer.updateClusterMetrics(clusterMetrics);
-
-    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
-      (Map) mockClusterServersWithTables(clusterState);
-    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
-    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
-    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
-    for (RegionPlan plan : plans) {
-      if (plan.getSource().equals(server0)) {
-        regionsMovedFromServer0.add(plan.getRegionInfo());
-        if (!targetServers.containsKey(plan.getDestination())) {
-          targetServers.put(plan.getDestination(), new ArrayList<>());
-        }
-        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
-      }
-    }
-    // should move 5 regions from server0 to server1
-    assertEquals(5, regionsMovedFromServer0.size());
-    assertEquals(5, targetServers.get(server1).size());
-    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
-  }
-
-  @Test
-  public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception {
-    // The regions are fully cached on old server
-
-    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
-    ServerName server0 = servers.get(0);
-    ServerName server1 = servers.get(1);
-    ServerName server2 = servers.get(2);
-
-    // Simulate that the regions previously hosted by server1 are now hosted 
on server0
-    List<RegionInfo> regionsOnServer0 = randomRegions(10);
-    List<RegionInfo> regionsOnServer1 = randomRegions(0);
-    List<RegionInfo> regionsOnServer2 = randomRegions(5);
-
-    clusterState.put(server0, regionsOnServer0);
-    clusterState.put(server1, regionsOnServer1);
-    clusterState.put(server2, regionsOnServer2);
-
-    // Mock cluster metrics
-
-    // Mock 5 regions from server0 were previously hosted on server1
-    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, 
regionsOnServer0.size() - 1);
-
-    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
-    serverMetricsMap.put(server0, 
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
-      1.0f, new ArrayList<>(), 0, 10));
-    serverMetricsMap.put(server1, 
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
-      1.0f, oldCachedRegions, 10, 10));
-    serverMetricsMap.put(server2, 
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
-      1.0f, new ArrayList<>(), 0, 10));
-    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
-    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
-    loadBalancer.updateClusterMetrics(clusterMetrics);
-
-    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
-      (Map) mockClusterServersWithTables(clusterState);
-    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
-    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
-    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
-    for (RegionPlan plan : plans) {
-      if (plan.getSource().equals(server0)) {
-        regionsMovedFromServer0.add(plan.getRegionInfo());
-        if (!targetServers.containsKey(plan.getDestination())) {
-          targetServers.put(plan.getDestination(), new ArrayList<>());
-        }
-        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
-      }
-    }
-    // should move 5 regions from server0 to server1
-    assertEquals(5, regionsMovedFromServer0.size());
-    assertEquals(5, targetServers.get(server1).size());
-    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
-  }
-
-  @Test
-  public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws 
Exception {
-    // The regions are partially cached on old server
-
-    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
-    ServerName server0 = servers.get(0);
-    ServerName server1 = servers.get(1);
-    ServerName server2 = servers.get(2);
-
-    // Simulate that the regions previously hosted by server1 are now hosted 
on server0
-    List<RegionInfo> regionsOnServer0 = randomRegions(10);
-    List<RegionInfo> regionsOnServer1 = randomRegions(0);
-    List<RegionInfo> regionsOnServer2 = randomRegions(5);
-
-    clusterState.put(server0, regionsOnServer0);
-    clusterState.put(server1, regionsOnServer1);
-    clusterState.put(server2, regionsOnServer2);
-
-    // Mock cluster metrics
-
-    // Mock 5 regions from server0 were previously hosted on server1
-    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, 
regionsOnServer0.size() - 1);
-
-    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
-    serverMetricsMap.put(server0, 
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
-      0.2f, new ArrayList<>(), 0, 10));
-    serverMetricsMap.put(server1, 
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
-      0.0f, oldCachedRegions, 6, 10));
-    serverMetricsMap.put(server2, 
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
-      1.0f, new ArrayList<>(), 0, 10));
-    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
-    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
-    loadBalancer.updateClusterMetrics(clusterMetrics);
-
-    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
-      (Map) mockClusterServersWithTables(clusterState);
-    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
-    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
-    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
-    for (RegionPlan plan : plans) {
-      if (plan.getSource().equals(server0)) {
-        regionsMovedFromServer0.add(plan.getRegionInfo());
-        if (!targetServers.containsKey(plan.getDestination())) {
-          targetServers.put(plan.getDestination(), new ArrayList<>());
-        }
-        targetServers.get(plan.getDestination()).add(plan.getRegionInfo());
-      }
-    }
-    assertEquals(5, regionsMovedFromServer0.size());
-    assertEquals(5, targetServers.get(server1).size());
-    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
-  }
-
   @Test
   public void testBalancerNotThrowNPEWhenBalancerPlansIsNull() throws 
Exception {
     Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.java
similarity index 71%
copy from 
hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
copy to 
hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.java
index 916dbcf2947..c92f1e5a205 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.java
@@ -19,9 +19,7 @@ package org.apache.hadoop.hbase.master.balancer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -30,26 +28,25 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionMetrics;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.BeforeClass;
@@ -59,80 +56,31 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-@Category({ LargeTests.class })
-public class TestCacheAwareLoadBalancer extends BalancerTestBase {
+@Category(LargeTests.class)
+public class TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal
+  extends RSGroupableBalancerTestBase {
   @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestCacheAwareLoadBalancer.class);
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestCacheAwareLoadBalancer.class);
-
-  private static CacheAwareLoadBalancer loadBalancer;
-
-  static List<ServerName> servers;
-
-  static List<TableDescriptor> tableDescs;
-
-  static Map<TableName, String> tableMap = new HashMap<>();
-
-  static TableName[] tables = new TableName[] { TableName.valueOf("dt1"), 
TableName.valueOf("dt2"),
-    TableName.valueOf("dt3"), TableName.valueOf("dt4") };
+  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
+    
.forClass(TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.class);
 
-  private static List<ServerName> generateServers(int numServers) {
-    List<ServerName> servers = new ArrayList<>(numServers);
-    Random rand = ThreadLocalRandom.current();
-    for (int i = 0; i < numServers; i++) {
-      String host = "server" + rand.nextInt(100000);
-      int port = rand.nextInt(60000);
-      servers.add(ServerName.valueOf(host, port, -1));
-    }
-    return servers;
-  }
+  private static final Logger LOG =
+    
LoggerFactory.getLogger(TestRSGroupBasedLoadBalancerWithCacheAwareLoadBalancerAsInternal.class);
 
-  private static List<TableDescriptor> constructTableDesc(boolean 
hasBogusTable) {
-    List<TableDescriptor> tds = Lists.newArrayList();
-    for (int i = 0; i < tables.length; i++) {
-      TableDescriptor htd = 
TableDescriptorBuilder.newBuilder(tables[i]).build();
-      tds.add(htd);
-    }
-    return tds;
-  }
-
-  private ServerMetrics mockServerMetricsWithRegionCacheInfo(ServerName server,
-    List<RegionInfo> regionsOnServer, float currentCacheRatio, 
List<RegionInfo> oldRegionCacheInfo,
-    int oldRegionCachedSize, int regionSize) {
-    ServerMetrics serverMetrics = mock(ServerMetrics.class);
-    Map<byte[], RegionMetrics> regionLoadMap = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (RegionInfo info : regionsOnServer) {
-      RegionMetrics rl = mock(RegionMetrics.class);
-      when(rl.getReadRequestCount()).thenReturn(0L);
-      when(rl.getWriteRequestCount()).thenReturn(0L);
-      when(rl.getMemStoreSize()).thenReturn(Size.ZERO);
-      when(rl.getStoreFileSize()).thenReturn(Size.ZERO);
-      when(rl.getCurrentRegionCachedRatio()).thenReturn(currentCacheRatio);
-      when(rl.getRegionSizeMB()).thenReturn(new Size(regionSize, 
Size.Unit.MEGABYTE));
-      regionLoadMap.put(info.getRegionName(), rl);
-    }
-    when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap);
-    Map<String, Integer> oldCacheRatioMap = new HashMap<>();
-    for (RegionInfo info : oldRegionCacheInfo) {
-      oldCacheRatioMap.put(info.getEncodedName(), oldRegionCachedSize);
-    }
-    when(serverMetrics.getRegionCachedInfo()).thenReturn(oldCacheRatioMap);
-    return serverMetrics;
-  }
+  private static RSGroupBasedLoadBalancer loadBalancer; // built once in beforeAllTests()
 
   @BeforeClass
   public static void beforeAllTests() throws Exception {
+    groups = new String[] { RSGroupInfo.DEFAULT_GROUP };
     servers = generateServers(3);
+    groupMap = constructGroupInfo(servers, groups);
     tableDescs = constructTableDesc(false);
-    Configuration conf = HBaseConfiguration.create();
+    conf = HBaseConfiguration.create();
     conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, 
"prefetch_file_list");
-    loadBalancer = new CacheAwareLoadBalancer();
-    loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
-    loadBalancer.loadConf(conf);
+    conf.set("hbase.rsgroup.grouploadbalancer.class",
+      CacheAwareLoadBalancer.class.getCanonicalName());
+    loadBalancer = new RSGroupBasedLoadBalancer();
+    loadBalancer.setMasterServices(getMockedMaster());
+    loadBalancer.initialize();
   }
 
   @Test
@@ -242,21 +190,28 @@ public class TestCacheAwareLoadBalancer extends 
BalancerTestBase {
   @Test
   public void testThrottlingRegionBeyondThreshold() throws Exception {
     Configuration conf = HBaseConfiguration.create();
-    CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
-    balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
-    balancer.loadConf(conf);
+    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, 
"prefetch_file_list");
+    conf.set("hbase.rsgroup.grouploadbalancer.class",
+      CacheAwareLoadBalancer.class.getCanonicalName());
+    RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
+    balancer.setMasterServices(getMockedMaster());
     balancer.initialize();
+    ((CacheAwareLoadBalancer) balancer.getInternalBalancer()).loadConf(conf);
     ServerName server0 = servers.get(0);
     ServerName server1 = servers.get(1);
     Pair<ServerName, Float> regionRatio = new Pair<>();
     regionRatio.setFirst(server0);
     regionRatio.setSecond(1.0f);
-    balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
+    CacheAwareLoadBalancer internalBalancer =
+      (CacheAwareLoadBalancer) balancer.getInternalBalancer();
+    internalBalancer.regionCacheRatioOnOldServerMap.put("region1", 
regionRatio);
     RegionInfo mockedInfo = mock(RegionInfo.class);
     when(mockedInfo.getEncodedName()).thenReturn("region1");
     RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
     long startTime = EnvironmentEdgeManager.currentTime();
-    balancer.throttle(plan);
+    synchronized (balancer) {
+      balancer.throttle(plan);
+    }
     long endTime = EnvironmentEdgeManager.currentTime();
     assertTrue((endTime - startTime) < 10);
   }
@@ -264,22 +219,31 @@ public class TestCacheAwareLoadBalancer extends 
BalancerTestBase {
   @Test
   public void testThrottlingRegionBelowThreshold() throws Exception {
     Configuration conf = HBaseConfiguration.create();
+    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, 
"prefetch_file_list");
     conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
-    CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
-    balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
-    balancer.loadConf(conf);
-    balancer.initialize();
+    conf.set("hbase.rsgroup.grouploadbalancer.class",
+      CacheAwareLoadBalancer.class.getCanonicalName());
+    RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
+    loadBalancer.setMasterServices(getMockedMaster());
+    loadBalancer.initialize();
+    CacheAwareLoadBalancer internalBalancer =
+      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
+    internalBalancer.loadConf(conf);
+
     ServerName server0 = servers.get(0);
     ServerName server1 = servers.get(1);
     Pair<ServerName, Float> regionRatio = new Pair<>();
     regionRatio.setFirst(server0);
     regionRatio.setSecond(0.1f);
-    balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
+    // Seed region1 as only 10% cached on server0 in the internal balancer's map.
+    internalBalancer.regionCacheRatioOnOldServerMap.put("region1", 
regionRatio);
     RegionInfo mockedInfo = mock(RegionInfo.class);
     when(mockedInfo.getEncodedName()).thenReturn("region1");
     RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
     long startTime = EnvironmentEdgeManager.currentTime();
-    balancer.throttle(plan);
+    synchronized (loadBalancer) {
+      loadBalancer.throttle(plan);
+    }
     long endTime = EnvironmentEdgeManager.currentTime();
     assertTrue((endTime - startTime) >= 100);
   }
@@ -287,11 +251,17 @@ public class TestCacheAwareLoadBalancer extends 
BalancerTestBase {
   @Test
   public void testThrottlingCacheRatioUnknownOnTarget() throws Exception {
     Configuration conf = HBaseConfiguration.create();
+    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, 
"prefetch_file_list");
     conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
-    CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
-    balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
-    balancer.loadConf(conf);
-    balancer.initialize();
+    conf.set("hbase.rsgroup.grouploadbalancer.class",
+      CacheAwareLoadBalancer.class.getCanonicalName());
+    RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
+    loadBalancer.setMasterServices(getMockedMaster());
+    loadBalancer.initialize();
+    CacheAwareLoadBalancer internalBalancer =
+      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
+    internalBalancer.loadConf(conf);
+
     ServerName server0 = servers.get(0);
     ServerName server1 = servers.get(1);
     ServerName server3 = servers.get(2);
@@ -299,12 +269,15 @@ public class TestCacheAwareLoadBalancer extends 
BalancerTestBase {
     Pair<ServerName, Float> regionRatio = new Pair<>();
     regionRatio.setFirst(server3);
     regionRatio.setSecond(1.0f);
-    balancer.regionCacheRatioOnOldServerMap.put("region1", regionRatio);
+    // region1 is recorded as fully cached on server3, not on the plan's target server0.
+    internalBalancer.regionCacheRatioOnOldServerMap.put("region1", 
regionRatio);
     RegionInfo mockedInfo = mock(RegionInfo.class);
     when(mockedInfo.getEncodedName()).thenReturn("region1");
     RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
     long startTime = EnvironmentEdgeManager.currentTime();
-    balancer.throttle(plan);
+    synchronized (loadBalancer) {
+      loadBalancer.throttle(plan);
+    }
     long endTime = EnvironmentEdgeManager.currentTime();
     assertTrue((endTime - startTime) >= 100);
   }
@@ -312,11 +285,17 @@ public class TestCacheAwareLoadBalancer extends 
BalancerTestBase {
   @Test
   public void testThrottlingCacheRatioUnknownForRegion() throws Exception {
     Configuration conf = HBaseConfiguration.create();
+    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, 
"prefetch_file_list");
     conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 100);
-    CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
-    balancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
-    balancer.loadConf(conf);
-    balancer.initialize();
+    conf.set("hbase.rsgroup.grouploadbalancer.class",
+      CacheAwareLoadBalancer.class.getCanonicalName());
+    RSGroupBasedLoadBalancer loadBalancer = new RSGroupBasedLoadBalancer();
+    loadBalancer.setMasterServices(getMockedMaster());
+    loadBalancer.initialize();
+    CacheAwareLoadBalancer internalBalancer =
+      (CacheAwareLoadBalancer) loadBalancer.getInternalBalancer();
+    internalBalancer.loadConf(conf);
+    // Per the test name, region1 has no known cache ratio; throttle() should wait >= 100 ms.
     ServerName server0 = servers.get(0);
     ServerName server1 = servers.get(1);
     ServerName server3 = servers.get(2);
@@ -325,7 +304,9 @@ public class TestCacheAwareLoadBalancer extends 
BalancerTestBase {
     when(mockedInfo.getEncodedName()).thenReturn("region1");
     RegionPlan plan = new RegionPlan(mockedInfo, server1, server0);
     long startTime = EnvironmentEdgeManager.currentTime();
-    balancer.throttle(plan);
+    synchronized (loadBalancer) {
+      loadBalancer.throttle(plan);
+    }
     long endTime = EnvironmentEdgeManager.currentTime();
     assertTrue((endTime - startTime) >= 100);
   }
@@ -387,7 +368,6 @@ public class TestCacheAwareLoadBalancer extends 
BalancerTestBase {
         assertFalse(oldCachedRegions1.contains(plan.getRegionInfo()));
       }
     }
-
   }
 
   @Test
@@ -551,22 +531,38 @@ public class TestCacheAwareLoadBalancer extends 
BalancerTestBase {
     assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
   }
 
-  @Test
-  public void testBalancerNotThrowNPEWhenBalancerPlansIsNull() throws 
Exception {
+  @Test(timeout = 60000)
+  public void testConfigUpdateDuringBalance() throws Exception {
+    float expectedOldRatioThreshold = 0.8f;
+    float expectedNewRatioThreshold = 0.95f;
+    long throttlingTimeMillis = 10000;
+
+    conf = HBaseConfiguration.create();
+    conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingTimeMillis);
+    conf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, 
expectedOldRatioThreshold);
+    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, 
"prefetch_file_list");
+    conf.set("hbase.rsgroup.grouploadbalancer.class",
+      CacheAwareLoadBalancer.class.getCanonicalName());
+
+    RSGroupBasedLoadBalancer balancer = new RSGroupBasedLoadBalancer();
+    balancer.setMasterServices(getMockedMaster());
+    balancer.initialize();
+
     Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
     ServerName server0 = servers.get(0);
     ServerName server1 = servers.get(1);
     ServerName server2 = servers.get(2);
 
-    List<RegionInfo> regionsOnServer0 = randomRegions(5);
-    List<RegionInfo> regionsOnServer1 = randomRegions(5);
-    List<RegionInfo> regionsOnServer2 = randomRegions(5);
+    // Set up an unbalanced cluster: all 3 regions on server0
+    List<RegionInfo> regionsOnServer0 = randomRegions(3);
+    List<RegionInfo> regionsOnServer1 = randomRegions(0);
+    List<RegionInfo> regionsOnServer2 = randomRegions(0);
 
     clusterState.put(server0, regionsOnServer0);
     clusterState.put(server1, regionsOnServer1);
     clusterState.put(server2, regionsOnServer2);
 
-    // Mock cluster metrics
+    // Mock metrics with no cache info for any region, so every move is throttled
     Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
     serverMetricsMap.put(server0, 
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
       0.0f, new ArrayList<>(), 0, 10));
@@ -577,15 +573,99 @@ public class TestCacheAwareLoadBalancer extends 
BalancerTestBase {
 
     ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
     when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
-    loadBalancer.updateClusterMetrics(clusterMetrics);
+    balancer.updateClusterMetrics(clusterMetrics);
 
-    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
+    final Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
       (Map) mockClusterServersWithTables(clusterState);
+
+    // Verify the initial configuration is applied to the internal balancer
+    CacheAwareLoadBalancer internalBalancer =
+      (CacheAwareLoadBalancer) balancer.getInternalBalancer();
+    assertEquals(expectedOldRatioThreshold, internalBalancer.ratioThreshold, 
0.001f);
+
+    CountDownLatch balanceStarted = new CountDownLatch(1);
+    CountDownLatch configUpdateInitiated = new CountDownLatch(1);
+    long[] configUpdateDuration = new long[1];
+    long[] balanceDuration = new long[1];
+
+    // Ratio threshold actually observed by the balance thread while it runs
+    float[] actualOldRatioThresholdDuringBalance = new float[1];
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+
     try {
-      List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
-      assertNull(plans);
-    } catch (NullPointerException npe) {
-      fail("NPE should not be thrown");
+      // Thread 1: simulate the flow of HMaster.balance(), which holds synchronized(balancer) for
+      // the whole balance run.
+      Future<Long> balanceFuture = executor.submit(() -> {
+        try {
+          long start = EnvironmentEdgeManager.currentTime();
+          synchronized (balancer) {
+            try {
+              // Like the start of HMaster.balance(): mark the balancing window open.
+              balancer.onBalancingStart();
+              balanceStarted.countDown();
+
+              List<RegionPlan> plans = balancer.balanceCluster(loadOfAllTable);
+              if (plans != null) {
+                for (RegionPlan plan : plans) {
+                  balancer.throttle(plan);
+                }
+              }
+              // Wait until the config update is initiated while balance is still running.
+              configUpdateInitiated.await();
+
+              // The old config must still be visible during the current balance run.
+              CacheAwareLoadBalancer currentInternal =
+                (CacheAwareLoadBalancer) balancer.getInternalBalancer();
+              actualOldRatioThresholdDuringBalance[0] = 
currentInternal.ratioThreshold;
+
+            } finally {
+              balancer.onBalancingComplete();
+            }
+          }
+          return EnvironmentEdgeManager.currentTime() - start;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      });
+
+      // Thread 2: simulate update_all_config / onConfigurationChange
+      Future<Long> configUpdateFuture = executor.submit(() -> {
+        try {
+          // Wait for balance to start; time only the config update itself.
+          balanceStarted.await();
+          long startTime = EnvironmentEdgeManager.currentTime();
+
+          // onConfigurationChange must NOT block on the in-progress balance
+          Configuration newConf = new Configuration(conf);
+          newConf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, 
"prefetch_file_list");
+          newConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttlingTimeMillis);
+          newConf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, 
expectedNewRatioThreshold);
+          balancer.onConfigurationChange(newConf);
+          configUpdateInitiated.countDown();
+
+          return EnvironmentEdgeManager.currentTime() - startTime;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      });
+
+      // Wait for both threads to complete
+      configUpdateDuration[0] = configUpdateFuture.get();
+      balanceDuration[0] = balanceFuture.get();
+
+      // Verify that the config update did not hang waiting for the balance to finish
+      assertTrue(configUpdateDuration[0] < balanceDuration[0]);
+
+      // Verify that the ratio threshold seen during balance is still the old one
+      assertEquals(expectedOldRatioThreshold, 
actualOldRatioThresholdDuringBalance[0], 0.001f);
+
+      // Verify that the new config was applied after the balance completed
+      internalBalancer = (CacheAwareLoadBalancer) 
balancer.getInternalBalancer();
+      assertEquals(expectedNewRatioThreshold, internalBalancer.ratioThreshold, 
0.001f);
+
+    } finally {
+      executor.shutdownNow();
     }
   }
 }

Reply via email to