This is an automated email from the ASF dual-hosted git repository.

rmattingly pushed a commit to branch HBASE-28513-branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit f006a74a66f462b6b8b11d4d8bb049ac062e9e01
Author: Ray Mattingly <rmattin...@apache.org>
AuthorDate: Mon Feb 24 14:29:06 2025 -0500

    HBASE-28513 The StochasticLoadBalancer should support discrete evaluations 
(#6651)
    
    Co-authored-by: Ray Mattingly <rmattin...@hubspot.com>
    Signed-off-by: Nick Dimiduk <ndimi...@apache.org>
---
 .../hbase/master/balancer/AssignRegionAction.java  |  10 +
 .../hbase/master/balancer/BalanceAction.java       |  36 ++-
 .../master/balancer/BalancerClusterState.java      |  91 +++++++-
 .../master/balancer/BalancerConditionals.java      | 212 +++++++++++++++++
 .../hbase/master/balancer/BaseLoadBalancer.java    |  12 +-
 .../master/balancer/CacheAwareLoadBalancer.java    |   2 +-
 .../hbase/master/balancer/CandidateGenerator.java  |   2 +
 .../hadoop/hbase/master/balancer/CostFunction.java |   7 +
 .../DistributeReplicasCandidateGenerator.java      | 115 ++++++++++
 .../balancer/DistributeReplicasConditional.java    |  97 ++++++++
 .../master/balancer/FavoredStochasticBalancer.java |   4 +-
 .../hbase/master/balancer/MoveBatchAction.java     |  77 +++++++
 .../hbase/master/balancer/MoveRegionAction.java    |  10 +
 .../master/balancer/RegionPlanConditional.java     | 133 +++++++++++
 .../RegionPlanConditionalCandidateGenerator.java   | 113 ++++++++++
 .../balancer/SlopFixingCandidateGenerator.java     | 105 +++++++++
 .../master/balancer/StochasticLoadBalancer.java    | 135 ++++++++---
 .../hbase/master/balancer/SwapRegionsAction.java   |  13 ++
 .../hbase/master/balancer/replicas/ReplicaKey.java |  54 +++++
 .../master/balancer/replicas/ReplicaKeyCache.java  |  93 ++++++++
 .../balancer/BalancerConditionalsTestUtil.java     | 221 ++++++++++++++++++
 .../balancer/CandidateGeneratorTestUtil.java       | 250 +++++++++++++++++++++
 .../DistributeReplicasTestConditional.java}        |  41 +---
 .../LoadOnlyFavoredStochasticBalancer.java         |   3 +-
 .../master/balancer/TestBalancerConditionals.java  |  83 +++++++
 ...terBalancingConditionalReplicaDistribution.java | 113 ++++++++++
 ...TestReplicaDistributionBalancerConditional.java | 120 ++++++++++
 ...estStochasticLoadBalancerHeterogeneousCost.java |   2 +-
 28 files changed, 2083 insertions(+), 71 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java
index c99ae092d77..8a79b64142e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
+import java.util.List;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
 @InterfaceAudience.Private
 class AssignRegionAction extends BalanceAction {
   private final int region;
@@ -46,6 +50,12 @@ class AssignRegionAction extends BalanceAction {
     throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
   }
 
+  @Override
+  List<RegionPlan> toRegionPlans(BalancerClusterState cluster) {
+    return ImmutableList
+      .of(new RegionPlan(cluster.regions[getRegion()], null, 
cluster.servers[getServer()]));
+  }
+
   @Override
   public String toString() {
     return getType() + ": " + region + ":" + server;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
index 56b473ae710..a65b5253907 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -28,11 +31,11 @@ abstract class BalanceAction {
     ASSIGN_REGION,
     MOVE_REGION,
     SWAP_REGIONS,
+    MOVE_BATCH,
     NULL,
   }
 
-  static final BalanceAction NULL_ACTION = new BalanceAction(Type.NULL) {
-  };
+  static final BalanceAction NULL_ACTION = new NullBalanceAction();
 
   private final Type type;
 
@@ -43,16 +46,39 @@ abstract class BalanceAction {
   /**
    * Returns an Action which would undo this action
    */
-  BalanceAction undoAction() {
-    return this;
-  }
+  abstract BalanceAction undoAction();
+
+  /**
+   * Returns the Action represented as RegionPlans
+   */
+  abstract List<RegionPlan> toRegionPlans(BalancerClusterState cluster);
 
   Type getType() {
     return type;
   }
 
+  long getStepCount() {
+    return 1;
+  }
+
   @Override
   public String toString() {
     return type + ":";
   }
+
+  private static final class NullBalanceAction extends BalanceAction {
+    private NullBalanceAction() {
+      super(Type.NULL);
+    }
+
+    @Override
+    BalanceAction undoAction() {
+      return this;
+    }
+
+    @Override
+    List<RegionPlan> toRegionPlans(BalancerClusterState cluster) {
+      return Collections.emptyList();
+    }
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
index b857055fb3a..67755fc317c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
@@ -26,6 +26,9 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import org.agrona.collections.Hashing;
 import org.agrona.collections.Int2IntCounterMap;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
@@ -39,6 +42,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
+
 /**
  * An efficient array based implementation similar to ClusterState for keeping 
the status of the
  * cluster in terms of region assignment and distribution. LoadBalancers, such 
as
@@ -123,6 +128,15 @@ class BalancerClusterState {
   // Maps regionName -> oldServerName -> cache ratio of the region on the old 
server
   Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
 
+  private final Supplier<List<Integer>> shuffledServerIndicesSupplier =
+    Suppliers.memoizeWithExpiration(() -> {
+      Collection<Integer> serverIndices = serversToIndex.values();
+      List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices);
+      Collections.shuffle(shuffledServerIndices);
+      return shuffledServerIndices;
+    }, 5, TimeUnit.SECONDS);
+  private long stopRequestedAt = Long.MAX_VALUE;
+
   static class DefaultRackManager extends RackManager {
     @Override
     public String getRack(ServerName server) {
@@ -728,8 +742,25 @@ class BalancerClusterState {
         regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
         regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
         break;
+      case MOVE_BATCH:
+        assert action instanceof MoveBatchAction : action.getClass();
+        MoveBatchAction mba = (MoveBatchAction) action;
+        for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) {
+          Set<Integer> regionsToRemove = 
mba.getServerToRegionsToRemove().get(serverIndex);
+          regionsPerServer[serverIndex] =
+            removeRegions(regionsPerServer[serverIndex], regionsToRemove);
+        }
+        for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) {
+          Set<Integer> regionsToAdd = 
mba.getServerToRegionsToAdd().get(serverIndex);
+          regionsPerServer[serverIndex] = 
addRegions(regionsPerServer[serverIndex], regionsToAdd);
+        }
+        for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
+          regionMoved(moveRegionAction.getRegion(), 
moveRegionAction.getFromServer(),
+            moveRegionAction.getToServer());
+        }
+        break;
       default:
-        throw new RuntimeException("Uknown action:" + action.getType());
+        throw new RuntimeException("Unknown action:" + action.getType());
     }
   }
 
@@ -891,6 +922,52 @@ class BalancerClusterState {
     return newRegions;
   }
 
+  int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) {
+    // Calculate the size of the new regions array
+    int newSize = regions.length - regionIndicesToRemove.size();
+    if (newSize < 0) {
+      throw new IllegalStateException(
+        "Region indices mismatch: more regions to remove than in the regions 
array");
+    }
+
+    int[] newRegions = new int[newSize];
+    int newIndex = 0;
+
+    // Copy only the regions not in the removal set
+    for (int region : regions) {
+      if (!regionIndicesToRemove.contains(region)) {
+        newRegions[newIndex++] = region;
+      }
+    }
+
+    // If the newIndex is smaller than newSize, some regions were missing from 
the input array
+    if (newIndex != newSize) {
+      throw new IllegalStateException("Region indices mismatch: some regions 
in the removal "
+        + "set were not found in the regions array");
+    }
+
+    return newRegions;
+  }
+
+  int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) {
+    int[] newRegions = new int[regions.length + regionIndicesToAdd.size()];
+
+    // Copy the existing regions to the new array
+    System.arraycopy(regions, 0, newRegions, 0, regions.length);
+
+    // Add the new regions at the end of the array
+    int newIndex = regions.length;
+    for (int regionIndex : regionIndicesToAdd) {
+      newRegions[newIndex++] = regionIndex;
+    }
+
+    return newRegions;
+  }
+
+  List<Integer> getShuffledServerIndices() {
+    return shuffledServerIndicesSupplier.get();
+  }
+
   int[] addRegionSorted(int[] regions, int regionIndex) {
     int[] newRegions = new int[regions.length + 1];
     int i = 0;
@@ -990,6 +1067,18 @@ class BalancerClusterState {
     this.numMovedRegions = numMovedRegions;
   }
 
+  public int getMaxReplicas() {
+    return maxReplicas;
+  }
+
+  void setStopRequestedAt(long stopRequestedAt) {
+    this.stopRequestedAt = stopRequestedAt;
+  }
+
+  long getStopRequestedAt() {
+    return stopRequestedAt;
+  }
+
   @Override
   public String toString() {
     StringBuilder desc = new StringBuilder("Cluster={servers=[");
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionals.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionals.java
new file mode 100644
index 00000000000..6ad09519e82
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionals.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKeyCache;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+
+/**
+ * Balancer conditionals supplement cost functions in the {@link 
StochasticLoadBalancer}. Cost
+ * functions are insufficient and difficult to work with when making discrete 
decisions; this is
+ * because they operate on a continuous scale, and each cost function's 
multiplier affects the
+ * relative importance of every other cost function. So it is difficult to 
meaningfully and clearly
+ * value many aspects of your region distribution via cost functions alone. 
Conditionals allow you
+ * to very clearly define discrete rules that your balancer would ideally 
follow. To clarify, a
+ * conditional violation will not block a region assignment because we would 
prefer to have uptime
+ * than have perfectly intentional balance. But conditionals allow you to, for 
example, define that
+ * a region's primary and secondary should not live on the same rack. Another 
example, conditionals
+ * make it easy to define that system tables will ideally be isolated on their 
own RegionServer
+ * (without needing to manage distinct RegionServer groups).
+ */
+@InterfaceAudience.Private
+final class BalancerConditionals implements Configurable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BalancerConditionals.class);
+
+  public static final String DISTRIBUTE_REPLICAS_KEY =
+    "hbase.master.balancer.stochastic.conditionals.distributeReplicas";
+  public static final boolean DISTRIBUTE_REPLICAS_DEFAULT = false;
+
+  public static final String ADDITIONAL_CONDITIONALS_KEY =
+    "hbase.master.balancer.stochastic.additionalConditionals";
+
+  private Set<Class<? extends RegionPlanConditional>> conditionalClasses = 
Collections.emptySet();
+  private Set<RegionPlanConditional> conditionals = Collections.emptySet();
+  private Configuration conf;
+
+  static BalancerConditionals create() {
+    return new BalancerConditionals();
+  }
+
+  private BalancerConditionals() {
+  }
+
+  boolean shouldRunBalancer(BalancerClusterState cluster) {
+    return isConditionalBalancingEnabled() && conditionals.stream()
+      
.map(RegionPlanConditional::getCandidateGenerators).flatMap(Collection::stream)
+      .map(generator -> generator.getWeight(cluster)).anyMatch(weight -> 
weight > 0);
+  }
+
+  Set<Class<? extends RegionPlanConditional>> getConditionalClasses() {
+    return Set.copyOf(conditionalClasses);
+  }
+
+  Collection<RegionPlanConditional> getConditionals() {
+    return conditionals;
+  }
+
+  boolean isReplicaDistributionEnabled() {
+    return conditionalClasses.stream()
+      .anyMatch(DistributeReplicasConditional.class::isAssignableFrom);
+  }
+
+  boolean shouldSkipSloppyServerEvaluation() {
+    return isConditionalBalancingEnabled();
+  }
+
+  boolean isConditionalBalancingEnabled() {
+    return !conditionalClasses.isEmpty();
+  }
+
+  void clearConditionalWeightCaches() {
+    conditionals.stream().map(RegionPlanConditional::getCandidateGenerators)
+      .flatMap(Collection::stream)
+      .forEach(RegionPlanConditionalCandidateGenerator::clearWeightCache);
+  }
+
+  void loadClusterState(BalancerClusterState cluster) {
+    conditionals = conditionalClasses.stream().map(clazz -> 
createConditional(clazz, cluster))
+      .filter(Objects::nonNull).collect(Collectors.toSet());
+  }
+
+  /**
+   * Indicates whether the action is good for our conditional compliance.
+   * @param cluster The cluster state
+   * @param action  The proposed action
+   * @return -1 if conditionals improve, 0 if neutral, 1 if conditionals 
degrade
+   */
+  int getViolationCountChange(BalancerClusterState cluster, BalanceAction 
action) {
+    // Cluster is in pre-move state, so figure out the proposed violations
+    boolean isViolatingPost = isViolating(cluster, action);
+    cluster.doAction(action);
+
+    // Cluster is in post-move state, so figure out the original violations
+    BalanceAction undoAction = action.undoAction();
+    boolean isViolatingPre = isViolating(cluster, undoAction);
+
+    // Reset cluster
+    cluster.doAction(undoAction);
+
+    if (isViolatingPre && isViolatingPost) {
+      return 0;
+    } else if (!isViolatingPre && isViolatingPost) {
+      return 1;
+    } else {
+      return -1;
+    }
+  }
+
+  /**
+   * Check if the proposed action violates conditionals
+   * @param cluster The cluster state
+   * @param action  The proposed action
+   */
+  boolean isViolating(BalancerClusterState cluster, BalanceAction action) {
+    conditionals.forEach(conditional -> conditional.setClusterState(cluster));
+    if (conditionals.isEmpty()) {
+      return false;
+    }
+    List<RegionPlan> regionPlans = action.toRegionPlans(cluster);
+    for (RegionPlan regionPlan : regionPlans) {
+      if (isViolating(regionPlan)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean isViolating(RegionPlan regionPlan) {
+    for (RegionPlanConditional conditional : conditionals) {
+      if (conditional.isViolating(regionPlan)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private RegionPlanConditional createConditional(Class<? extends 
RegionPlanConditional> clazz,
+    BalancerClusterState cluster) {
+    if (cluster == null) {
+      cluster = new BalancerClusterState(Collections.emptyMap(), null, null, 
null, null);
+    }
+    try {
+      Constructor<? extends RegionPlanConditional> ctor =
+        clazz.getDeclaredConstructor(BalancerConditionals.class, 
BalancerClusterState.class);
+      return ReflectionUtils.instantiate(clazz.getName(), ctor, this, cluster);
+    } catch (NoSuchMethodException e) {
+      LOG.warn("Cannot find constructor with Configuration and "
+        + "BalancerClusterState parameters for class '{}': {}", 
clazz.getName(), e.getMessage());
+    }
+    return null;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    ImmutableSet.Builder<Class<? extends RegionPlanConditional>> 
conditionalClasses =
+      ImmutableSet.builder();
+
+    boolean distributeReplicas =
+      conf.getBoolean(DISTRIBUTE_REPLICAS_KEY, DISTRIBUTE_REPLICAS_DEFAULT);
+    if (distributeReplicas) {
+      conditionalClasses.add(DistributeReplicasConditional.class);
+    }
+
+    Class<?>[] classes = conf.getClasses(ADDITIONAL_CONDITIONALS_KEY);
+    for (Class<?> clazz : classes) {
+      if (!RegionPlanConditional.class.isAssignableFrom(clazz)) {
+        LOG.warn("Class {} is not a RegionPlanConditional", clazz.getName());
+        continue;
+      }
+      conditionalClasses.add(clazz.asSubclass(RegionPlanConditional.class));
+    }
+    this.conditionalClasses = conditionalClasses.build();
+    ReplicaKeyCache.getInstance().setConf(conf);
+    loadClusterState(null);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 07cd5892086..fac0d82fe01 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -77,6 +77,9 @@ public abstract class BaseLoadBalancer implements 
LoadBalancer {
 
   public static final boolean DEFAULT_HBASE_MASTER_LOADBALANCE_BYTABLE = false;
 
+  public static final String REGIONS_SLOP_KEY = "hbase.regions.slop";
+  public static final float REGIONS_SLOP_DEFAULT = 0.2f;
+
   protected static final int MIN_SERVER_BALANCE = 2;
   private volatile boolean stopped = false;
 
@@ -256,7 +259,9 @@ public abstract class BaseLoadBalancer implements 
LoadBalancer {
     float average = cs.getLoadAverage(); // for logging
     int floor = (int) Math.floor(average * (1 - slop));
     int ceiling = (int) Math.ceil(average * (1 + slop));
-    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
+    int maxLoad = cs.getMaxLoad();
+    int minLoad = cs.getMinLoad();
+    if (!(maxLoad > ceiling || minLoad < floor)) {
       NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = 
cs.getServersByLoad();
       if (LOG.isTraceEnabled()) {
         // If nothing to balance, then don't say anything unless trace-level 
logging.
@@ -549,7 +554,7 @@ public abstract class BaseLoadBalancer implements 
LoadBalancer {
   }
 
   protected float getDefaultSlop() {
-    return 0.2f;
+    return REGIONS_SLOP_DEFAULT;
   }
 
   private RegionLocationFinder createRegionLocationFinder(Configuration conf) {
@@ -560,9 +565,8 @@ public abstract class BaseLoadBalancer implements 
LoadBalancer {
   }
 
   protected void loadConf(Configuration conf) {
-    this.slop = conf.getFloat("hbase.regions.slop", getDefaultSlop());
+    this.slop = conf.getFloat(REGIONS_SLOP_KEY, getDefaultSlop());
     this.rackManager = new RackManager(getConf());
-    this.onlySystemTablesOnMaster = 
LoadBalancer.isSystemTablesOnlyOnMaster(conf);
     useRegionFinder = conf.getBoolean("hbase.master.balancer.uselocality", 
true);
     if (useRegionFinder) {
       regionFinder = createRegionLocationFinder(conf);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
index 385ed564c7d..d651cd49d1d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
@@ -69,7 +69,7 @@ public class CacheAwareLoadBalancer extends 
StochasticLoadBalancer {
 
   @Override
   protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
-    createCandidateGenerators() {
+    createCandidateGenerators(Configuration conf) {
     Map<Class<? extends CandidateGenerator>, CandidateGenerator> 
candidateGenerators =
       new HashMap<>(2);
     candidateGenerators.put(CacheAwareSkewnessCandidateGenerator.class,
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java
index d9245495e20..642e8162fff 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java
@@ -28,6 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 abstract class CandidateGenerator {
 
+  protected static final double MAX_WEIGHT = 1.0;
+
   abstract BalanceAction generate(BalancerClusterState cluster);
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CostFunction.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CostFunction.java
index 1dcd4580b1a..ee2fc2b6a5e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CostFunction.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CostFunction.java
@@ -76,6 +76,13 @@ abstract class CostFunction {
         regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
         regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
         break;
+      case MOVE_BATCH:
+        MoveBatchAction mba = (MoveBatchAction) action;
+        for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
+          regionMoved(moveRegionAction.getRegion(), 
moveRegionAction.getFromServer(),
+            moveRegionAction.getToServer());
+        }
+        break;
       default:
         throw new RuntimeException("Uknown action:" + action.getType());
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasCandidateGenerator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasCandidateGenerator.java
new file mode 100644
index 00000000000..38fbcc4a0fb
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasCandidateGenerator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static 
org.apache.hadoop.hbase.master.balancer.DistributeReplicasConditional.getReplicaKey;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CandidateGenerator to distribute colocated replicas across different 
servers.
+ */
+@InterfaceAudience.Private
+final class DistributeReplicasCandidateGenerator extends 
RegionPlanConditionalCandidateGenerator {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(DistributeReplicasCandidateGenerator.class);
+  private static final int BATCH_SIZE = 100_000;
+
+  DistributeReplicasCandidateGenerator(BalancerConditionals 
balancerConditionals) {
+    super(balancerConditionals);
+  }
+
+  @Override
+  BalanceAction generateCandidate(BalancerClusterState cluster, boolean 
isWeighing) {
+    return generateCandidate(cluster, isWeighing, false);
+  }
+
+  BalanceAction generateCandidate(BalancerClusterState cluster, boolean 
isWeighing,
+    boolean isForced) {
+    if (cluster.getMaxReplicas() < cluster.numRacks) {
+      LOG.trace("Skipping replica distribution as there are not enough racks 
to distribute them.");
+      return BalanceAction.NULL_ACTION;
+    }
+
+    // Iterate through shuffled servers to find colocated replicas
+    boolean foundColocatedReplicas = false;
+    List<MoveRegionAction> moveRegionActions = new ArrayList<>();
+    List<Integer> shuffledServerIndices = cluster.getShuffledServerIndices();
+    for (int sourceIndex : shuffledServerIndices) {
+      if (
+        moveRegionActions.size() >= BATCH_SIZE
+          || EnvironmentEdgeManager.currentTime() > 
cluster.getStopRequestedAt()
+      ) {
+        break;
+      }
+      int[] serverRegions = cluster.regionsPerServer[sourceIndex];
+      Set<ReplicaKey> replicaKeys = new HashSet<>(serverRegions.length);
+      for (int regionIndex : serverRegions) {
+        ReplicaKey replicaKey = getReplicaKey(cluster.regions[regionIndex]);
+        if (replicaKeys.contains(replicaKey)) {
+          foundColocatedReplicas = true;
+          if (isWeighing) {
+            // If weighing, fast exit with an actionable move
+            return getAction(sourceIndex, regionIndex, 
pickOtherRandomServer(cluster, sourceIndex),
+              -1);
+          }
+          // If not weighing, pick a good move
+          for (int i = 0; i < cluster.numServers; i++) {
+            // Randomize destination ordering so we aren't overloading one 
destination
+            int destinationIndex = pickOtherRandomServer(cluster, sourceIndex);
+            if (destinationIndex == sourceIndex) {
+              continue;
+            }
+            MoveRegionAction possibleAction =
+              new MoveRegionAction(regionIndex, sourceIndex, destinationIndex);
+            if (isForced) {
+              return possibleAction;
+            }
+            if (willBeAccepted(cluster, possibleAction)) {
+              cluster.doAction(possibleAction); // Update cluster state to 
reflect move
+              moveRegionActions.add(possibleAction);
+              break;
+            }
+          }
+        } else {
+          replicaKeys.add(replicaKey);
+        }
+      }
+    }
+
+    if (!moveRegionActions.isEmpty()) {
+      return batchMovesAndResetClusterState(cluster, moveRegionActions);
+    }
+    // If no colocated replicas are found, return NULL_ACTION
+    if (foundColocatedReplicas) {
+      LOG.warn("Could not find a place to put a colocated replica! We will 
force a move.");
+      return generateCandidate(cluster, isWeighing, true);
+    }
+    LOG.trace("No colocated replicas found. No balancing action required.");
+    return BalanceAction.NULL_ACTION;
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasConditional.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasConditional.java
new file mode 100644
index 00000000000..2cd27615e5f
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasConditional.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKeyCache;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
+/**
+ * If enabled, this class will help the balancer ensure that replicas aren't 
placed on the same
+ * servers or racks as their primary. Configure this via
+ * {@link BalancerConditionals#DISTRIBUTE_REPLICAS_KEY}
+ */
+@InterfaceAudience.Private
+public class DistributeReplicasConditional extends RegionPlanConditional {
+
+  private final List<RegionPlanConditionalCandidateGenerator> 
candidateGenerators;
+
+  public DistributeReplicasConditional(BalancerConditionals 
balancerConditionals,
+    BalancerClusterState cluster) {
+    super(balancerConditionals.getConf(), cluster);
+    Configuration conf = balancerConditionals.getConf();
+    float slop =
+      conf.getFloat(BaseLoadBalancer.REGIONS_SLOP_KEY, 
BaseLoadBalancer.REGIONS_SLOP_DEFAULT);
+    this.candidateGenerators =
+      ImmutableList.of(new 
DistributeReplicasCandidateGenerator(balancerConditionals),
+        new SlopFixingCandidateGenerator(balancerConditionals, slop));
+  }
+
+  @Override
+  public ValidationLevel getValidationLevel() {
+    return ValidationLevel.SERVER_HOST_RACK;
+  }
+
+  @Override
+  List<RegionPlanConditionalCandidateGenerator> getCandidateGenerators() {
+    return candidateGenerators;
+  }
+
+  @Override
+  boolean isViolatingServer(RegionPlan regionPlan, Set<RegionInfo> 
serverRegions) {
+    return checkViolation(regionPlan.getRegionInfo(), 
getReplicaKey(regionPlan.getRegionInfo()),
+      serverRegions);
+  }
+
+  @Override
+  boolean isViolatingHost(RegionPlan regionPlan, Set<RegionInfo> hostRegions) {
+    return checkViolation(regionPlan.getRegionInfo(), 
getReplicaKey(regionPlan.getRegionInfo()),
+      hostRegions);
+  }
+
+  @Override
+  boolean isViolatingRack(RegionPlan regionPlan, Set<RegionInfo> rackRegions) {
+    return checkViolation(regionPlan.getRegionInfo(), 
getReplicaKey(regionPlan.getRegionInfo()),
+      rackRegions);
+  }
+
+  private boolean checkViolation(RegionInfo movingRegion, ReplicaKey 
movingReplicaKey,
+    Set<RegionInfo> destinationRegions) {
+    for (RegionInfo regionInfo : destinationRegions) {
+      if (regionInfo.equals(movingRegion)) {
+        continue;
+      }
+      if (getReplicaKey(regionInfo).equals(movingReplicaKey)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  static ReplicaKey getReplicaKey(RegionInfo regionInfo) {
+    return ReplicaKeyCache.getInstance().getReplicaKey(regionInfo);
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
index db4c7c95b65..98ad3beac8d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
@@ -81,7 +81,7 @@ public class FavoredStochasticBalancer extends 
StochasticLoadBalancer
 
   @Override
   protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
-    createCandidateGenerators() {
+    createCandidateGenerators(Configuration conf) {
     Map<Class<? extends CandidateGenerator>, CandidateGenerator> fnPickers = 
new HashMap<>(2);
     fnPickers.put(FavoredNodeLoadPicker.class, new FavoredNodeLoadPicker());
     fnPickers.put(FavoredNodeLocalityPicker.class, new 
FavoredNodeLocalityPicker());
@@ -90,7 +90,7 @@ public class FavoredStochasticBalancer extends 
StochasticLoadBalancer
 
   /** Returns any candidate generator in random */
   @Override
-  protected CandidateGenerator getRandomGenerator() {
+  protected CandidateGenerator getRandomGenerator(BalancerClusterState 
cluster) {
     Class<? extends CandidateGenerator> clazz = shuffledGeneratorClasses.get()
       .get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
     return candidateGenerators.get(clazz);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveBatchAction.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveBatchAction.java
new file mode 100644
index 00000000000..e7ea3ed15e1
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveBatchAction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
+
+@InterfaceAudience.Private
+public class MoveBatchAction extends BalanceAction {
+  private final List<MoveRegionAction> moveActions;
+
+  MoveBatchAction(List<MoveRegionAction> moveActions) {
+    super(Type.MOVE_BATCH);
+    this.moveActions = moveActions;
+  }
+
+  @Override
+  BalanceAction undoAction() {
+    List<MoveRegionAction> undoMoves = new 
ArrayList<>(getMoveActions().size());
+    for (int i = getMoveActions().size() - 1; i >= 0; i--) {
+      MoveRegionAction move = getMoveActions().get(i);
+      undoMoves
+        .add(new MoveRegionAction(move.getRegion(), move.getToServer(), 
move.getFromServer()));
+    }
+    return new MoveBatchAction(undoMoves);
+  }
+
+  @Override
+  List<RegionPlan> toRegionPlans(BalancerClusterState cluster) {
+    List<RegionPlan> mbRegionPlans = new ArrayList<>(getMoveActions().size());
+    for (MoveRegionAction moveRegionAction : getMoveActions()) {
+      mbRegionPlans.add(new 
RegionPlan(cluster.regions[moveRegionAction.getRegion()],
+        cluster.servers[moveRegionAction.getFromServer()],
+        cluster.servers[moveRegionAction.getToServer()]));
+    }
+    return mbRegionPlans;
+  }
+
+  @Override
+  long getStepCount() {
+    return moveActions.size();
+  }
+
+  public HashMultimap<Integer, Integer> getServerToRegionsToRemove() {
+    return 
moveActions.stream().collect(Multimaps.toMultimap(MoveRegionAction::getFromServer,
+      MoveRegionAction::getRegion, HashMultimap::create));
+  }
+
+  public HashMultimap<Integer, Integer> getServerToRegionsToAdd() {
+    return 
moveActions.stream().collect(Multimaps.toMultimap(MoveRegionAction::getToServer,
+      MoveRegionAction::getRegion, HashMultimap::create));
+  }
+
+  List<MoveRegionAction> getMoveActions() {
+    return moveActions;
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java
index 547c9c5b28e..9798e9cebe8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java
@@ -17,8 +17,12 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
+import java.util.List;
+import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
 @InterfaceAudience.Private
 class MoveRegionAction extends BalanceAction {
   private final int region;
@@ -49,6 +53,12 @@ class MoveRegionAction extends BalanceAction {
     return new MoveRegionAction(region, toServer, fromServer);
   }
 
+  @Override
+  List<RegionPlan> toRegionPlans(BalancerClusterState cluster) {
+    return ImmutableList.of(new RegionPlan(cluster.regions[getRegion()],
+      cluster.servers[getFromServer()], cluster.servers[getToServer()]));
+  }
+
   @Override
   public String toString() {
     return getType() + ": " + region + ":" + fromServer + " -> " + toServer;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditional.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditional.java
new file mode 100644
index 00000000000..8de371d341c
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditional.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+@InterfaceStability.Evolving
+public abstract class RegionPlanConditional {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RegionPlanConditional.class);
+  private BalancerClusterState cluster;
+
+  RegionPlanConditional(Configuration conf, BalancerClusterState cluster) {
+    this.cluster = cluster;
+  }
+
+  public enum ValidationLevel {
+    /**
+     * Just check the server.
+     */
+    SERVER,
+    /**
+     * Check the server and the host.
+     */
+    SERVER_HOST,
+    /**
+     * Check the server, host, and rack.
+     */
+    SERVER_HOST_RACK
+  }
+
+  void setClusterState(BalancerClusterState cluster) {
+    this.cluster = cluster;
+  }
+
+  /**
+   * Returns a {@link ValidationLevel} that is appropriate for this 
conditional.
+   * @return the validation level
+   */
+  abstract ValidationLevel getValidationLevel();
+
+  /**
+   * Get the candidate generator(s) for this conditional. This can be useful 
to provide the balancer
+   * with hints that will appease your conditional. Your conditionals will be 
triggered in order.
+   * @return the candidate generator for this conditional
+   */
+  abstract List<RegionPlanConditionalCandidateGenerator> 
getCandidateGenerators();
+
+  /**
+   * Check if the conditional is violated by the given region plan.
+   * @param regionPlan the region plan to check
+   * @return true if the conditional is violated
+   */
+  boolean isViolating(RegionPlan regionPlan) {
+    if (regionPlan == null) {
+      return false;
+    }
+    int destinationServerIdx = 
cluster.serversToIndex.get(regionPlan.getDestination().getAddress());
+
+    // Check Server
+    int[] destinationRegionIndices = 
cluster.regionsPerServer[destinationServerIdx];
+    Set<RegionInfo> serverRegions = 
Arrays.stream(cluster.regionsPerServer[destinationServerIdx])
+      .mapToObj(idx -> cluster.regions[idx]).collect(Collectors.toSet());
+    for (int regionIdx : destinationRegionIndices) {
+      serverRegions.add(cluster.regions[regionIdx]);
+    }
+    if (isViolatingServer(regionPlan, serverRegions)) {
+      return true;
+    }
+
+    if (getValidationLevel() == ValidationLevel.SERVER) {
+      return false;
+    }
+
+    // Check Host
+    int hostIdx = cluster.serverIndexToHostIndex[destinationServerIdx];
+    Set<RegionInfo> hostRegions = 
Arrays.stream(cluster.regionsPerHost[hostIdx])
+      .mapToObj(idx -> cluster.regions[idx]).collect(Collectors.toSet());
+    if (isViolatingHost(regionPlan, hostRegions)) {
+      return true;
+    }
+
+    if (getValidationLevel() == ValidationLevel.SERVER_HOST) {
+      return false;
+    }
+
+    // Check Rack
+    int rackIdx = cluster.serverIndexToRackIndex[destinationServerIdx];
+    Set<RegionInfo> rackRegions = 
Arrays.stream(cluster.regionsPerRack[rackIdx])
+      .mapToObj(idx -> cluster.regions[idx]).collect(Collectors.toSet());
+    if (isViolatingRack(regionPlan, rackRegions)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  abstract boolean isViolatingServer(RegionPlan regionPlan, Set<RegionInfo> 
destinationRegions);
+
+  boolean isViolatingHost(RegionPlan regionPlan, Set<RegionInfo> 
destinationRegions) {
+    return false;
+  }
+
+  boolean isViolatingRack(RegionPlan regionPlan, Set<RegionInfo> 
destinationRegions) {
+    return false;
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditionalCandidateGenerator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditionalCandidateGenerator.java
new file mode 100644
index 00000000000..f8274841f72
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditionalCandidateGenerator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.time.Duration;
+import java.util.List;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class RegionPlanConditionalCandidateGenerator extends 
CandidateGenerator {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(RegionPlanConditionalCandidateGenerator.class);
+
+  private static final Duration WEIGHT_CACHE_TTL = Duration.ofMinutes(1);
+  private long lastWeighedAt = -1;
+  private double lastWeight = 0.0;
+
+  private final BalancerConditionals balancerConditionals;
+
+  RegionPlanConditionalCandidateGenerator(BalancerConditionals 
balancerConditionals) {
+    this.balancerConditionals = balancerConditionals;
+  }
+
+  BalancerConditionals getBalancerConditionals() {
+    return this.balancerConditionals;
+  }
+
+  /**
+   * Generates a balancing action to appease the conditional.
+   * @param cluster    Current state of the cluster.
+   * @param isWeighing Flag indicating if the generator is being used for 
weighing.
+   * @return A BalanceAction, or NULL_ACTION if no action is needed.
+   */
+  abstract BalanceAction generateCandidate(BalancerClusterState cluster, 
boolean isWeighing);
+
+  @Override
+  BalanceAction generate(BalancerClusterState cluster) {
+    BalanceAction balanceAction = generateCandidate(cluster, false);
+    if (!willBeAccepted(cluster, balanceAction)) {
+      LOG.debug("Generated action is not widely accepted by all conditionals. "
+        + "Likely we are finding our way out of a deadlock. balanceAction={}", 
balanceAction);
+    }
+    return balanceAction;
+  }
+
+  MoveBatchAction batchMovesAndResetClusterState(BalancerClusterState cluster,
+    List<MoveRegionAction> moves) {
+    MoveBatchAction batchAction = new MoveBatchAction(moves);
+    undoBatchAction(cluster, batchAction);
+    return batchAction;
+  }
+
+  boolean willBeAccepted(BalancerClusterState cluster, BalanceAction action) {
+    BalancerConditionals balancerConditionals = getBalancerConditionals();
+    if (balancerConditionals == null) {
+      return true;
+    }
+    return !balancerConditionals.isViolating(cluster, action);
+  }
+
+  void undoBatchAction(BalancerClusterState cluster, MoveBatchAction 
batchAction) {
+    for (int i = batchAction.getMoveActions().size() - 1; i >= 0; i--) {
+      MoveRegionAction action = batchAction.getMoveActions().get(i);
+      cluster.doAction(action.undoAction());
+    }
+  }
+
+  void clearWeightCache() {
+    lastWeighedAt = -1;
+  }
+
+  double getWeight(BalancerClusterState cluster) {
+    boolean hasCandidate = false;
+
+    // Candidate generation is expensive, so for re-weighing generators we 
will cache
+    // the value for a bit
+    if (EnvironmentEdgeManager.currentTime() - lastWeighedAt < 
WEIGHT_CACHE_TTL.toMillis()) {
+      return lastWeight;
+    } else {
+      hasCandidate = generateCandidate(cluster, true) != 
BalanceAction.NULL_ACTION;
+      lastWeighedAt = EnvironmentEdgeManager.currentTime();
+    }
+
+    if (hasCandidate) {
+      // If this generator has something to do, then it's important
+      lastWeight = CandidateGenerator.MAX_WEIGHT;
+    } else {
+      lastWeight = 0;
+    }
+    return lastWeight;
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SlopFixingCandidateGenerator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SlopFixingCandidateGenerator.java
new file mode 100644
index 00000000000..070e4903394
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SlopFixingCandidateGenerator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple candidate generator that attempts to move regions from the 
most-loaded servers to the
+ * least-loaded servers.
+ */
+@InterfaceAudience.Private
+final class SlopFixingCandidateGenerator extends 
RegionPlanConditionalCandidateGenerator {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SlopFixingCandidateGenerator.class);
+
+  private final float slop;
+
+  SlopFixingCandidateGenerator(BalancerConditionals balancerConditionals, 
float slop) {
+    super(balancerConditionals);
+    this.slop = slop;
+  }
+
+  @Override
+  BalanceAction generateCandidate(BalancerClusterState cluster, boolean 
isWeighing) {
+    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
+    float average = cs.getLoadAverage();
+    int ceiling = (int) Math.ceil(average * (1 + slop));
+    Set<Integer> sloppyServerIndices = new HashSet<>();
+    for (int i = 0; i < cluster.numServers; i++) {
+      int regionCount = cluster.regionsPerServer[i].length;
+      if (regionCount > ceiling) {
+        sloppyServerIndices.add(i);
+      }
+    }
+
+    if (sloppyServerIndices.isEmpty()) {
+      LOG.trace("No action to take because no sloppy servers exist.");
+      return BalanceAction.NULL_ACTION;
+    }
+
+    List<MoveRegionAction> moves = new ArrayList<>();
+    Set<ServerAndLoad> fixedServers = new HashSet<>();
+    for (int sourceServer : sloppyServerIndices) {
+      for (int regionIdx : cluster.regionsPerServer[sourceServer]) {
+        boolean regionFoundMove = false;
+        for (ServerAndLoad serverAndLoad : cs.getServersByLoad().keySet()) {
+          ServerName destinationServer = serverAndLoad.getServerName();
+          int destinationServerIdx = 
cluster.serversToIndex.get(destinationServer.getAddress());
+          int regionsOnDestination = 
cluster.regionsPerServer[destinationServerIdx].length;
+          if (regionsOnDestination < average) {
+            MoveRegionAction move =
+              new MoveRegionAction(regionIdx, sourceServer, 
destinationServerIdx);
+            if (willBeAccepted(cluster, move)) {
+              if (isWeighing) {
+                // Fast exit for weighing candidate
+                return move;
+              }
+              moves.add(move);
+              cluster.doAction(move);
+              regionFoundMove = true;
+              break;
+            }
+          } else {
+            fixedServers.add(serverAndLoad);
+          }
+        }
+        fixedServers.forEach(s -> cs.getServersByLoad().remove(s));
+        fixedServers.clear();
+        if (!regionFoundMove) {
+          LOG.debug("Could not find a destination for region {} from server 
{}.", regionIdx,
+            sourceServer);
+        }
+        if (cluster.regionsPerServer[sourceServer].length <= ceiling) {
+          break;
+        }
+      }
+    }
+
+    MoveBatchAction batch = new MoveBatchAction(moves);
+    undoBatchAction(cluster, batch);
+    return batch;
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index fca4ef95207..42784ea4440 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -54,7 +54,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
 
 /**
@@ -192,6 +191,8 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
       return shuffled;
     }, 5, TimeUnit.SECONDS);
 
+  private final BalancerConditionals balancerConditionals = 
BalancerConditionals.create();
+
   /**
    * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer 
to replace its
    * default MetricsBalancer
@@ -244,16 +245,24 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
   }
 
   protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
-    createCandidateGenerators() {
-    Map<Class<? extends CandidateGenerator>, CandidateGenerator> 
candidateGenerators =
-      new HashMap<>(5);
-    candidateGenerators.put(RandomCandidateGenerator.class, new 
RandomCandidateGenerator());
-    candidateGenerators.put(LoadCandidateGenerator.class, new 
LoadCandidateGenerator());
-    candidateGenerators.put(LocalityBasedCandidateGenerator.class, 
localityCandidateGenerator);
-    candidateGenerators.put(RegionReplicaCandidateGenerator.class,
-      new RegionReplicaCandidateGenerator());
-    candidateGenerators.put(RegionReplicaRackCandidateGenerator.class,
-      new RegionReplicaRackCandidateGenerator());
+    createCandidateGenerators(Configuration conf) {
+    balancerConditionals.setConf(conf);
+    Map<Class<? extends CandidateGenerator>, CandidateGenerator> 
candidateGenerators;
+    if (balancerConditionals.isReplicaDistributionEnabled()) {
+      candidateGenerators = new HashMap<>(3);
+      candidateGenerators.put(RandomCandidateGenerator.class, new 
RandomCandidateGenerator());
+      candidateGenerators.put(LoadCandidateGenerator.class, new 
LoadCandidateGenerator());
+      candidateGenerators.put(LocalityBasedCandidateGenerator.class, 
localityCandidateGenerator);
+    } else {
+      candidateGenerators = new HashMap<>(5);
+      candidateGenerators.put(RandomCandidateGenerator.class, new 
RandomCandidateGenerator());
+      candidateGenerators.put(LoadCandidateGenerator.class, new 
LoadCandidateGenerator());
+      candidateGenerators.put(LocalityBasedCandidateGenerator.class, 
localityCandidateGenerator);
+      candidateGenerators.put(RegionReplicaCandidateGenerator.class,
+        new RegionReplicaCandidateGenerator());
+      candidateGenerators.put(RegionReplicaRackCandidateGenerator.class,
+        new RegionReplicaRackCandidateGenerator());
+    }
     return candidateGenerators;
   }
 
@@ -288,7 +297,8 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
     localityCost = new ServerLocalityCostFunction(conf);
     rackLocalityCost = new RackLocalityCostFunction(conf);
 
-    this.candidateGenerators = createCandidateGenerators();
+    balancerConditionals.setConf(conf);
+    this.candidateGenerators = createCandidateGenerators(conf);
 
     regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
     regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
@@ -377,6 +387,11 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
   }
 
   private boolean areSomeRegionReplicasColocatedOnHost(BalancerClusterState c) 
{
+    if (!c.hasRegionReplicas || 
balancerConditionals.isReplicaDistributionEnabled()) {
+      // This check is unnecessary without replicas, or with conditional 
replica distribution
+      // The balancer will auto-run if conditional replica distribution 
candidates are available
+      return false;
+    }
     if (c.numHosts >= c.maxReplicas) {
       regionReplicaHostCostFunction.prepare(c);
       double hostCost = Math.abs(regionReplicaHostCostFunction.cost());
@@ -390,6 +405,11 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
   }
 
   private boolean areSomeRegionReplicasColocatedOnRack(BalancerClusterState c) 
{
+    if (!c.hasRegionReplicas || 
balancerConditionals.isReplicaDistributionEnabled()) {
+      // This check is unnecessary without replicas, or with conditional 
replica distribution
+      // The balancer will auto-run if conditional replica distribution 
candidates are available
+      return false;
+    }
     if (c.numRacks >= c.maxReplicas) {
       regionReplicaRackCostFunction.prepare(c);
       double rackCost = Math.abs(regionReplicaRackCostFunction.cost());
@@ -441,6 +461,11 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
       return true;
     }
 
+    if (balancerConditionals.shouldRunBalancer(cluster)) {
+      LOG.info("Running balancer because conditional candidate generators have 
important moves");
+      return true;
+    }
+
     double total = 0.0;
     float localSumMultiplier = 0; // in case this.sumMultiplier is not 
initialized
     for (CostFunction c : costFunctions) {
@@ -470,14 +495,17 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
       }
       LOG.info(
         "{} - skipping load balancing because weighted average imbalance={} <= 
"
-          + "threshold({}). If you want more aggressive balancing, either 
lower "
+          + "threshold({}) and conditionals do not have opinionated move 
candidates. "
+          + "If you want more aggressive balancing, either lower "
           + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or 
increase the relative "
           + "multiplier(s) of the specific cost function(s). functionCost={}",
         isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", 
total / sumMultiplier,
         minCostNeedBalance, minCostNeedBalance, functionCost());
     } else {
-      LOG.info("{} - Calculating plan. may take up to {}ms to complete.",
-        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", 
maxRunningTime);
+      LOG.info(
+        "{} - Calculating plan. may take up to {}ms to complete. 
currentCost={}, targetCost={}",
+        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", 
maxRunningTime, total,
+        minCostNeedBalance);
     }
     return !balanced;
   }
@@ -485,7 +513,7 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
   @RestrictedApi(explanation = "Should only be called in tests", link = "",
       allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
   Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState 
cluster) {
-    CandidateGenerator generator = getRandomGenerator();
+    CandidateGenerator generator = getRandomGenerator(cluster);
     return Pair.newPair(generator, generator.generate(cluster));
   }
 
@@ -494,8 +522,20 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
    * selecting a candidate generator is proportional to the share of cost of 
all cost functions
    * among all cost functions that benefit from it.
    */
-  protected CandidateGenerator getRandomGenerator() {
-    Preconditions.checkState(!candidateGenerators.isEmpty(), "No candidate 
generators available.");
+  protected CandidateGenerator getRandomGenerator(BalancerClusterState 
cluster) {
+    // Prefer conditional generators if they have moves to make
+    if (balancerConditionals.isConditionalBalancingEnabled()) {
+      for (RegionPlanConditional conditional : 
balancerConditionals.getConditionals()) {
+        List<RegionPlanConditionalCandidateGenerator> generators =
+          conditional.getCandidateGenerators();
+        for (RegionPlanConditionalCandidateGenerator generator : generators) {
+          if (generator.getWeight(cluster) > 0) {
+            return generator;
+          }
+        }
+      }
+    }
+
     List<Class<? extends CandidateGenerator>> generatorClasses = 
shuffledGeneratorClasses.get();
     List<Double> partialSums = new ArrayList<>(generatorClasses.size());
     double sum = 0.0;
@@ -583,8 +623,12 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
       rackManager, regionCacheRatioOnOldServerMap);
 
     long startTime = EnvironmentEdgeManager.currentTime();
+    cluster.setStopRequestedAt(startTime + maxRunningTime);
 
     initCosts(cluster);
+    balancerConditionals.loadClusterState(cluster);
+    balancerConditionals.clearConditionalWeightCaches();
+
     float localSumMultiplier = 0;
     for (CostFunction c : costFunctions) {
       if (c.isNeeded()) {
@@ -632,6 +676,7 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
     final String initFunctionTotalCosts = totalCostsPerFunc();
     // Perform a stochastic walk to see if we can get a good fit.
     long step;
+    boolean planImprovedConditionals = false;
     Map<Class<? extends CandidateGenerator>, Long> generatorToStepCount = new 
HashMap<>();
     Map<Class<? extends CandidateGenerator>, Long> 
generatorToApprovedActionCount = new HashMap<>();
     for (step = 0; step < computedMaxSteps; step++) {
@@ -643,16 +688,53 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
         continue;
       }
 
-      cluster.doAction(action);
+      int conditionalViolationsChange = 0;
+      boolean isViolatingConditionals = false;
+      boolean moveImprovedConditionals = false;
+      // Only check conditionals if they are enabled
+      if (balancerConditionals.isConditionalBalancingEnabled()) {
+        // Always accept a conditional generator output. Sometimes conditional 
generators
+        // may need to make controversial moves in order to break what would 
otherwise
+        // be a deadlocked situation.
+        // Otherwise, for normal moves, evaluate the action.
+        if 
(RegionPlanConditionalCandidateGenerator.class.isAssignableFrom(generator.getClass()))
 {
+          conditionalViolationsChange = -1;
+        } else {
+          conditionalViolationsChange =
+            balancerConditionals.getViolationCountChange(cluster, action);
+          isViolatingConditionals = balancerConditionals.isViolating(cluster, 
action);
+        }
+        moveImprovedConditionals = conditionalViolationsChange < 0;
+        if (moveImprovedConditionals) {
+          planImprovedConditionals = true;
+        }
+      }
+
+      // Change state and evaluate costs
+      try {
+        cluster.doAction(action);
+      } catch (IllegalStateException | ArrayIndexOutOfBoundsException e) {
+        LOG.warn(
+          "Generator {} produced invalid action! "
+            + "Debug your candidate generator as this is likely a bug, "
+            + "and may cause a balancer deadlock. {}",
+          generator.getClass().getSimpleName(), action, e);
+        continue;
+      }
       updateCostsAndWeightsWithAction(cluster, action);
-      generatorToStepCount.merge(generator.getClass(), 1L, Long::sum);
+      generatorToStepCount.merge(generator.getClass(), action.getStepCount(), 
Long::sum);
 
       newCost = computeCost(cluster, currentCost);
 
-      // Should this be kept?
-      if (newCost < currentCost) {
+      boolean conditionalsSimilarCostsImproved =
+        (newCost < currentCost && conditionalViolationsChange == 0 && 
!isViolatingConditionals);
+      // Our first priority is to reduce conditional violations
+      // Our second priority is to reduce balancer cost
+      // change, regardless of cost change
+      if (moveImprovedConditionals || conditionalsSimilarCostsImproved) {
         currentCost = newCost;
-        generatorToApprovedActionCount.merge(generator.getClass(), 1L, 
Long::sum);
+        generatorToApprovedActionCount.merge(generator.getClass(), 
action.getStepCount(),
+          Long::sum);
 
         // save for JMX
         curOverallCost = currentCost;
@@ -665,7 +747,7 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
         updateCostsAndWeightsWithAction(cluster, undoAction);
       }
 
-      if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) {
+      if (EnvironmentEdgeManager.currentTime() > cluster.getStopRequestedAt()) 
{
         break;
       }
     }
@@ -682,7 +764,7 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
 
     metricsBalancer.balanceCluster(endTime - startTime);
 
-    if (initCost > currentCost) {
+    if (planImprovedConditionals || (initCost > currentCost)) {
       updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
       plans = createRegionPlans(cluster);
       LOG.info(
@@ -697,7 +779,8 @@ public class StochasticLoadBalancer extends 
BaseLoadBalancer {
     }
     LOG.info(
       "Could not find a better moving plan.  Tried {} different configurations 
in "
-        + "{} ms, and did not find anything with an imbalance score less than 
{}",
+        + "{} ms, and did not find anything with an imbalance score less than 
{} "
+        + "and could not improve conditional violations",
       step, endTime - startTime, initCost / sumMultiplier);
     return null;
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java
index 6f83d2bc930..c99de022f03 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java
@@ -17,8 +17,12 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
+import java.util.List;
+import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
 @InterfaceAudience.Private
 public class SwapRegionsAction extends BalanceAction {
   private final int fromServer;
@@ -55,6 +59,15 @@ public class SwapRegionsAction extends BalanceAction {
     return new SwapRegionsAction(fromServer, toRegion, toServer, fromRegion);
   }
 
+  @Override
+  List<RegionPlan> toRegionPlans(BalancerClusterState cluster) {
+    return ImmutableList.of(
+      new RegionPlan(cluster.regions[getFromRegion()], 
cluster.servers[getFromServer()],
+        cluster.servers[getToServer()]),
+      new RegionPlan(cluster.regions[getToRegion()], 
cluster.servers[getToServer()],
+        cluster.servers[getFromServer()]));
+  }
+
   @Override
   public String toString() {
     return getType() + ": " + fromRegion + ":" + fromServer + " <-> " + 
toRegion + ":" + toServer;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKey.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKey.java
new file mode 100644
index 00000000000..88cfa82dcc6
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKey.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer.replicas;
+
+import java.util.Arrays;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class ReplicaKey {
+  private final TableName tableName;
+  private final byte[] start;
+  private final byte[] stop;
+
+  public ReplicaKey(RegionInfo regionInfo) {
+    this.tableName = regionInfo.getTable();
+    this.start = regionInfo.getStartKey();
+    this.stop = regionInfo.getEndKey();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof ReplicaKey other)) {
+      return false;
+    }
+    return Arrays.equals(this.start, other.start) && Arrays.equals(this.stop, 
other.stop)
+      && this.tableName.equals(other.tableName);
+  }
+
+  @Override
+  public int hashCode() {
+    return new 
HashCodeBuilder().append(tableName).append(start).append(stop).toHashCode();
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKeyCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKeyCache.java
new file mode 100644
index 00000000000..a40e5f9a2f2
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKeyCache.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer.replicas;
+
+import java.time.Duration;
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+
+@InterfaceAudience.Private
+public final class ReplicaKeyCache implements Configurable {
+  /**
+   * ReplicaKey creation is expensive if you have lots of regions. If your 
HMaster has adequate
+   * memory, and you would like balancing to be faster, then you can turn on 
this flag to cache
+   * ReplicaKey objects.
+   */
+  public static final String CACHE_REPLICA_KEYS_KEY =
+    "hbase.replica.distribution.conditional.cacheReplicaKeys";
+  public static final boolean CACHE_REPLICA_KEYS_DEFAULT = false;
+
+  /**
+   * If memory is available, then set this to a value greater than your region 
count to maximize
+   * replica distribution performance.
+   */
+  public static final String REPLICA_KEY_CACHE_SIZE_KEY =
+    "hbase.replica.distribution.conditional.replicaKeyCacheSize";
+  public static final int REPLICA_KEY_CACHE_SIZE_DEFAULT = 1000;
+
+  private static final Supplier<ReplicaKeyCache> INSTANCE = 
Suppliers.memoize(ReplicaKeyCache::new);
+
+  private volatile LoadingCache<RegionInfo, ReplicaKey> replicaKeyCache = null;
+
+  private Configuration conf;
+
+  public static ReplicaKeyCache getInstance() {
+    return INSTANCE.get();
+  }
+
+  private ReplicaKeyCache() {
+  }
+
+  public ReplicaKey getReplicaKey(RegionInfo regionInfo) {
+    return replicaKeyCache == null
+      ? new ReplicaKey(regionInfo)
+      : replicaKeyCache.getUnchecked(regionInfo);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    boolean cacheKeys = conf.getBoolean(CACHE_REPLICA_KEYS_KEY, 
CACHE_REPLICA_KEYS_DEFAULT);
+    if (cacheKeys && replicaKeyCache == null) {
+      int replicaKeyCacheSize =
+        conf.getInt(REPLICA_KEY_CACHE_SIZE_KEY, 
REPLICA_KEY_CACHE_SIZE_DEFAULT);
+      replicaKeyCache = 
CacheBuilder.newBuilder().maximumSize(replicaKeyCacheSize)
+        .expireAfterAccess(Duration.ofMinutes(30)).build(new 
CacheLoader<RegionInfo, ReplicaKey>() {
+          @Override
+          public ReplicaKey load(RegionInfo regionInfo) {
+            return new ReplicaKey(regionInfo);
+          }
+        });
+    } else if (!cacheKeys) {
+      replicaKeyCache = null;
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionalsTestUtil.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionalsTestUtil.java
new file mode 100644
index 00000000000..8a7169b0930
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionalsTestUtil.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+
+public final class BalancerConditionalsTestUtil {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BalancerConditionalsTestUtil.class);
+
+  private BalancerConditionalsTestUtil() {
+  }
+
+  static byte[][] generateSplits(int numRegions) {
+    byte[][] splitKeys = new byte[numRegions - 1][];
+    for (int i = 0; i < numRegions - 1; i++) {
+      splitKeys[i] =
+        Bytes.toBytes(String.format("%09d", (i + 1) * (Integer.MAX_VALUE / 
numRegions)));
+    }
+    return splitKeys;
+  }
+
+  static void printRegionLocations(Connection connection) throws IOException {
+    Admin admin = connection.getAdmin();
+
+    // Get all table names in the cluster
+    Set<TableName> tableNames = admin.listTableDescriptors(true).stream()
+      .map(TableDescriptor::getTableName).collect(Collectors.toSet());
+
+    // Group regions by server
+    Map<ServerName, Map<TableName, List<RegionInfo>>> serverToRegions =
+      admin.getClusterMetrics().getLiveServerMetrics().keySet().stream()
+        .collect(Collectors.toMap(server -> server, server -> {
+          try {
+            return listRegionsByTable(connection, server, tableNames);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }));
+
+    // Pretty print region locations
+    StringBuilder regionLocationOutput = new StringBuilder();
+    regionLocationOutput.append("Pretty printing region locations...\n");
+    serverToRegions.forEach((server, tableRegions) -> {
+      regionLocationOutput.append("Server: " + server.getServerName() + "\n");
+      tableRegions.forEach((table, regions) -> {
+        if (regions.isEmpty()) {
+          return;
+        }
+        regionLocationOutput.append("  Table: " + table.getNameAsString() + 
"\n");
+        regions.forEach(region -> regionLocationOutput
+          .append(String.format("    Region: %s, start: %s, end: %s, replica: 
%s\n",
+            region.getEncodedName(), Bytes.toString(region.getStartKey()),
+            Bytes.toString(region.getEndKey()), region.getReplicaId())));
+      });
+    });
+    LOG.info(regionLocationOutput.toString());
+  }
+
+  private static Map<TableName, List<RegionInfo>> 
listRegionsByTable(Connection connection,
+    ServerName server, Set<TableName> tableNames) throws IOException {
+    Admin admin = connection.getAdmin();
+
+    // Find regions for each table
+    return tableNames.stream().collect(Collectors.toMap(tableName -> 
tableName, tableName -> {
+      List<RegionInfo> allRegions = null;
+      try {
+        allRegions = admin.getRegions(server);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return allRegions.stream().filter(region -> 
region.getTable().equals(tableName))
+        .collect(Collectors.toList());
+    }));
+  }
+
+  static void validateReplicaDistribution(Connection connection, TableName 
tableName,
+    boolean shouldBeDistributed) {
+    Map<ServerName, List<RegionInfo>> serverToRegions = null;
+    try {
+      serverToRegions = 
connection.getRegionLocator(tableName).getAllRegionLocations().stream()
+        .collect(Collectors.groupingBy(location -> location.getServerName(),
+          Collectors.mapping(location -> location.getRegion(), 
Collectors.toList())));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (shouldBeDistributed) {
+      // Ensure no server hosts more than one replica of any region
+      for (Map.Entry<ServerName, List<RegionInfo>> serverAndRegions : 
serverToRegions.entrySet()) {
+        List<RegionInfo> regionInfos = serverAndRegions.getValue();
+        Set<byte[]> startKeys = new HashSet<>();
+        for (RegionInfo regionInfo : regionInfos) {
+          // each region should have a distinct start key
+          assertFalse(
+            "Each region should have its own start key, "
+              + "demonstrating it is not a replica of any others on this host",
+            startKeys.contains(regionInfo.getStartKey()));
+          startKeys.add(regionInfo.getStartKey());
+        }
+      }
+    } else {
+      // Ensure all replicas are on the same server
+      assertEquals("All regions should share one server", 1, 
serverToRegions.size());
+    }
+  }
+
+  static void validateRegionLocations(Map<TableName, Set<ServerName>> 
tableToServers,
+    TableName productTableName, boolean shouldBeBalanced) {
+    ServerName metaServer =
+      
tableToServers.get(TableName.META_TABLE_NAME).stream().findFirst().orElseThrow();
+    ServerName quotaServer =
+      
tableToServers.get(QuotaUtil.QUOTA_TABLE_NAME).stream().findFirst().orElseThrow();
+    Set<ServerName> productServers = tableToServers.get(productTableName);
+
+    if (shouldBeBalanced) {
+      for (ServerName server : productServers) {
+        assertNotEquals("Meta table and product table should not share 
servers", server,
+          metaServer);
+        assertNotEquals("Quota table and product table should not share 
servers", server,
+          quotaServer);
+      }
+      assertNotEquals("The meta server and quotas server should be different", 
metaServer,
+        quotaServer);
+    } else {
+      for (ServerName server : productServers) {
+        assertEquals("Meta table and product table must share servers", 
server, metaServer);
+        assertEquals("Quota table and product table must share servers", 
server, quotaServer);
+      }
+      assertEquals("The meta server and quotas server must be the same", 
metaServer, quotaServer);
+    }
+  }
+
+  static Map<TableName, Set<ServerName>> getTableToServers(Connection 
connection,
+    Set<TableName> tableNames) {
+    return tableNames.stream().collect(Collectors.toMap(t -> t, t -> {
+      try {
+        return connection.getRegionLocator(t).getAllRegionLocations().stream()
+          .map(HRegionLocation::getServerName).collect(Collectors.toSet());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }));
+  }
+
+  @FunctionalInterface
+  interface AssertionRunnable {
+    void run() throws AssertionError;
+  }
+
+  static void validateAssertionsWithRetries(HBaseTestingUtil testUtil, boolean 
runBalancerOnFailure,
+    AssertionRunnable assertion) {
+    validateAssertionsWithRetries(testUtil, runBalancerOnFailure, 
ImmutableSet.of(assertion));
+  }
+
+  static void validateAssertionsWithRetries(HBaseTestingUtil testUtil, boolean 
runBalancerOnFailure,
+    Set<AssertionRunnable> assertions) {
+    int maxAttempts = 50;
+    for (int i = 0; i < maxAttempts; i++) {
+      try {
+        for (AssertionRunnable assertion : assertions) {
+          assertion.run();
+        }
+      } catch (AssertionError e) {
+        if (i == maxAttempts - 1) {
+          throw e;
+        }
+        try {
+          LOG.warn("Failed to validate region locations. Will retry", e);
+          Thread.sleep(1000);
+          
BalancerConditionalsTestUtil.printRegionLocations(testUtil.getConnection());
+          if (runBalancerOnFailure) {
+            testUtil.getAdmin().balance();
+          }
+          Thread.sleep(1000);
+        } catch (Exception ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+  }
+
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/CandidateGeneratorTestUtil.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/CandidateGeneratorTestUtil.java
new file mode 100644
index 00000000000..116ee4fc657
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/CandidateGeneratorTestUtil.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static 
org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.MAX_RUNNING_TIME_KEY;
+import static 
org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class CandidateGeneratorTestUtil {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CandidateGeneratorTestUtil.class);
+
+  private CandidateGeneratorTestUtil() {
+  }
+
+  static void runBalancerToExhaustion(Configuration conf,
+    Map<ServerName, List<RegionInfo>> serverToRegions,
+    Set<Function<BalancerClusterState, Boolean>> expectations, float 
targetMaxBalancerCost) {
+    // Do the full plan. We're testing with a lot of regions
+    conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
+    conf.setLong(MAX_RUNNING_TIME_KEY, 15000);
+
+    conf.setFloat(MIN_COST_NEED_BALANCE_KEY, targetMaxBalancerCost);
+
+    BalancerClusterState cluster = 
createMockBalancerClusterState(serverToRegions);
+    StochasticLoadBalancer stochasticLoadBalancer = 
buildStochasticLoadBalancer(cluster, conf);
+    printClusterDistribution(cluster, 0);
+    int balancerRuns = 0;
+    int actionsTaken = 0;
+    long balancingMillis = 0;
+    boolean isBalanced = false;
+    while (!isBalanced) {
+      balancerRuns++;
+      if (balancerRuns > 1000) {
+        throw new RuntimeException("Balancer failed to find balance & meet 
expectations");
+      }
+      long start = System.currentTimeMillis();
+      List<RegionPlan> regionPlans =
+        
stochasticLoadBalancer.balanceCluster(partitionRegionsByTable(serverToRegions));
+      balancingMillis += System.currentTimeMillis() - start;
+      actionsTaken++;
+      if (regionPlans != null) {
+        // Apply all plans to serverToRegions
+        for (RegionPlan rp : regionPlans) {
+          ServerName source = rp.getSource();
+          ServerName dest = rp.getDestination();
+          RegionInfo region = rp.getRegionInfo();
+
+          // Update serverToRegions
+          serverToRegions.get(source).remove(region);
+          serverToRegions.get(dest).add(region);
+          actionsTaken++;
+        }
+
+        // Now rebuild cluster and balancer from updated serverToRegions
+        cluster = createMockBalancerClusterState(serverToRegions);
+        stochasticLoadBalancer = buildStochasticLoadBalancer(cluster, conf);
+      }
+      printClusterDistribution(cluster, actionsTaken);
+      isBalanced = true;
+      for (Function<BalancerClusterState, Boolean> condition : expectations) {
+        // Check if we've met all expectations for the candidate generator
+        if (!condition.apply(cluster)) {
+          isBalanced = false;
+          break;
+        }
+      }
+      if (isBalanced) { // Check if the balancer thinks we're done too
+        LOG.info("All balancer conditions passed. Checking if balancer thinks 
it's done.");
+        if 
(stochasticLoadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME, cluster)) {
+          LOG.info("Balancer would still like to run");
+          isBalanced = false;
+        } else {
+          LOG.info("Balancer is done");
+        }
+      }
+    }
+    LOG.info("Balancing took {}sec", 
Duration.ofMillis(balancingMillis).toMinutes());
+  }
+
+  /**
+   * Prints the current cluster distribution of regions per table per server
+   */
+  static void printClusterDistribution(BalancerClusterState cluster, long 
actionsTaken) {
+    LOG.info("=== Cluster Distribution after {} balancer actions taken ===", 
actionsTaken);
+
+    for (int i = 0; i < cluster.numServers; i++) {
+      int[] regions = cluster.regionsPerServer[i];
+      int regionCount = (regions == null) ? 0 : regions.length;
+
+      LOG.info("Server {}: {} regions", cluster.servers[i].getServerName(), 
regionCount);
+
+      if (regionCount > 0) {
+        Map<TableName, Integer> tableRegionCounts = new HashMap<>();
+
+        for (int regionIndex : regions) {
+          RegionInfo regionInfo = cluster.regions[regionIndex];
+          TableName tableName = regionInfo.getTable();
+          tableRegionCounts.put(tableName, 
tableRegionCounts.getOrDefault(tableName, 0) + 1);
+        }
+
+        tableRegionCounts
+          .forEach((table, count) -> LOG.info("  - Table {}: {} regions", 
table, count));
+      }
+    }
+
+    LOG.info("===========================================");
+  }
+
+  /**
+   * Partitions the given serverToRegions map by table The tables are derived 
from the RegionInfo
+   * objects found in serverToRegions.
+   * @param serverToRegions The map of servers to their assigned regions.
+   * @return A map of tables to their server-to-region assignments.
+   */
+  public static Map<TableName, Map<ServerName, List<RegionInfo>>>
+    partitionRegionsByTable(Map<ServerName, List<RegionInfo>> serverToRegions) 
{
+
+    // First, gather all tables from the regions
+    Set<TableName> allTables = new HashSet<>();
+    for (List<RegionInfo> regions : serverToRegions.values()) {
+      for (RegionInfo region : regions) {
+        allTables.add(region.getTable());
+      }
+    }
+
+    Map<TableName, Map<ServerName, List<RegionInfo>>> tablesToServersToRegions 
= new HashMap<>();
+
+    // Initialize each table with all servers mapped to empty lists
+    for (TableName table : allTables) {
+      Map<ServerName, List<RegionInfo>> serverMap = new HashMap<>();
+      for (ServerName server : serverToRegions.keySet()) {
+        serverMap.put(server, new ArrayList<>());
+      }
+      tablesToServersToRegions.put(table, serverMap);
+    }
+
+    // Distribute regions to their respective tables
+    for (Map.Entry<ServerName, List<RegionInfo>> serverAndRegions : 
serverToRegions.entrySet()) {
+      ServerName server = serverAndRegions.getKey();
+      List<RegionInfo> regions = serverAndRegions.getValue();
+
+      for (RegionInfo region : regions) {
+        TableName regionTable = region.getTable();
+        // Now we know for sure regionTable is in allTables
+        Map<ServerName, List<RegionInfo>> tableServerMap =
+          tablesToServersToRegions.get(regionTable);
+        tableServerMap.get(server).add(region);
+      }
+    }
+
+    return tablesToServersToRegions;
+  }
+
+  static StochasticLoadBalancer 
buildStochasticLoadBalancer(BalancerClusterState cluster,
+    Configuration conf) {
+    StochasticLoadBalancer stochasticLoadBalancer =
+      new StochasticLoadBalancer(new DummyMetricsStochasticBalancer());
+    stochasticLoadBalancer.setClusterInfoProvider(new 
DummyClusterInfoProvider(conf));
+    stochasticLoadBalancer.loadConf(conf);
+    stochasticLoadBalancer.initCosts(cluster);
+    return stochasticLoadBalancer;
+  }
+
+  static BalancerClusterState
+    createMockBalancerClusterState(Map<ServerName, List<RegionInfo>> 
serverToRegions) {
+    return new BalancerClusterState(serverToRegions, null, null, null, null);
+  }
+
+  /**
+   * Validates that each replica is isolated from its others. Ensures that no 
server hosts more than
+   * one replica of the same region (i.e., regions with identical start and 
end keys).
+   * @param cluster The current state of the cluster.
+   * @return true if all replicas are properly isolated, false otherwise.
+   */
+  static boolean areAllReplicasDistributed(BalancerClusterState cluster) {
+    // Iterate over each server
+    for (int[] regionsPerServer : cluster.regionsPerServer) {
+      if (regionsPerServer == null || regionsPerServer.length == 0) {
+        continue; // Skip empty servers
+      }
+
+      Set<ReplicaKey> foundKeys = new HashSet<>();
+      for (int regionIndex : regionsPerServer) {
+        RegionInfo regionInfo = cluster.regions[regionIndex];
+        ReplicaKey replicaKey = new ReplicaKey(regionInfo);
+        if (foundKeys.contains(replicaKey)) {
+          // Violation: Multiple replicas of the same region on the same server
+          LOG.warn("Replica isolation violated: one server hosts multiple 
replicas of key [{}].",
+            generateRegionKey(regionInfo));
+          return false;
+        }
+
+        foundKeys.add(replicaKey);
+      }
+    }
+
+    LOG.info(
+      "Replica isolation validation passed: No server hosts multiple replicas 
of the same region.");
+    return true;
+  }
+
+  /**
+   * Generates a unique key for a region based on its start and end keys. This 
method ensures that
+   * regions with identical start and end keys have the same key.
+   * @param regionInfo The RegionInfo object.
+   * @return A string representing the unique key of the region.
+   */
+  private static String generateRegionKey(RegionInfo regionInfo) {
+    // Using Base64 encoding for byte arrays to ensure uniqueness and 
readability
+    String startKey = 
Base64.getEncoder().encodeToString(regionInfo.getStartKey());
+    String endKey = Base64.getEncoder().encodeToString(regionInfo.getEndKey());
+
+    return regionInfo.getTable().getNameAsString() + ":" + startKey + ":" + 
endKey;
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasTestConditional.java
similarity index 57%
copy from 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
copy to 
hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasTestConditional.java
index 56b473ae710..5a8fa2524fe 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasTestConditional.java
@@ -17,42 +17,23 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 
-/**
- * An action to move or swap a region
- */
-@InterfaceAudience.Private
-abstract class BalanceAction {
-  enum Type {
-    ASSIGN_REGION,
-    MOVE_REGION,
-    SWAP_REGIONS,
-    NULL,
-  }
-
-  static final BalanceAction NULL_ACTION = new BalanceAction(Type.NULL) {
-  };
-
-  private final Type type;
-
-  BalanceAction(Type type) {
-    this.type = type;
-  }
+public class DistributeReplicasTestConditional extends 
DistributeReplicasConditional {
 
-  /**
-   * Returns an Action which would undo this action
-   */
-  BalanceAction undoAction() {
-    return this;
+  static void enableConditionalReplicaDistributionForTest(Configuration conf) {
+    conf.set(BalancerConditionals.ADDITIONAL_CONDITIONALS_KEY,
+      DistributeReplicasTestConditional.class.getCanonicalName());
   }
 
-  Type getType() {
-    return type;
+  public DistributeReplicasTestConditional(BalancerConditionals 
balancerConditionals,
+    BalancerClusterState cluster) {
+    super(balancerConditionals, cluster);
   }
 
   @Override
-  public String toString() {
-    return type + ":";
+  public ValidationLevel getValidationLevel() {
+    // Mini-cluster tests can't validate at host/rack levels
+    return ValidationLevel.SERVER;
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java
index d658f7cfa16..dfacad1a747 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.balancer;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Used for FavoredNode unit tests
@@ -27,7 +28,7 @@ public class LoadOnlyFavoredStochasticBalancer extends 
FavoredStochasticBalancer
 
   @Override
   protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
-    createCandidateGenerators() {
+    createCandidateGenerators(Configuration conf) {
     Map<Class<? extends CandidateGenerator>, CandidateGenerator> fnPickers = 
new HashMap<>(1);
     fnPickers.put(FavoredNodeLoadPicker.class, new FavoredNodeLoadPicker());
     return fnPickers;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerConditionals.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerConditionals.java
new file mode 100644
index 00000000000..884331f161a
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerConditionals.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, MasterTests.class })
+public class TestBalancerConditionals extends BalancerTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBalancerConditionals.class);
+
+  private BalancerConditionals balancerConditionals;
+  private BalancerClusterState mockCluster;
+
+  @Before
+  public void setUp() {
+    balancerConditionals = BalancerConditionals.create();
+    mockCluster = mockCluster(new int[] { 0, 1, 2 });
+  }
+
+  @Test
+  public void testDefaultConfiguration() {
+    Configuration conf = new Configuration();
+    balancerConditionals.setConf(conf);
+    balancerConditionals.loadClusterState(mockCluster);
+
+    assertEquals("No conditionals should be loaded by default", 0,
+      balancerConditionals.getConditionalClasses().size());
+  }
+
+  @Test
+  public void testCustomConditionalsViaConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set(BalancerConditionals.ADDITIONAL_CONDITIONALS_KEY,
+      DistributeReplicasConditional.class.getName());
+
+    balancerConditionals.setConf(conf);
+    balancerConditionals.loadClusterState(mockCluster);
+
+    assertTrue("Custom conditionals should be loaded",
+      balancerConditionals.shouldSkipSloppyServerEvaluation());
+  }
+
+  @Test
+  public void testInvalidCustomConditionalClass() {
+    Configuration conf = new Configuration();
+    conf.set(BalancerConditionals.ADDITIONAL_CONDITIONALS_KEY, 
"java.lang.String");
+
+    balancerConditionals.setConf(conf);
+    balancerConditionals.loadClusterState(mockCluster);
+
+    assertEquals("Invalid classes should not be loaded as conditionals", 0,
+      balancerConditionals.getConditionalClasses().size());
+  }
+
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestLargeClusterBalancingConditionalReplicaDistribution.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestLargeClusterBalancingConditionalReplicaDistribution.java
new file mode 100644
index 00000000000..e6cec7045e7
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestLargeClusterBalancingConditionalReplicaDistribution.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static 
org.apache.hadoop.hbase.master.balancer.CandidateGeneratorTestUtil.runBalancerToExhaustion;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKeyCache;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ MediumTests.class, MasterTests.class })
+public class TestLargeClusterBalancingConditionalReplicaDistribution {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    
HBaseClassTestRule.forClass(TestLargeClusterBalancingConditionalReplicaDistribution.class);
+
+  private static final Logger LOG =
+    
LoggerFactory.getLogger(TestLargeClusterBalancingConditionalReplicaDistribution.class);
+
+  private static final int NUM_SERVERS = 1000;
+  private static final int NUM_REGIONS = 20_000;
+  private static final int NUM_REPLICAS = 3;
+  private static final int NUM_TABLES = 100;
+
+  private static final ServerName[] servers = new ServerName[NUM_SERVERS];
+  private static final Map<ServerName, List<RegionInfo>> serverToRegions = new 
HashMap<>();
+
+  @BeforeClass
+  public static void setup() {
+    // Initialize servers
+    for (int i = 0; i < NUM_SERVERS; i++) {
+      servers[i] = ServerName.valueOf("server" + i, i, 
System.currentTimeMillis());
+      serverToRegions.put(servers[i], new ArrayList<>());
+    }
+
+    // Create primary regions and their replicas
+    List<RegionInfo> allRegions = new ArrayList<>();
+    for (int i = 0; i < NUM_REGIONS; i++) {
+      TableName tableName = getTableName(i);
+      // Define startKey and endKey for the region
+      byte[] startKey = Bytes.toBytes(i);
+      byte[] endKey = Bytes.toBytes(i + 1);
+
+      // Create 3 replicas for each primary region
+      for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
+        RegionInfo regionInfo = 
RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey)
+          .setEndKey(endKey).setReplicaId(replicaId).build();
+        allRegions.add(regionInfo);
+      }
+    }
+
+    // Assign all regions to one server
+    for (RegionInfo regionInfo : allRegions) {
+      serverToRegions.get(servers[0]).add(regionInfo);
+    }
+  }
+
+  private static TableName getTableName(int i) {
+    return TableName.valueOf("userTable" + i % NUM_TABLES);
+  }
+
+  @Test
+  public void testReplicaDistribution() {
+    Configuration conf = new Configuration();
+    
DistributeReplicasTestConditional.enableConditionalReplicaDistributionForTest(conf);
+    conf.setBoolean(ReplicaKeyCache.CACHE_REPLICA_KEYS_KEY, true);
+    conf.setInt(ReplicaKeyCache.REPLICA_KEY_CACHE_SIZE_KEY, Integer.MAX_VALUE);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 30_000);
+
+    // turn off replica cost functions
+    conf.setLong("hbase.master.balancer.stochastic.regionReplicaRackCostKey", 
0);
+    conf.setLong("hbase.master.balancer.stochastic.regionReplicaHostCostKey", 
0);
+
+    runBalancerToExhaustion(conf, serverToRegions,
+      Set.of(CandidateGeneratorTestUtil::areAllReplicasDistributed), 10.0f);
+    LOG.info("Meta table and system table regions are successfully isolated, "
+      + "meanwhile region replicas are appropriately distributed across 
RegionServers.");
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestReplicaDistributionBalancerConditional.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestReplicaDistributionBalancerConditional.java
new file mode 100644
index 00000000000..0aae720f838
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestReplicaDistributionBalancerConditional.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static 
org.apache.hadoop.hbase.master.balancer.BalancerConditionalsTestUtil.validateAssertionsWithRetries;
+
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ LargeTests.class, MasterTests.class })
+public class TestReplicaDistributionBalancerConditional {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    
HBaseClassTestRule.forClass(TestReplicaDistributionBalancerConditional.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestReplicaDistributionBalancerConditional.class);
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static final int REPLICAS = 3;
+  private static final int NUM_SERVERS = REPLICAS;
+  private static final int REGIONS_PER_SERVER = 5;
+
+  @Before
+  public void setUp() throws Exception {
+    DistributeReplicasTestConditional
+      
.enableConditionalReplicaDistributionForTest(TEST_UTIL.getConfiguration());
+    TEST_UTIL.getConfiguration()
+      .setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, 
true);
+    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_BALANCER_PERIOD, 
1000L);
+    
TEST_UTIL.getConfiguration().setBoolean("hbase.master.balancer.stochastic.runMaxSteps",
 true);
+
+    // turn off replica cost functions
+    TEST_UTIL.getConfiguration()
+      .setLong("hbase.master.balancer.stochastic.regionReplicaRackCostKey", 0);
+    TEST_UTIL.getConfiguration()
+      .setLong("hbase.master.balancer.stochastic.regionReplicaHostCostKey", 0);
+
+    TEST_UTIL.startMiniCluster(NUM_SERVERS);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testReplicaDistribution() throws Exception {
+    Connection connection = TEST_UTIL.getConnection();
+    Admin admin = connection.getAdmin();
+
+    // Create a "replicated_table" with region replicas
+    TableName replicatedTableName = TableName.valueOf("replicated_table");
+    TableDescriptor replicatedTableDescriptor =
+      TableDescriptorBuilder.newBuilder(replicatedTableName)
+        
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("0")).build())
+        .setRegionReplication(REPLICAS).build();
+    admin.createTable(replicatedTableDescriptor,
+      BalancerConditionalsTestUtil.generateSplits(REGIONS_PER_SERVER * 
NUM_SERVERS));
+
+    // Pause the balancer
+    admin.balancerSwitch(false, true);
+
+    // Collect all region replicas and place them on one RegionServer
+    List<RegionInfo> allRegions = admin.getRegions(replicatedTableName);
+    String targetServer =
+      
TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName().getServerName();
+
+    for (RegionInfo region : allRegions) {
+      admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer));
+    }
+
+    
BalancerConditionalsTestUtil.printRegionLocations(TEST_UTIL.getConnection());
+    validateAssertionsWithRetries(TEST_UTIL, false, () -> 
BalancerConditionalsTestUtil
+      .validateReplicaDistribution(connection, replicatedTableName, false));
+
+    // Unpause the balancer and trigger balancing
+    admin.balancerSwitch(true, true);
+    admin.balance();
+
+    validateAssertionsWithRetries(TEST_UTIL, true, () -> 
BalancerConditionalsTestUtil
+      .validateReplicaDistribution(connection, replicatedTableName, true));
+    
BalancerConditionalsTestUtil.printRegionLocations(TEST_UTIL.getConnection());
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
index 960783a8467..188efa64dcd 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
@@ -254,7 +254,7 @@ public class TestStochasticLoadBalancerHeterogeneousCost 
extends BalancerTestBas
     }
 
     @Override
-    protected CandidateGenerator getRandomGenerator() {
+    protected CandidateGenerator getRandomGenerator(BalancerClusterState 
cluster) {
       return fairRandomCandidateGenerator;
     }
   }

Reply via email to