This is an automated email from the ASF dual-hosted git repository.
rmattingly pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 720c38e3523 HBASE-28513 The StochasticLoadBalancer should support
discrete evaluations (#6651) (#6720)
720c38e3523 is described below
commit 720c38e35239b1e449ecb26d6f2636b1ccefd546
Author: Ray Mattingly <[email protected]>
AuthorDate: Fri Feb 28 16:39:02 2025 -0500
HBASE-28513 The StochasticLoadBalancer should support discrete evaluations
(#6651) (#6720)
Signed-off-by: Nick Dimiduk <[email protected]>
Co-authored-by: Ray Mattingly <[email protected]>
---
.../hbase/master/balancer/AssignRegionAction.java | 10 +
.../hbase/master/balancer/BalanceAction.java | 36 ++-
.../master/balancer/BalancerClusterState.java | 91 +++++++-
.../master/balancer/BalancerConditionals.java | 213 +++++++++++++++++
.../hbase/master/balancer/BaseLoadBalancer.java | 12 +-
.../master/balancer/CacheAwareLoadBalancer.java | 2 +-
.../hbase/master/balancer/CandidateGenerator.java | 2 +
.../hadoop/hbase/master/balancer/CostFunction.java | 7 +
.../DistributeReplicasCandidateGenerator.java | 115 +++++++++
.../balancer/DistributeReplicasConditional.java | 97 ++++++++
.../master/balancer/FavoredStochasticBalancer.java | 4 +-
.../hbase/master/balancer/MoveBatchAction.java | 77 +++++++
.../hbase/master/balancer/MoveRegionAction.java | 10 +
.../master/balancer/RegionPlanConditional.java | 133 +++++++++++
.../RegionPlanConditionalCandidateGenerator.java | 113 +++++++++
.../balancer/SlopFixingCandidateGenerator.java | 105 +++++++++
.../master/balancer/StochasticLoadBalancer.java | 135 ++++++++---
.../hbase/master/balancer/SwapRegionsAction.java | 13 ++
.../hbase/master/balancer/replicas/ReplicaKey.java | 55 +++++
.../master/balancer/replicas/ReplicaKeyCache.java | 93 ++++++++
.../balancer/BalancerConditionalsTestUtil.java | 221 ++++++++++++++++++
.../balancer/CandidateGeneratorTestUtil.java | 256 +++++++++++++++++++++
.../DistributeReplicasTestConditional.java} | 41 +---
.../LoadOnlyFavoredStochasticBalancer.java | 3 +-
.../master/balancer/TestBalancerConditionals.java | 83 +++++++
...terBalancingConditionalReplicaDistribution.java | 114 +++++++++
...TestReplicaDistributionBalancerConditional.java | 120 ++++++++++
...estStochasticLoadBalancerHeterogeneousCost.java | 2 +-
28 files changed, 2092 insertions(+), 71 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java
index c99ae092d77..8a79b64142e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java
@@ -17,9 +17,13 @@
*/
package org.apache.hadoop.hbase.master.balancer;
+import java.util.List;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
@InterfaceAudience.Private
class AssignRegionAction extends BalanceAction {
private final int region;
@@ -46,6 +50,12 @@ class AssignRegionAction extends BalanceAction {
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
}
+ @Override
+ List<RegionPlan> toRegionPlans(BalancerClusterState cluster) {
+ return ImmutableList
+ .of(new RegionPlan(cluster.regions[getRegion()], null,
cluster.servers[getServer()]));
+ }
+
@Override
public String toString() {
return getType() + ": " + region + ":" + server;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
index 56b473ae710..a65b5253907 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.master.balancer;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -28,11 +31,11 @@ abstract class BalanceAction {
ASSIGN_REGION,
MOVE_REGION,
SWAP_REGIONS,
+ MOVE_BATCH,
NULL,
}
- static final BalanceAction NULL_ACTION = new BalanceAction(Type.NULL) {
- };
+ static final BalanceAction NULL_ACTION = new NullBalanceAction();
private final Type type;
@@ -43,16 +46,39 @@ abstract class BalanceAction {
/**
* Returns an Action which would undo this action
*/
- BalanceAction undoAction() {
- return this;
- }
+ abstract BalanceAction undoAction();
+
+ /**
+ * Returns the Action represented as RegionPlans
+ */
+ abstract List<RegionPlan> toRegionPlans(BalancerClusterState cluster);
Type getType() {
return type;
}
+ long getStepCount() {
+ return 1;
+ }
+
@Override
public String toString() {
return type + ":";
}
+
+ private static final class NullBalanceAction extends BalanceAction {
+ private NullBalanceAction() {
+ super(Type.NULL);
+ }
+
+ @Override
+ BalanceAction undoAction() {
+ return this;
+ }
+
+ @Override
+ List<RegionPlan> toRegionPlans(BalancerClusterState cluster) {
+ return Collections.emptyList();
+ }
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
index b857055fb3a..67755fc317c 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
@@ -26,6 +26,9 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.agrona.collections.Hashing;
import org.agrona.collections.Int2IntCounterMap;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
@@ -39,6 +42,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
+
/**
* An efficient array based implementation similar to ClusterState for keeping
the status of the
* cluster in terms of region assignment and distribution. LoadBalancers, such
as
@@ -123,6 +128,15 @@ class BalancerClusterState {
// Maps regionName -> oldServerName -> cache ratio of the region on the old
server
Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
+ private final Supplier<List<Integer>> shuffledServerIndicesSupplier =
+ Suppliers.memoizeWithExpiration(() -> {
+ Collection<Integer> serverIndices = serversToIndex.values();
+ List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices);
+ Collections.shuffle(shuffledServerIndices);
+ return shuffledServerIndices;
+ }, 5, TimeUnit.SECONDS);
+ private long stopRequestedAt = Long.MAX_VALUE;
+
static class DefaultRackManager extends RackManager {
@Override
public String getRack(ServerName server) {
@@ -728,8 +742,25 @@ class BalancerClusterState {
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
break;
+ case MOVE_BATCH:
+ assert action instanceof MoveBatchAction : action.getClass();
+ MoveBatchAction mba = (MoveBatchAction) action;
+ for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) {
+ Set<Integer> regionsToRemove =
mba.getServerToRegionsToRemove().get(serverIndex);
+ regionsPerServer[serverIndex] =
+ removeRegions(regionsPerServer[serverIndex], regionsToRemove);
+ }
+ for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) {
+ Set<Integer> regionsToAdd =
mba.getServerToRegionsToAdd().get(serverIndex);
+ regionsPerServer[serverIndex] =
addRegions(regionsPerServer[serverIndex], regionsToAdd);
+ }
+ for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
+ regionMoved(moveRegionAction.getRegion(),
moveRegionAction.getFromServer(),
+ moveRegionAction.getToServer());
+ }
+ break;
default:
- throw new RuntimeException("Uknown action:" + action.getType());
+ throw new RuntimeException("Unknown action:" + action.getType());
}
}
@@ -891,6 +922,52 @@ class BalancerClusterState {
return newRegions;
}
+ int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) {
+ // Calculate the size of the new regions array
+ int newSize = regions.length - regionIndicesToRemove.size();
+ if (newSize < 0) {
+ throw new IllegalStateException(
+ "Region indices mismatch: more regions to remove than in the regions
array");
+ }
+
+ int[] newRegions = new int[newSize];
+ int newIndex = 0;
+
+ // Copy only the regions not in the removal set
+ for (int region : regions) {
+ if (!regionIndicesToRemove.contains(region)) {
+ newRegions[newIndex++] = region;
+ }
+ }
+
+ // If the newIndex is smaller than newSize, some regions were missing from
the input array
+ if (newIndex != newSize) {
+ throw new IllegalStateException("Region indices mismatch: some regions
in the removal "
+ + "set were not found in the regions array");
+ }
+
+ return newRegions;
+ }
+
+ int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) {
+ int[] newRegions = new int[regions.length + regionIndicesToAdd.size()];
+
+ // Copy the existing regions to the new array
+ System.arraycopy(regions, 0, newRegions, 0, regions.length);
+
+ // Add the new regions at the end of the array
+ int newIndex = regions.length;
+ for (int regionIndex : regionIndicesToAdd) {
+ newRegions[newIndex++] = regionIndex;
+ }
+
+ return newRegions;
+ }
+
+ List<Integer> getShuffledServerIndices() {
+ return shuffledServerIndicesSupplier.get();
+ }
+
int[] addRegionSorted(int[] regions, int regionIndex) {
int[] newRegions = new int[regions.length + 1];
int i = 0;
@@ -990,6 +1067,18 @@ class BalancerClusterState {
this.numMovedRegions = numMovedRegions;
}
+ public int getMaxReplicas() {
+ return maxReplicas;
+ }
+
+ void setStopRequestedAt(long stopRequestedAt) {
+ this.stopRequestedAt = stopRequestedAt;
+ }
+
+ long getStopRequestedAt() {
+ return stopRequestedAt;
+ }
+
@Override
public String toString() {
StringBuilder desc = new StringBuilder("Cluster={servers=[");
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionals.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionals.java
new file mode 100644
index 00000000000..c44e4799693
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionals.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKeyCache;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+
+/**
+ * Balancer conditionals supplement cost functions in the {@link
StochasticLoadBalancer}. Cost
+ * functions are insufficient and difficult to work with when making discrete
decisions; this is
+ * because they operate on a continuous scale, and each cost function's
multiplier affects the
+ * relative importance of every other cost function. So it is difficult to
meaningfully and clearly
+ * value many aspects of your region distribution via cost functions alone.
Conditionals allow you
+ * to very clearly define discrete rules that your balancer would ideally
follow. To clarify, a
+ * conditional violation will not block a region assignment because we would
prefer to have uptime
+ * than have perfectly intentional balance. But conditionals allow you to, for
example, define that
+ * a region's primary and secondary should not live on the same rack. Another
example, conditionals
+ * make it easy to define that system tables will ideally be isolated on their
own RegionServer
+ * (without needing to manage distinct RegionServer groups).
+ */
+@InterfaceAudience.Private
+final class BalancerConditionals implements Configurable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BalancerConditionals.class);
+
+ public static final String DISTRIBUTE_REPLICAS_KEY =
+ "hbase.master.balancer.stochastic.conditionals.distributeReplicas";
+ public static final boolean DISTRIBUTE_REPLICAS_DEFAULT = false;
+
+ public static final String ADDITIONAL_CONDITIONALS_KEY =
+ "hbase.master.balancer.stochastic.additionalConditionals";
+
+ private Set<Class<? extends RegionPlanConditional>> conditionalClasses =
Collections.emptySet();
+ private Set<RegionPlanConditional> conditionals = Collections.emptySet();
+ private Configuration conf;
+
+ static BalancerConditionals create() {
+ return new BalancerConditionals();
+ }
+
+ private BalancerConditionals() {
+ }
+
+ boolean shouldRunBalancer(BalancerClusterState cluster) {
+ return isConditionalBalancingEnabled() && conditionals.stream()
+
.map(RegionPlanConditional::getCandidateGenerators).flatMap(Collection::stream)
+ .map(generator -> generator.getWeight(cluster)).anyMatch(weight ->
weight > 0);
+ }
+
+ Set<Class<? extends RegionPlanConditional>> getConditionalClasses() {
+ return new HashSet<>(conditionalClasses);
+ }
+
+ Collection<RegionPlanConditional> getConditionals() {
+ return conditionals;
+ }
+
+ boolean isReplicaDistributionEnabled() {
+ return conditionalClasses.stream()
+ .anyMatch(DistributeReplicasConditional.class::isAssignableFrom);
+ }
+
+ boolean shouldSkipSloppyServerEvaluation() {
+ return isConditionalBalancingEnabled();
+ }
+
+ boolean isConditionalBalancingEnabled() {
+ return !conditionalClasses.isEmpty();
+ }
+
+ void clearConditionalWeightCaches() {
+ conditionals.stream().map(RegionPlanConditional::getCandidateGenerators)
+ .flatMap(Collection::stream)
+ .forEach(RegionPlanConditionalCandidateGenerator::clearWeightCache);
+ }
+
+ void loadClusterState(BalancerClusterState cluster) {
+ conditionals = conditionalClasses.stream().map(clazz ->
createConditional(clazz, cluster))
+ .filter(Objects::nonNull).collect(Collectors.toSet());
+ }
+
+ /**
+ * Indicates whether the action is good for our conditional compliance.
+ * @param cluster The cluster state
+ * @param action The proposed action
+ * @return 1 if a violation is introduced, 0 if violating both before and
after, -1 otherwise
+ */
+ int getViolationCountChange(BalancerClusterState cluster, BalanceAction
action) {
+ // Cluster is in pre-move state, so figure out the proposed violations
+ boolean isViolatingPost = isViolating(cluster, action);
+ cluster.doAction(action);
+
+ // Cluster is in post-move state, so figure out the original violations
+ BalanceAction undoAction = action.undoAction();
+ boolean isViolatingPre = isViolating(cluster, undoAction);
+
+ // Reset cluster
+ cluster.doAction(undoAction);
+
+ if (isViolatingPre && isViolatingPost) {
+ return 0;
+ } else if (!isViolatingPre && isViolatingPost) {
+ return 1;
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * Check if the proposed action violates conditionals
+ * @param cluster The cluster state
+ * @param action The proposed action
+ */
+ boolean isViolating(BalancerClusterState cluster, BalanceAction action) {
+ conditionals.forEach(conditional -> conditional.setClusterState(cluster));
+ if (conditionals.isEmpty()) {
+ return false;
+ }
+ List<RegionPlan> regionPlans = action.toRegionPlans(cluster);
+ for (RegionPlan regionPlan : regionPlans) {
+ if (isViolating(regionPlan)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isViolating(RegionPlan regionPlan) {
+ for (RegionPlanConditional conditional : conditionals) {
+ if (conditional.isViolating(regionPlan)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private RegionPlanConditional createConditional(Class<? extends
RegionPlanConditional> clazz,
+ BalancerClusterState cluster) {
+ if (cluster == null) {
+ cluster = new BalancerClusterState(Collections.emptyMap(), null, null,
null, null);
+ }
+ try {
+ Constructor<? extends RegionPlanConditional> ctor =
+ clazz.getDeclaredConstructor(BalancerConditionals.class,
BalancerClusterState.class);
+ return ReflectionUtils.instantiate(clazz.getName(), ctor, this, cluster);
+ } catch (NoSuchMethodException e) {
+ LOG.warn("Cannot find constructor with Configuration and "
+ + "BalancerClusterState parameters for class '{}': {}",
clazz.getName(), e.getMessage());
+ }
+ return null;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ ImmutableSet.Builder<Class<? extends RegionPlanConditional>>
conditionalClasses =
+ ImmutableSet.builder();
+
+ boolean distributeReplicas =
+ conf.getBoolean(DISTRIBUTE_REPLICAS_KEY, DISTRIBUTE_REPLICAS_DEFAULT);
+ if (distributeReplicas) {
+ conditionalClasses.add(DistributeReplicasConditional.class);
+ }
+
+ Class<?>[] classes = conf.getClasses(ADDITIONAL_CONDITIONALS_KEY);
+ for (Class<?> clazz : classes) {
+ if (!RegionPlanConditional.class.isAssignableFrom(clazz)) {
+ LOG.warn("Class {} is not a RegionPlanConditional", clazz.getName());
+ continue;
+ }
+ conditionalClasses.add(clazz.asSubclass(RegionPlanConditional.class));
+ }
+ this.conditionalClasses = conditionalClasses.build();
+ ReplicaKeyCache.getInstance().setConf(conf);
+ loadClusterState(null);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 07cd5892086..fac0d82fe01 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -77,6 +77,9 @@ public abstract class BaseLoadBalancer implements
LoadBalancer {
public static final boolean DEFAULT_HBASE_MASTER_LOADBALANCE_BYTABLE = false;
+ public static final String REGIONS_SLOP_KEY = "hbase.regions.slop";
+ public static final float REGIONS_SLOP_DEFAULT = 0.2f;
+
protected static final int MIN_SERVER_BALANCE = 2;
private volatile boolean stopped = false;
@@ -256,7 +259,9 @@ public abstract class BaseLoadBalancer implements
LoadBalancer {
float average = cs.getLoadAverage(); // for logging
int floor = (int) Math.floor(average * (1 - slop));
int ceiling = (int) Math.ceil(average * (1 + slop));
- if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
+ int maxLoad = cs.getMaxLoad();
+ int minLoad = cs.getMinLoad();
+ if (!(maxLoad > ceiling || minLoad < floor)) {
NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad =
cs.getServersByLoad();
if (LOG.isTraceEnabled()) {
// If nothing to balance, then don't say anything unless trace-level
logging.
@@ -549,7 +554,7 @@ public abstract class BaseLoadBalancer implements
LoadBalancer {
}
protected float getDefaultSlop() {
- return 0.2f;
+ return REGIONS_SLOP_DEFAULT;
}
private RegionLocationFinder createRegionLocationFinder(Configuration conf) {
@@ -560,9 +565,8 @@ public abstract class BaseLoadBalancer implements
LoadBalancer {
}
protected void loadConf(Configuration conf) {
- this.slop = conf.getFloat("hbase.regions.slop", getDefaultSlop());
+ this.slop = conf.getFloat(REGIONS_SLOP_KEY, getDefaultSlop());
this.rackManager = new RackManager(getConf());
- this.onlySystemTablesOnMaster =
LoadBalancer.isSystemTablesOnlyOnMaster(conf);
useRegionFinder = conf.getBoolean("hbase.master.balancer.uselocality",
true);
if (useRegionFinder) {
regionFinder = createRegionLocationFinder(conf);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
index 385ed564c7d..d651cd49d1d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
@@ -69,7 +69,7 @@ public class CacheAwareLoadBalancer extends
StochasticLoadBalancer {
@Override
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
- createCandidateGenerators() {
+ createCandidateGenerators(Configuration conf) {
Map<Class<? extends CandidateGenerator>, CandidateGenerator>
candidateGenerators =
new HashMap<>(2);
candidateGenerators.put(CacheAwareSkewnessCandidateGenerator.class,
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java
index d9245495e20..642e8162fff 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java
@@ -28,6 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
abstract class CandidateGenerator {
+ protected static final double MAX_WEIGHT = 1.0;
+
abstract BalanceAction generate(BalancerClusterState cluster);
/**
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CostFunction.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CostFunction.java
index 1dcd4580b1a..ee2fc2b6a5e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CostFunction.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CostFunction.java
@@ -76,6 +76,13 @@ abstract class CostFunction {
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
break;
+ case MOVE_BATCH:
+ MoveBatchAction mba = (MoveBatchAction) action;
+ for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
+ regionMoved(moveRegionAction.getRegion(),
moveRegionAction.getFromServer(),
+ moveRegionAction.getToServer());
+ }
+ break;
default:
throw new RuntimeException("Uknown action:" + action.getType());
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasCandidateGenerator.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasCandidateGenerator.java
new file mode 100644
index 00000000000..38fbcc4a0fb
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasCandidateGenerator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static
org.apache.hadoop.hbase.master.balancer.DistributeReplicasConditional.getReplicaKey;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CandidateGenerator to distribute colocated replicas across different
servers.
+ */
+@InterfaceAudience.Private
+final class DistributeReplicasCandidateGenerator extends
RegionPlanConditionalCandidateGenerator {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DistributeReplicasCandidateGenerator.class);
+ private static final int BATCH_SIZE = 100_000;
+
+ DistributeReplicasCandidateGenerator(BalancerConditionals
balancerConditionals) {
+ super(balancerConditionals);
+ }
+
+ @Override
+ BalanceAction generateCandidate(BalancerClusterState cluster, boolean
isWeighing) {
+ return generateCandidate(cluster, isWeighing, false);
+ }
+
+ BalanceAction generateCandidate(BalancerClusterState cluster, boolean
isWeighing,
+ boolean isForced) {
+ if (cluster.getMaxReplicas() < cluster.numRacks) {
+ LOG.trace("Skipping replica distribution as there are not enough racks
to distribute them.");
+ return BalanceAction.NULL_ACTION;
+ }
+
+ // Iterate through shuffled servers to find colocated replicas
+ boolean foundColocatedReplicas = false;
+ List<MoveRegionAction> moveRegionActions = new ArrayList<>();
+ List<Integer> shuffledServerIndices = cluster.getShuffledServerIndices();
+ for (int sourceIndex : shuffledServerIndices) {
+ if (
+ moveRegionActions.size() >= BATCH_SIZE
+ || EnvironmentEdgeManager.currentTime() >
cluster.getStopRequestedAt()
+ ) {
+ break;
+ }
+ int[] serverRegions = cluster.regionsPerServer[sourceIndex];
+ Set<ReplicaKey> replicaKeys = new HashSet<>(serverRegions.length);
+ for (int regionIndex : serverRegions) {
+ ReplicaKey replicaKey = getReplicaKey(cluster.regions[regionIndex]);
+ if (replicaKeys.contains(replicaKey)) {
+ foundColocatedReplicas = true;
+ if (isWeighing) {
+ // If weighing, fast exit with an actionable move
+ return getAction(sourceIndex, regionIndex,
pickOtherRandomServer(cluster, sourceIndex),
+ -1);
+ }
+ // If not weighing, pick a good move
+ for (int i = 0; i < cluster.numServers; i++) {
+ // Randomize destination ordering so we aren't overloading one
destination
+ int destinationIndex = pickOtherRandomServer(cluster, sourceIndex);
+ if (destinationIndex == sourceIndex) {
+ continue;
+ }
+ MoveRegionAction possibleAction =
+ new MoveRegionAction(regionIndex, sourceIndex, destinationIndex);
+ if (isForced) {
+ return possibleAction;
+ }
+ if (willBeAccepted(cluster, possibleAction)) {
+ cluster.doAction(possibleAction); // Update cluster state to
reflect move
+ moveRegionActions.add(possibleAction);
+ break;
+ }
+ }
+ } else {
+ replicaKeys.add(replicaKey);
+ }
+ }
+ }
+
+ if (!moveRegionActions.isEmpty()) {
+ return batchMovesAndResetClusterState(cluster, moveRegionActions);
+ }
+ // Colocated replicas exist but no acceptable move was found, so force one
+ if (foundColocatedReplicas) {
+ LOG.warn("Could not find a place to put a colocated replica! We will
force a move.");
+ return generateCandidate(cluster, isWeighing, true);
+ }
+ LOG.trace("No colocated replicas found. No balancing action required.");
+ return BalanceAction.NULL_ACTION;
+ }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasConditional.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasConditional.java
new file mode 100644
index 00000000000..2cd27615e5f
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasConditional.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKeyCache;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
+/**
+ * If enabled, this class will help the balancer ensure that replicas aren't
placed on the same
+ * servers or racks as their primary. Configure this via
+ * {@link BalancerConditionals#DISTRIBUTE_REPLICAS_KEY}
+ */
+@InterfaceAudience.Private
+public class DistributeReplicasConditional extends RegionPlanConditional {
+
+ private final List<RegionPlanConditionalCandidateGenerator>
candidateGenerators;
+
+ public DistributeReplicasConditional(BalancerConditionals
balancerConditionals,
+ BalancerClusterState cluster) {
+ super(balancerConditionals.getConf(), cluster);
+ Configuration conf = balancerConditionals.getConf();
+ float slop =
+ conf.getFloat(BaseLoadBalancer.REGIONS_SLOP_KEY,
BaseLoadBalancer.REGIONS_SLOP_DEFAULT);
+ this.candidateGenerators =
+ ImmutableList.of(new
DistributeReplicasCandidateGenerator(balancerConditionals),
+ new SlopFixingCandidateGenerator(balancerConditionals, slop));
+ }
+
+ @Override
+ public ValidationLevel getValidationLevel() {
+ return ValidationLevel.SERVER_HOST_RACK;
+ }
+
+ @Override
+ List<RegionPlanConditionalCandidateGenerator> getCandidateGenerators() {
+ return candidateGenerators;
+ }
+
+ @Override
+ boolean isViolatingServer(RegionPlan regionPlan, Set<RegionInfo>
serverRegions) {
+ return checkViolation(regionPlan.getRegionInfo(),
getReplicaKey(regionPlan.getRegionInfo()),
+ serverRegions);
+ }
+
+ @Override
+ boolean isViolatingHost(RegionPlan regionPlan, Set<RegionInfo> hostRegions) {
+ return checkViolation(regionPlan.getRegionInfo(),
getReplicaKey(regionPlan.getRegionInfo()),
+ hostRegions);
+ }
+
+ @Override
+ boolean isViolatingRack(RegionPlan regionPlan, Set<RegionInfo> rackRegions) {
+ return checkViolation(regionPlan.getRegionInfo(),
getReplicaKey(regionPlan.getRegionInfo()),
+ rackRegions);
+ }
+
+ private boolean checkViolation(RegionInfo movingRegion, ReplicaKey
movingReplicaKey,
+ Set<RegionInfo> destinationRegions) {
+ for (RegionInfo regionInfo : destinationRegions) {
+ if (regionInfo.equals(movingRegion)) {
+ continue;
+ }
+ if (getReplicaKey(regionInfo).equals(movingReplicaKey)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ static ReplicaKey getReplicaKey(RegionInfo regionInfo) {
+ return ReplicaKeyCache.getInstance().getReplicaKey(regionInfo);
+ }
+
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
index db4c7c95b65..98ad3beac8d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java
@@ -81,7 +81,7 @@ public class FavoredStochasticBalancer extends
StochasticLoadBalancer
@Override
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
- createCandidateGenerators() {
+ createCandidateGenerators(Configuration conf) {
Map<Class<? extends CandidateGenerator>, CandidateGenerator> fnPickers =
new HashMap<>(2);
fnPickers.put(FavoredNodeLoadPicker.class, new FavoredNodeLoadPicker());
fnPickers.put(FavoredNodeLocalityPicker.class, new
FavoredNodeLocalityPicker());
@@ -90,7 +90,7 @@ public class FavoredStochasticBalancer extends
StochasticLoadBalancer
/** Returns any candidate generator in random */
@Override
- protected CandidateGenerator getRandomGenerator() {
+ protected CandidateGenerator getRandomGenerator(BalancerClusterState
cluster) {
Class<? extends CandidateGenerator> clazz = shuffledGeneratorClasses.get()
.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
return candidateGenerators.get(clazz);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveBatchAction.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveBatchAction.java
new file mode 100644
index 00000000000..e7ea3ed15e1
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveBatchAction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
+
+@InterfaceAudience.Private
+public class MoveBatchAction extends BalanceAction {
+  /** The individual region moves that make up this batch. */
+  private final List<MoveRegionAction> moveActions;
+
+  MoveBatchAction(List<MoveRegionAction> moveActions) {
+    super(Type.MOVE_BATCH);
+    this.moveActions = moveActions;
+  }
+
+  /**
+   * Builds the inverse batch: each move has its source and destination swapped, and the moves
+   * are listed last-to-first.
+   */
+  @Override
+  BalanceAction undoAction() {
+    List<MoveRegionAction> forward = getMoveActions();
+    List<MoveRegionAction> reversed = new ArrayList<>(forward.size());
+    for (int remaining = forward.size(); remaining > 0; remaining--) {
+      MoveRegionAction original = forward.get(remaining - 1);
+      reversed.add(new MoveRegionAction(original.getRegion(), original.getToServer(),
+        original.getFromServer()));
+    }
+    return new MoveBatchAction(reversed);
+  }
+
+  /** Converts every move of the batch into a {@link RegionPlan}, preserving list order. */
+  @Override
+  List<RegionPlan> toRegionPlans(BalancerClusterState cluster) {
+    List<MoveRegionAction> batch = getMoveActions();
+    List<RegionPlan> plans = new ArrayList<>(batch.size());
+    batch.forEach(move -> plans.add(new RegionPlan(cluster.regions[move.getRegion()],
+      cluster.servers[move.getFromServer()], cluster.servers[move.getToServer()])));
+    return plans;
+  }
+
+  /** The step count of a batch is the number of moves it contains. */
+  @Override
+  long getStepCount() {
+    return moveActions.size();
+  }
+
+  /** Groups the moved region indices by the server index they are moved away from. */
+  public HashMultimap<Integer, Integer> getServerToRegionsToRemove() {
+    HashMultimap<Integer, Integer> regionsBySource = moveActions.stream().collect(Multimaps
+      .toMultimap(MoveRegionAction::getFromServer, MoveRegionAction::getRegion,
+        HashMultimap::create));
+    return regionsBySource;
+  }
+
+  /** Groups the moved region indices by the server index they are moved onto. */
+  public HashMultimap<Integer, Integer> getServerToRegionsToAdd() {
+    HashMultimap<Integer, Integer> regionsByDestination = moveActions.stream().collect(Multimaps
+      .toMultimap(MoveRegionAction::getToServer, MoveRegionAction::getRegion,
+        HashMultimap::create));
+    return regionsByDestination;
+  }
+
+  List<MoveRegionAction> getMoveActions() {
+    return moveActions;
+  }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java
index 547c9c5b28e..9798e9cebe8 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java
@@ -17,8 +17,12 @@
*/
package org.apache.hadoop.hbase.master.balancer;
+import java.util.List;
+import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
@InterfaceAudience.Private
class MoveRegionAction extends BalanceAction {
private final int region;
@@ -49,6 +53,12 @@ class MoveRegionAction extends BalanceAction {
return new MoveRegionAction(region, toServer, fromServer);
}
+  /** Expresses this single move as a one-element list of region plans. */
+  @Override
+  List<RegionPlan> toRegionPlans(BalancerClusterState cluster) {
+    RegionPlan plan = new RegionPlan(cluster.regions[getRegion()],
+      cluster.servers[getFromServer()], cluster.servers[getToServer()]);
+    return ImmutableList.of(plan);
+  }
+
@Override
public String toString() {
return getType() + ": " + region + ":" + fromServer + " -> " + toServer;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditional.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditional.java
new file mode 100644
index 00000000000..8de371d341c
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditional.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+@InterfaceStability.Evolving
+public abstract class RegionPlanConditional {
+  private static final Logger LOG = LoggerFactory.getLogger(RegionPlanConditional.class);
+  private BalancerClusterState cluster;
+
+  /**
+   * @param conf    balancer configuration; not read by this base class
+   * @param cluster cluster state that violation checks are evaluated against
+   */
+  RegionPlanConditional(Configuration conf, BalancerClusterState cluster) {
+    this.cluster = cluster;
+  }
+
+  public enum ValidationLevel {
+    /**
+     * Just check the server.
+     */
+    SERVER,
+    /**
+     * Check the server and the host.
+     */
+    SERVER_HOST,
+    /**
+     * Check the server, host, and rack.
+     */
+    SERVER_HOST_RACK
+  }
+
+  /** Replaces the cluster state that subsequent {@link #isViolating} calls are checked against. */
+  void setClusterState(BalancerClusterState cluster) {
+    this.cluster = cluster;
+  }
+
+  /**
+   * Returns a {@link ValidationLevel} that is appropriate for this conditional.
+   * @return the validation level
+   */
+  abstract ValidationLevel getValidationLevel();
+
+  /**
+   * Get the candidate generator(s) for this conditional. This can be useful to provide the
+   * balancer with hints that will appease your conditional. Your conditionals will be triggered
+   * in order.
+   * @return the candidate generator for this conditional
+   */
+  abstract List<RegionPlanConditionalCandidateGenerator> getCandidateGenerators();
+
+  /**
+   * Check if the conditional is violated by the given region plan. The destination server is
+   * always checked; the destination host and rack are checked only when
+   * {@link #getValidationLevel()} asks for them. The first violation found ends the check.
+   * @param regionPlan the region plan to check; a null plan never violates
+   * @return true if the conditional is violated
+   */
+  boolean isViolating(RegionPlan regionPlan) {
+    if (regionPlan == null) {
+      return false;
+    }
+    int destinationServerIdx =
+      cluster.serversToIndex.get(regionPlan.getDestination().getAddress());
+
+    // Check Server
+    Set<RegionInfo> serverRegions = toRegionInfos(cluster.regionsPerServer[destinationServerIdx]);
+    if (isViolatingServer(regionPlan, serverRegions)) {
+      return true;
+    }
+
+    ValidationLevel level = getValidationLevel();
+    if (level == ValidationLevel.SERVER) {
+      return false;
+    }
+
+    // Check Host
+    int hostIdx = cluster.serverIndexToHostIndex[destinationServerIdx];
+    if (isViolatingHost(regionPlan, toRegionInfos(cluster.regionsPerHost[hostIdx]))) {
+      return true;
+    }
+
+    if (level == ValidationLevel.SERVER_HOST) {
+      return false;
+    }
+
+    // Check Rack
+    int rackIdx = cluster.serverIndexToRackIndex[destinationServerIdx];
+    return isViolatingRack(regionPlan, toRegionInfos(cluster.regionsPerRack[rackIdx]));
+  }
+
+  /** Resolves an array of region indices into the corresponding set of {@link RegionInfo}s. */
+  private Set<RegionInfo> toRegionInfos(int[] regionIndices) {
+    return Arrays.stream(regionIndices).mapToObj(idx -> cluster.regions[idx])
+      .collect(Collectors.toSet());
+  }
+
+  /**
+   * @param regionPlan         the plan being evaluated
+   * @param destinationRegions regions currently on the plan's destination server
+   * @return true if placing the region on that server violates this conditional
+   */
+  abstract boolean isViolatingServer(RegionPlan regionPlan, Set<RegionInfo> destinationRegions);
+
+  /** Host-level check; this base implementation never reports a violation. */
+  boolean isViolatingHost(RegionPlan regionPlan, Set<RegionInfo> destinationRegions) {
+    return false;
+  }
+
+  /** Rack-level check; this base implementation never reports a violation. */
+  boolean isViolatingRack(RegionPlan regionPlan, Set<RegionInfo> destinationRegions) {
+    return false;
+  }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditionalCandidateGenerator.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditionalCandidateGenerator.java
new file mode 100644
index 00000000000..f8274841f72
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditionalCandidateGenerator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.time.Duration;
+import java.util.List;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class RegionPlanConditionalCandidateGenerator extends CandidateGenerator {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(RegionPlanConditionalCandidateGenerator.class);
+
+  /** How long a computed weight is reused before candidate generation is re-run. */
+  private static final Duration WEIGHT_CACHE_TTL = Duration.ofMinutes(1);
+  /** Timestamp (ms) of the last weight computation; negative means "no cached weight". */
+  private long lastWeighedAt = -1;
+  private double lastWeight = 0.0;
+
+  private final BalancerConditionals balancerConditionals;
+
+  RegionPlanConditionalCandidateGenerator(BalancerConditionals balancerConditionals) {
+    this.balancerConditionals = balancerConditionals;
+  }
+
+  BalancerConditionals getBalancerConditionals() {
+    return this.balancerConditionals;
+  }
+
+  /**
+   * Generates a balancing action to appease the conditional.
+   * @param cluster    Current state of the cluster.
+   * @param isWeighing Flag indicating if the generator is being used for weighing.
+   * @return A BalanceAction, or NULL_ACTION if no action is needed.
+   */
+  abstract BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing);
+
+  /**
+   * Generates a non-weighing candidate. The action is returned even when the conditionals would
+   * not accept it; that case is only logged at debug level.
+   */
+  @Override
+  BalanceAction generate(BalancerClusterState cluster) {
+    BalanceAction balanceAction = generateCandidate(cluster, false);
+    if (!willBeAccepted(cluster, balanceAction)) {
+      LOG.debug("Generated action is not widely accepted by all conditionals. "
+        + "Likely we are finding our way out of a deadlock. balanceAction={}", balanceAction);
+    }
+    return balanceAction;
+  }
+
+  /**
+   * Wraps the given moves in a {@link MoveBatchAction} and undoes them on the cluster, for
+   * callers that applied the moves to the cluster while building the list.
+   */
+  MoveBatchAction batchMovesAndResetClusterState(BalancerClusterState cluster,
+    List<MoveRegionAction> moves) {
+    MoveBatchAction batchAction = new MoveBatchAction(moves);
+    undoBatchAction(cluster, batchAction);
+    return batchAction;
+  }
+
+  /**
+   * @return true when no {@link BalancerConditionals} is configured, or when it reports that the
+   *         action is not violating
+   */
+  boolean willBeAccepted(BalancerClusterState cluster, BalanceAction action) {
+    BalancerConditionals balancerConditionals = getBalancerConditionals();
+    if (balancerConditionals == null) {
+      return true;
+    }
+    return !balancerConditionals.isViolating(cluster, action);
+  }
+
+  /** Applies the undo of every move in the batch to the cluster, last move first. */
+  void undoBatchAction(BalancerClusterState cluster, MoveBatchAction batchAction) {
+    for (int i = batchAction.getMoveActions().size() - 1; i >= 0; i--) {
+      MoveRegionAction action = batchAction.getMoveActions().get(i);
+      cluster.doAction(action.undoAction());
+    }
+  }
+
+  /** Invalidates the cached weight so the next {@link #getWeight} call recomputes it. */
+  void clearWeightCache() {
+    lastWeighedAt = -1;
+  }
+
+  /**
+   * Returns {@link CandidateGenerator#MAX_WEIGHT} when this generator currently has a candidate
+   * and 0 otherwise. The result is cached for {@link #WEIGHT_CACHE_TTL}.
+   */
+  double getWeight(BalancerClusterState cluster) {
+    // Candidate generation is expensive, so for re-weighing generators we will cache
+    // the value for a bit. A negative lastWeighedAt is the "cleared" sentinel and must never be
+    // treated as a fresh entry, even when the clock reads less than the TTL (e.g. test clocks).
+    long now = EnvironmentEdgeManager.currentTime();
+    if (lastWeighedAt >= 0 && now - lastWeighedAt < WEIGHT_CACHE_TTL.toMillis()) {
+      return lastWeight;
+    }
+
+    boolean hasCandidate = generateCandidate(cluster, true) != BalanceAction.NULL_ACTION;
+    lastWeighedAt = EnvironmentEdgeManager.currentTime();
+
+    if (hasCandidate) {
+      // If this generator has something to do, then it's important
+      lastWeight = CandidateGenerator.MAX_WEIGHT;
+    } else {
+      lastWeight = 0;
+    }
+    return lastWeight;
+  }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SlopFixingCandidateGenerator.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SlopFixingCandidateGenerator.java
new file mode 100644
index 00000000000..070e4903394
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SlopFixingCandidateGenerator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple candidate generator that attempts to move regions from the
most-loaded servers to the
+ * least-loaded servers.
+ */
+@InterfaceAudience.Private
+final class SlopFixingCandidateGenerator extends RegionPlanConditionalCandidateGenerator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SlopFixingCandidateGenerator.class);
+
+  /** Tolerated fraction above the average region count before a server counts as sloppy. */
+  private final float slop;
+
+  SlopFixingCandidateGenerator(BalancerConditionals balancerConditionals, float slop) {
+    super(balancerConditionals);
+    this.slop = slop;
+  }
+
+  /**
+   * Proposes moves off every server whose region count exceeds
+   * {@code ceil(average * (1 + slop))}, onto servers holding fewer regions than the average.
+   * Only moves accepted by {@link #willBeAccepted} are used.
+   * <p>
+   * While searching, accepted moves are applied to {@code cluster} so later decisions see the
+   * updated counts; they are undone again before returning.
+   * @param cluster    current state of the cluster
+   * @param isWeighing when true, the first acceptable move is returned immediately without
+   *                   being applied to the cluster
+   * @return a single move (weighing), a batch of moves, or {@link BalanceAction#NULL_ACTION}
+   *         when there are no sloppy servers or no acceptable move was found
+   */
+  @Override
+  BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing) {
+    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
+    float average = cs.getLoadAverage();
+    int ceiling = (int) Math.ceil(average * (1 + slop));
+    Set<Integer> sloppyServerIndices = new HashSet<>();
+    for (int i = 0; i < cluster.numServers; i++) {
+      int regionCount = cluster.regionsPerServer[i].length;
+      if (regionCount > ceiling) {
+        sloppyServerIndices.add(i);
+      }
+    }
+
+    if (sloppyServerIndices.isEmpty()) {
+      LOG.trace("No action to take because no sloppy servers exist.");
+      return BalanceAction.NULL_ACTION;
+    }
+
+    List<MoveRegionAction> moves = new ArrayList<>();
+    Set<ServerAndLoad> fixedServers = new HashSet<>();
+    for (int sourceServer : sloppyServerIndices) {
+      for (int regionIdx : cluster.regionsPerServer[sourceServer]) {
+        boolean regionFoundMove = false;
+        for (ServerAndLoad serverAndLoad : cs.getServersByLoad().keySet()) {
+          ServerName destinationServer = serverAndLoad.getServerName();
+          int destinationServerIdx = cluster.serversToIndex.get(destinationServer.getAddress());
+          int regionsOnDestination = cluster.regionsPerServer[destinationServerIdx].length;
+          if (regionsOnDestination < average) {
+            MoveRegionAction move =
+              new MoveRegionAction(regionIdx, sourceServer, destinationServerIdx);
+            if (willBeAccepted(cluster, move)) {
+              if (isWeighing) {
+                // Fast exit for weighing candidate
+                return move;
+              }
+              moves.add(move);
+              cluster.doAction(move);
+              regionFoundMove = true;
+              break;
+            }
+          } else {
+            // Destination has reached the average; drop it once this scan is over
+            fixedServers.add(serverAndLoad);
+          }
+        }
+        // Removal is deferred to here so the key set is not modified while being iterated
+        fixedServers.forEach(s -> cs.getServersByLoad().remove(s));
+        fixedServers.clear();
+        if (!regionFoundMove) {
+          LOG.debug("Could not find a destination for region {} from server {}.", regionIdx,
+            sourceServer);
+        }
+        if (cluster.regionsPerServer[sourceServer].length <= ceiling) {
+          break;
+        }
+      }
+    }
+
+    if (moves.isEmpty()) {
+      // Sloppy servers exist but nothing acceptable was found. Report "no candidate" rather
+      // than an empty batch, which callers comparing against NULL_ACTION would mistake for work.
+      return BalanceAction.NULL_ACTION;
+    }
+
+    MoveBatchAction batch = new MoveBatchAction(moves);
+    undoBatchAction(cluster, batch);
+    return batch;
+  }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index fca4ef95207..42784ea4440 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -54,7 +54,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
/**
@@ -192,6 +191,8 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
return shuffled;
}, 5, TimeUnit.SECONDS);
+ private final BalancerConditionals balancerConditionals =
BalancerConditionals.create();
+
/**
* The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer
to replace its
* default MetricsBalancer
@@ -244,16 +245,24 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
}
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
- createCandidateGenerators() {
- Map<Class<? extends CandidateGenerator>, CandidateGenerator>
candidateGenerators =
- new HashMap<>(5);
- candidateGenerators.put(RandomCandidateGenerator.class, new
RandomCandidateGenerator());
- candidateGenerators.put(LoadCandidateGenerator.class, new
LoadCandidateGenerator());
- candidateGenerators.put(LocalityBasedCandidateGenerator.class,
localityCandidateGenerator);
- candidateGenerators.put(RegionReplicaCandidateGenerator.class,
- new RegionReplicaCandidateGenerator());
- candidateGenerators.put(RegionReplicaRackCandidateGenerator.class,
- new RegionReplicaRackCandidateGenerator());
+ createCandidateGenerators(Configuration conf) {
+ balancerConditionals.setConf(conf);
+ Map<Class<? extends CandidateGenerator>, CandidateGenerator>
candidateGenerators;
+ if (balancerConditionals.isReplicaDistributionEnabled()) {
+ candidateGenerators = new HashMap<>(3);
+ candidateGenerators.put(RandomCandidateGenerator.class, new
RandomCandidateGenerator());
+ candidateGenerators.put(LoadCandidateGenerator.class, new
LoadCandidateGenerator());
+ candidateGenerators.put(LocalityBasedCandidateGenerator.class,
localityCandidateGenerator);
+ } else {
+ candidateGenerators = new HashMap<>(5);
+ candidateGenerators.put(RandomCandidateGenerator.class, new
RandomCandidateGenerator());
+ candidateGenerators.put(LoadCandidateGenerator.class, new
LoadCandidateGenerator());
+ candidateGenerators.put(LocalityBasedCandidateGenerator.class,
localityCandidateGenerator);
+ candidateGenerators.put(RegionReplicaCandidateGenerator.class,
+ new RegionReplicaCandidateGenerator());
+ candidateGenerators.put(RegionReplicaRackCandidateGenerator.class,
+ new RegionReplicaRackCandidateGenerator());
+ }
return candidateGenerators;
}
@@ -288,7 +297,8 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
localityCost = new ServerLocalityCostFunction(conf);
rackLocalityCost = new RackLocalityCostFunction(conf);
- this.candidateGenerators = createCandidateGenerators();
+ balancerConditionals.setConf(conf);
+ this.candidateGenerators = createCandidateGenerators(conf);
regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
@@ -377,6 +387,11 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
}
private boolean areSomeRegionReplicasColocatedOnHost(BalancerClusterState c)
{
+ if (!c.hasRegionReplicas ||
balancerConditionals.isReplicaDistributionEnabled()) {
+ // This check is unnecessary without replicas, or with conditional
replica distribution
+ // The balancer will auto-run if conditional replica distribution
candidates are available
+ return false;
+ }
if (c.numHosts >= c.maxReplicas) {
regionReplicaHostCostFunction.prepare(c);
double hostCost = Math.abs(regionReplicaHostCostFunction.cost());
@@ -390,6 +405,11 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
}
private boolean areSomeRegionReplicasColocatedOnRack(BalancerClusterState c)
{
+ if (!c.hasRegionReplicas ||
balancerConditionals.isReplicaDistributionEnabled()) {
+ // This check is unnecessary without replicas, or with conditional
replica distribution
+ // The balancer will auto-run if conditional replica distribution
candidates are available
+ return false;
+ }
if (c.numRacks >= c.maxReplicas) {
regionReplicaRackCostFunction.prepare(c);
double rackCost = Math.abs(regionReplicaRackCostFunction.cost());
@@ -441,6 +461,11 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
return true;
}
+ if (balancerConditionals.shouldRunBalancer(cluster)) {
+ LOG.info("Running balancer because conditional candidate generators have
important moves");
+ return true;
+ }
+
double total = 0.0;
float localSumMultiplier = 0; // in case this.sumMultiplier is not
initialized
for (CostFunction c : costFunctions) {
@@ -470,14 +495,17 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
}
LOG.info(
"{} - skipping load balancing because weighted average imbalance={} <=
"
- + "threshold({}). If you want more aggressive balancing, either
lower "
+ + "threshold({}) and conditionals do not have opinionated move
candidates. "
+ + "If you want more aggressive balancing, either lower "
+ "hbase.master.balancer.stochastic.minCostNeedBalance from {} or
increase the relative "
+ "multiplier(s) of the specific cost function(s). functionCost={}",
isByTable ? "Table specific (" + tableName + ")" : "Cluster wide",
total / sumMultiplier,
minCostNeedBalance, minCostNeedBalance, functionCost());
} else {
- LOG.info("{} - Calculating plan. may take up to {}ms to complete.",
- isByTable ? "Table specific (" + tableName + ")" : "Cluster wide",
maxRunningTime);
+ LOG.info(
+ "{} - Calculating plan. may take up to {}ms to complete.
currentCost={}, targetCost={}",
+ isByTable ? "Table specific (" + tableName + ")" : "Cluster wide",
maxRunningTime, total,
+ minCostNeedBalance);
}
return !balanced;
}
@@ -485,7 +513,7 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState
cluster) {
- CandidateGenerator generator = getRandomGenerator();
+ CandidateGenerator generator = getRandomGenerator(cluster);
return Pair.newPair(generator, generator.generate(cluster));
}
@@ -494,8 +522,20 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
* selecting a candidate generator is proportional to the share of cost of
all cost functions
* among all cost functions that benefit from it.
*/
- protected CandidateGenerator getRandomGenerator() {
- Preconditions.checkState(!candidateGenerators.isEmpty(), "No candidate
generators available.");
+ protected CandidateGenerator getRandomGenerator(BalancerClusterState
cluster) {
+ // Prefer conditional generators if they have moves to make
+ if (balancerConditionals.isConditionalBalancingEnabled()) {
+ for (RegionPlanConditional conditional :
balancerConditionals.getConditionals()) {
+ List<RegionPlanConditionalCandidateGenerator> generators =
+ conditional.getCandidateGenerators();
+ for (RegionPlanConditionalCandidateGenerator generator : generators) {
+ if (generator.getWeight(cluster) > 0) {
+ return generator;
+ }
+ }
+ }
+ }
+
List<Class<? extends CandidateGenerator>> generatorClasses =
shuffledGeneratorClasses.get();
List<Double> partialSums = new ArrayList<>(generatorClasses.size());
double sum = 0.0;
@@ -583,8 +623,12 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
rackManager, regionCacheRatioOnOldServerMap);
long startTime = EnvironmentEdgeManager.currentTime();
+ cluster.setStopRequestedAt(startTime + maxRunningTime);
initCosts(cluster);
+ balancerConditionals.loadClusterState(cluster);
+ balancerConditionals.clearConditionalWeightCaches();
+
float localSumMultiplier = 0;
for (CostFunction c : costFunctions) {
if (c.isNeeded()) {
@@ -632,6 +676,7 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
final String initFunctionTotalCosts = totalCostsPerFunc();
// Perform a stochastic walk to see if we can get a good fit.
long step;
+ boolean planImprovedConditionals = false;
Map<Class<? extends CandidateGenerator>, Long> generatorToStepCount = new
HashMap<>();
Map<Class<? extends CandidateGenerator>, Long>
generatorToApprovedActionCount = new HashMap<>();
for (step = 0; step < computedMaxSteps; step++) {
@@ -643,16 +688,53 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
continue;
}
- cluster.doAction(action);
+ int conditionalViolationsChange = 0;
+ boolean isViolatingConditionals = false;
+ boolean moveImprovedConditionals = false;
+ // Only check conditionals if they are enabled
+ if (balancerConditionals.isConditionalBalancingEnabled()) {
+ // Always accept a conditional generator output. Sometimes conditional
generators
+ // may need to make controversial moves in order to break what would
otherwise
+ // be a deadlocked situation.
+ // Otherwise, for normal moves, evaluate the action.
+ if
(RegionPlanConditionalCandidateGenerator.class.isAssignableFrom(generator.getClass()))
{
+ conditionalViolationsChange = -1;
+ } else {
+ conditionalViolationsChange =
+ balancerConditionals.getViolationCountChange(cluster, action);
+ isViolatingConditionals = balancerConditionals.isViolating(cluster,
action);
+ }
+ moveImprovedConditionals = conditionalViolationsChange < 0;
+ if (moveImprovedConditionals) {
+ planImprovedConditionals = true;
+ }
+ }
+
+ // Change state and evaluate costs
+ try {
+ cluster.doAction(action);
+ } catch (IllegalStateException | ArrayIndexOutOfBoundsException e) {
+ LOG.warn(
+ "Generator {} produced invalid action! "
+ + "Debug your candidate generator as this is likely a bug, "
+ + "and may cause a balancer deadlock. {}",
+ generator.getClass().getSimpleName(), action, e);
+ continue;
+ }
updateCostsAndWeightsWithAction(cluster, action);
- generatorToStepCount.merge(generator.getClass(), 1L, Long::sum);
+ generatorToStepCount.merge(generator.getClass(), action.getStepCount(),
Long::sum);
newCost = computeCost(cluster, currentCost);
- // Should this be kept?
- if (newCost < currentCost) {
+ boolean conditionalsSimilarCostsImproved =
+ (newCost < currentCost && conditionalViolationsChange == 0 &&
!isViolatingConditionals);
+ // Our first priority is to reduce conditional violations
+ // Our second priority is to reduce balancer cost
+ // change, regardless of cost change
+ if (moveImprovedConditionals || conditionalsSimilarCostsImproved) {
currentCost = newCost;
- generatorToApprovedActionCount.merge(generator.getClass(), 1L,
Long::sum);
+ generatorToApprovedActionCount.merge(generator.getClass(),
action.getStepCount(),
+ Long::sum);
// save for JMX
curOverallCost = currentCost;
@@ -665,7 +747,7 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
updateCostsAndWeightsWithAction(cluster, undoAction);
}
- if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) {
+ if (EnvironmentEdgeManager.currentTime() > cluster.getStopRequestedAt())
{
break;
}
}
@@ -682,7 +764,7 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
metricsBalancer.balanceCluster(endTime - startTime);
- if (initCost > currentCost) {
+ if (planImprovedConditionals || (initCost > currentCost)) {
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
plans = createRegionPlans(cluster);
LOG.info(
@@ -697,7 +779,8 @@ public class StochasticLoadBalancer extends
BaseLoadBalancer {
}
LOG.info(
"Could not find a better moving plan. Tried {} different configurations
in "
- + "{} ms, and did not find anything with an imbalance score less than
{}",
+ + "{} ms, and did not find anything with an imbalance score less than
{} "
+ + "and could not improve conditional violations",
step, endTime - startTime, initCost / sumMultiplier);
return null;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java
index 6f83d2bc930..c99de022f03 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java
@@ -17,8 +17,12 @@
*/
package org.apache.hadoop.hbase.master.balancer;
+import java.util.List;
+import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
@InterfaceAudience.Private
public class SwapRegionsAction extends BalanceAction {
private final int fromServer;
@@ -55,6 +59,15 @@ public class SwapRegionsAction extends BalanceAction {
return new SwapRegionsAction(fromServer, toRegion, toServer, fromRegion);
}
+  /**
+   * Expresses this swap as two region plans, one per region involved: the "from" region is
+   * planned from the "from" server to the "to" server, and the "to" region the other way round.
+   * @param cluster cluster state whose {@code regions} and {@code servers} arrays are indexed by
+   *                this action's region and server indices
+   * @return an immutable two-element list of plans
+   */
+  @Override
+  List<RegionPlan> toRegionPlans(BalancerClusterState cluster) {
+    RegionPlan outbound = new RegionPlan(cluster.regions[getFromRegion()],
+      cluster.servers[getFromServer()], cluster.servers[getToServer()]);
+    RegionPlan inbound = new RegionPlan(cluster.regions[getToRegion()],
+      cluster.servers[getToServer()], cluster.servers[getFromServer()]);
+    return ImmutableList.of(outbound, inbound);
+  }
+
@Override
public String toString() {
return getType() + ": " + fromRegion + ":" + fromServer + " <-> " +
toRegion + ":" + toServer;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKey.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKey.java
new file mode 100644
index 00000000000..f43df965da3
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKey.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer.replicas;
+
+import java.util.Arrays;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+/**
+ * Value object built from a region's table, start key and end key. Two keys are equal when all
+ * three components match, regardless of which {@link RegionInfo} instance produced them.
+ */
+public final class ReplicaKey {
+  private final TableName tableName;
+  private final byte[] start;
+  private final byte[] stop;
+
+  /**
+   * Captures the table name and key range of the given region.
+   * @param regionInfo region whose table, start key and end key form this key
+   */
+  public ReplicaKey(RegionInfo regionInfo) {
+    this.tableName = regionInfo.getTable();
+    this.start = regionInfo.getStartKey();
+    this.stop = regionInfo.getEndKey();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (!(o instanceof ReplicaKey)) {
+      return false;
+    }
+    ReplicaKey that = (ReplicaKey) o;
+    // Key bytes are compared by content; the arrays themselves are not required to be identical.
+    if (!Arrays.equals(start, that.start)) {
+      return false;
+    }
+    if (!Arrays.equals(stop, that.stop)) {
+      return false;
+    }
+    return tableName.equals(that.tableName);
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder builder = new HashCodeBuilder();
+    builder.append(tableName);
+    builder.append(start);
+    builder.append(stop);
+    return builder.toHashCode();
+  }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKeyCache.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKeyCache.java
new file mode 100644
index 00000000000..a40e5f9a2f2
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKeyCache.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer.replicas;
+
+import java.time.Duration;
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+
+@InterfaceAudience.Private
+/**
+ * Singleton source of {@link ReplicaKey} instances. Depending on configuration, keys are either
+ * created on every request or served from a bounded, expiring cache keyed by {@link RegionInfo}.
+ */
+public final class ReplicaKeyCache implements Configurable {
+  /**
+   * ReplicaKey creation is expensive if you have lots of regions. If your HMaster has adequate
+   * memory, and you would like balancing to be faster, then you can turn on this flag to cache
+   * ReplicaKey objects.
+   */
+  public static final String CACHE_REPLICA_KEYS_KEY =
+    "hbase.replica.distribution.conditional.cacheReplicaKeys";
+  public static final boolean CACHE_REPLICA_KEYS_DEFAULT = false;
+
+  /**
+   * If memory is available, then set this to a value greater than your region count to maximize
+   * replica distribution performance.
+   */
+  public static final String REPLICA_KEY_CACHE_SIZE_KEY =
+    "hbase.replica.distribution.conditional.replicaKeyCacheSize";
+  public static final int REPLICA_KEY_CACHE_SIZE_DEFAULT = 1000;
+
+  private static final Supplier<ReplicaKeyCache> INSTANCE =
+    Suppliers.memoize(ReplicaKeyCache::new);
+
+  /** Null while caching is disabled; replaced or cleared by {@link #setConf(Configuration)}. */
+  private volatile LoadingCache<RegionInfo, ReplicaKey> replicaKeyCache = null;
+
+  private Configuration conf;
+
+  /** Returns the shared instance, creating it on first use. */
+  public static ReplicaKeyCache getInstance() {
+    return INSTANCE.get();
+  }
+
+  private ReplicaKeyCache() {
+  }
+
+  /**
+   * Returns the key for the given region, from the cache when caching is enabled and freshly
+   * constructed otherwise.
+   * @param regionInfo region to derive the key from
+   * @return the region's {@link ReplicaKey}
+   */
+  public ReplicaKey getReplicaKey(RegionInfo regionInfo) {
+    // Read the volatile field exactly once: setConf may clear it between a null check and the
+    // lookup, which would otherwise surface as a NullPointerException here.
+    LoadingCache<RegionInfo, ReplicaKey> cache = replicaKeyCache;
+    return cache == null ? new ReplicaKey(regionInfo) : cache.getUnchecked(regionInfo);
+  }
+
+  /**
+   * Applies the caching flag from {@code conf}. Enabling the flag builds the cache if none exists
+   * yet (an existing cache is kept as is, so a changed size is not picked up); disabling the flag
+   * drops the cache.
+   * @param conf configuration to read {@link #CACHE_REPLICA_KEYS_KEY} and
+   *             {@link #REPLICA_KEY_CACHE_SIZE_KEY} from
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    boolean cacheKeys = conf.getBoolean(CACHE_REPLICA_KEYS_KEY, CACHE_REPLICA_KEYS_DEFAULT);
+    if (cacheKeys && replicaKeyCache == null) {
+      int replicaKeyCacheSize =
+        conf.getInt(REPLICA_KEY_CACHE_SIZE_KEY, REPLICA_KEY_CACHE_SIZE_DEFAULT);
+      replicaKeyCache = CacheBuilder.newBuilder().maximumSize(replicaKeyCacheSize)
+        .expireAfterAccess(Duration.ofMinutes(30)).build(new CacheLoader<RegionInfo, ReplicaKey>() {
+          @Override
+          public ReplicaKey load(RegionInfo regionInfo) {
+            return new ReplicaKey(regionInfo);
+          }
+        });
+    } else if (!cacheKeys) {
+      replicaKeyCache = null;
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionalsTestUtil.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionalsTestUtil.java
new file mode 100644
index 00000000000..0678cc3b67f
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionalsTestUtil.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+
+public final class BalancerConditionalsTestUtil {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BalancerConditionalsTestUtil.class);
+
+ private BalancerConditionalsTestUtil() {
+ }
+
+  /**
+   * Builds {@code numRegions - 1} split keys that evenly divide the range
+   * {@code [0, Integer.MAX_VALUE]}, each rendered as a zero-padded decimal string of at least
+   * nine digits.
+   * @param numRegions number of regions the resulting split keys should produce
+   * @return the split keys in ascending order
+   */
+  static byte[][] generateSplits(int numRegions) {
+    int splitCount = numRegions - 1;
+    byte[][] splitKeys = new byte[splitCount][];
+    int stride = Integer.MAX_VALUE / numRegions;
+    for (int ordinal = 1; ordinal <= splitCount; ordinal++) {
+      String paddedBoundary = String.format("%09d", ordinal * stride);
+      splitKeys[ordinal - 1] = Bytes.toBytes(paddedBoundary);
+    }
+    return splitKeys;
+  }
+
+  /**
+   * Logs, for every live region server, the regions it hosts grouped by table. Tables without
+   * regions on a given server are omitted from that server's section.
+   * @param connection connection used to obtain an {@link Admin}
+   * @throws IOException if the table descriptors or cluster metrics cannot be fetched
+   */
+  static void printRegionLocations(Connection connection) throws IOException {
+    Map<ServerName, Map<TableName, List<RegionInfo>>> serverToRegions;
+    // Admin is Closeable; release it as soon as the cluster snapshot has been gathered.
+    try (Admin admin = connection.getAdmin()) {
+      // Get all table names in the cluster
+      Set<TableName> tableNames = admin.listTableDescriptors().stream()
+        .map(TableDescriptor::getTableName).collect(Collectors.toSet());
+
+      // Group regions by server
+      serverToRegions = admin.getClusterMetrics().getLiveServerMetrics().keySet().stream()
+        .collect(Collectors.toMap(server -> server, server -> {
+          try {
+            return listRegionsByTable(connection, server, tableNames);
+          } catch (IOException e) {
+            // Checked exceptions cannot cross the collector lambda; keep the cause attached.
+            throw new RuntimeException(e);
+          }
+        }));
+    }
+
+    // Pretty print region locations
+    StringBuilder regionLocationOutput = new StringBuilder();
+    regionLocationOutput.append("Pretty printing region locations...\n");
+    serverToRegions.forEach((server, tableRegions) -> {
+      regionLocationOutput.append("Server: ").append(server.getServerName()).append("\n");
+      tableRegions.forEach((table, regions) -> {
+        if (regions.isEmpty()) {
+          return;
+        }
+        regionLocationOutput.append(" Table: ").append(table.getNameAsString()).append("\n");
+        regions.forEach(region -> regionLocationOutput
+          .append(String.format(" Region: %s, start: %s, end: %s, replica: %s\n",
+            region.getEncodedName(), Bytes.toString(region.getStartKey()),
+            Bytes.toString(region.getEndKey()), region.getReplicaId())));
+      });
+    });
+    LOG.info(regionLocationOutput.toString());
+  }
+
+  /**
+   * Lists the regions hosted by {@code server}, bucketed by table.
+   * @param connection connection used to obtain an {@link Admin}
+   * @param server     the region server to inspect
+   * @param tableNames the tables to report; each becomes a key of the result, even when the
+   *                   server hosts none of that table's regions
+   * @return the server's regions for each requested table
+   * @throws IOException if the server's region list cannot be fetched
+   */
+  private static Map<TableName, List<RegionInfo>> listRegionsByTable(Connection connection,
+    ServerName server, Set<TableName> tableNames) throws IOException {
+    // Fetch the server's regions once instead of once per table, and close the Admin afterwards.
+    try (Admin admin = connection.getAdmin()) {
+      List<RegionInfo> allRegions = admin.getRegions(server);
+
+      // Find regions for each table
+      return tableNames.stream()
+        .collect(Collectors.toMap(tableName -> tableName, tableName -> allRegions.stream()
+          .filter(region -> region.getTable().equals(tableName)).collect(Collectors.toList())));
+    }
+  }
+
+  /**
+   * Asserts how the regions of {@code tableName} are spread over servers.
+   * @param connection          connection used to look up the table's region locations
+   * @param tableName           table whose region placement is checked
+   * @param shouldBeDistributed when true, no server may host two regions of the table with the
+   *                            same start key; when false, every region of the table must sit on
+   *                            one single server
+   */
+  static void validateReplicaDistribution(Connection connection, TableName tableName,
+    boolean shouldBeDistributed) {
+    Map<ServerName, List<RegionInfo>> serverToRegions = null;
+    try {
+      serverToRegions = connection.getRegionLocator(tableName).getAllRegionLocations().stream()
+        .collect(Collectors.groupingBy(location -> location.getServerName(),
+          Collectors.mapping(location -> location.getRegion(), Collectors.toList())));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (shouldBeDistributed) {
+      // Ensure no server hosts more than one replica of any region
+      for (Map.Entry<ServerName, List<RegionInfo>> serverAndRegions : serverToRegions.entrySet()) {
+        List<RegionInfo> regionInfos = serverAndRegions.getValue();
+        // byte[] has identity-based equals/hashCode, so a Set<byte[]> would never report a
+        // duplicate. Track the keys by their binary string form to compare content instead.
+        Set<String> startKeys = new HashSet<>();
+        for (RegionInfo regionInfo : regionInfos) {
+          // each region should have a distinct start key
+          boolean alreadySeen = !startKeys.add(Bytes.toStringBinary(regionInfo.getStartKey()));
+          assertFalse(
+            "Each region should have its own start key, "
+              + "demonstrating it is not a replica of any others on this host",
+            alreadySeen);
+        }
+      }
+    } else {
+      // Ensure all replicas are on the same server
+      assertEquals("All regions should share one server", 1, serverToRegions.size());
+    }
+  }
+
+  /**
+   * Asserts the placement of the meta table, the quota table and a product table relative to one
+   * another, using the first server listed for meta and for quota.
+   * @param tableToServers   servers hosting each table; must contain entries for the meta table,
+   *                         the quota table and {@code productTableName}
+   * @param productTableName the user table to compare against meta and quota
+   * @param shouldBeBalanced when true, the three tables must not share servers; when false, they
+   *                         must all be on the same server
+   */
+  static void validateRegionLocations(Map<TableName, Set<ServerName>> tableToServers,
+    TableName productTableName, boolean shouldBeBalanced) {
+    ServerName metaServer =
+      tableToServers.get(TableName.META_TABLE_NAME).stream().findFirst().get();
+    ServerName quotaServer =
+      tableToServers.get(QuotaUtil.QUOTA_TABLE_NAME).stream().findFirst().get();
+    Set<ServerName> productServers = tableToServers.get(productTableName);
+
+    if (!shouldBeBalanced) {
+      // Everything is expected to be piled onto one server.
+      for (ServerName productServer : productServers) {
+        assertEquals("Meta table and product table must share servers", productServer,
+          metaServer);
+        assertEquals("Quota table and product table must share servers", productServer,
+          quotaServer);
+      }
+      assertEquals("The meta server and quotas server must be the same", metaServer, quotaServer);
+      return;
+    }
+
+    // Each table is expected to be isolated from the other two.
+    for (ServerName productServer : productServers) {
+      assertNotEquals("Meta table and product table should not share servers", productServer,
+        metaServer);
+      assertNotEquals("Quota table and product table should not share servers", productServer,
+        quotaServer);
+    }
+    assertNotEquals("The meta server and quotas server should be different", metaServer,
+      quotaServer);
+  }
+
+  /**
+   * Maps each given table to the set of servers currently hosting at least one of its regions.
+   * @param connection connection used to look up region locations
+   * @param tableNames tables to resolve
+   * @return hosting servers per table
+   */
+  static Map<TableName, Set<ServerName>> getTableToServers(Connection connection,
+    Set<TableName> tableNames) {
+    return tableNames.stream()
+      .collect(Collectors.toMap(table -> table, table -> serversHosting(connection, table)));
+  }
+
+  /**
+   * Collects the distinct servers named by the region locations of {@code table}, rethrowing any
+   * {@link IOException} as a {@link RuntimeException} so it can be used inside a stream.
+   */
+  private static Set<ServerName> serversHosting(Connection connection, TableName table) {
+    try {
+      List<HRegionLocation> locations = connection.getRegionLocator(table).getAllRegionLocations();
+      return locations.stream().map(HRegionLocation::getServerName).collect(Collectors.toSet());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+ /**
+  * A check that signals failure by throwing {@link AssertionError}; used as the unit of work
+  * passed to {@code validateAssertionsWithRetries}.
+  */
+ @FunctionalInterface
+ interface AssertionRunnable {
+ /** Runs the check, throwing {@link AssertionError} when it does not hold. */
+ void run() throws AssertionError;
+ }
+
+  /**
+   * Single-assertion convenience overload of
+   * {@link #validateAssertionsWithRetries(HBaseTestingUtility, boolean, Set)}.
+   */
+  static void validateAssertionsWithRetries(HBaseTestingUtility testUtil,
+    boolean runBalancerOnFailure, AssertionRunnable assertion) {
+    validateAssertionsWithRetries(testUtil, runBalancerOnFailure, ImmutableSet.of(assertion));
+  }
+
+  /**
+   * Runs all assertions until they pass together, retrying up to 50 times. After a failed
+   * attempt the current region locations are logged and, if requested, the balancer is invoked
+   * before trying again.
+   * @param testUtil             testing utility providing the connection and admin
+   * @param runBalancerOnFailure whether to trigger a balancer run after each failed attempt
+   * @param assertions           checks that must all pass within a single attempt
+   * @throws AssertionError the failure of the final attempt, if no attempt succeeds
+   */
+  static void validateAssertionsWithRetries(HBaseTestingUtility testUtil,
+    boolean runBalancerOnFailure, Set<AssertionRunnable> assertions) {
+    int maxAttempts = 50;
+    for (int i = 0; i < maxAttempts; i++) {
+      try {
+        for (AssertionRunnable assertion : assertions) {
+          assertion.run();
+        }
+        // Every assertion held on this attempt; stop instead of re-running the remaining attempts.
+        return;
+      } catch (AssertionError e) {
+        if (i == maxAttempts - 1) {
+          throw e;
+        }
+        try {
+          LOG.warn("Failed to validate region locations. Will retry", e);
+          Thread.sleep(1000);
+          BalancerConditionalsTestUtil.printRegionLocations(testUtil.getConnection());
+          if (runBalancerOnFailure) {
+            testUtil.getAdmin().balance();
+          }
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          // Preserve the interrupt for callers before giving up.
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(ex);
+        } catch (Exception ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+  }
+
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/CandidateGeneratorTestUtil.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/CandidateGeneratorTestUtil.java
new file mode 100644
index 00000000000..4f6e8f70f30
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/CandidateGeneratorTestUtil.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static
org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.MAX_RUNNING_TIME_KEY;
+import static
org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class CandidateGeneratorTestUtil {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CandidateGeneratorTestUtil.class);
+
+ private static final MasterServices MOCK_MASTER_SERVICES =
mock(MasterServices.class);
+
+ private CandidateGeneratorTestUtil() {
+ }
+
+  /**
+   * Repeatedly runs the stochastic balancer against an in-memory assignment, applying each
+   * returned plan to {@code serverToRegions}, until every expectation holds and the balancer
+   * itself reports that no further balancing is needed.
+   * @param conf                  configuration to run with; this method overwrites the
+   *                              run-max-steps flag, the max running time and the minimum cost
+   *                              needed to balance
+   * @param serverToRegions       current assignment; mutated in place as plans are applied
+   * @param expectations          predicates over the cluster state that must all return true
+   * @param targetMaxBalancerCost value stored under {@code MIN_COST_NEED_BALANCE_KEY}
+   * @throws RuntimeException if the expectations are still unmet after 1000 balancer runs
+   */
+  static void runBalancerToExhaustion(Configuration conf,
+    Map<ServerName, List<RegionInfo>> serverToRegions,
+    Set<Function<BalancerClusterState, Boolean>> expectations, float targetMaxBalancerCost) {
+    // Do the full plan. We're testing with a lot of regions
+    conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
+    conf.setLong(MAX_RUNNING_TIME_KEY, 15000);
+
+    conf.setFloat(MIN_COST_NEED_BALANCE_KEY, targetMaxBalancerCost);
+
+    BalancerClusterState cluster = createMockBalancerClusterState(serverToRegions);
+    StochasticLoadBalancer stochasticLoadBalancer = buildStochasticLoadBalancer(cluster, conf);
+    printClusterDistribution(cluster, 0);
+    int balancerRuns = 0;
+    int actionsTaken = 0;
+    long balancingMillis = 0;
+    boolean isBalanced = false;
+    while (!isBalanced) {
+      balancerRuns++;
+      if (balancerRuns > 1000) {
+        throw new RuntimeException("Balancer failed to find balance & meet expectations");
+      }
+      long start = System.currentTimeMillis();
+      List<RegionPlan> regionPlans =
+        stochasticLoadBalancer.balanceCluster(partitionRegionsByTable(serverToRegions));
+      balancingMillis += System.currentTimeMillis() - start;
+      // NOTE(review): the counter is bumped once per balancer run in addition to once per
+      // applied plan below - confirm that this is the intended meaning of "actions taken".
+      actionsTaken++;
+      if (regionPlans != null) {
+        // Apply all plans to serverToRegions
+        for (RegionPlan rp : regionPlans) {
+          ServerName source = rp.getSource();
+          ServerName dest = rp.getDestination();
+          RegionInfo region = rp.getRegionInfo();
+
+          // Update serverToRegions
+          serverToRegions.get(source).remove(region);
+          serverToRegions.get(dest).add(region);
+          actionsTaken++;
+        }
+
+        // Now rebuild cluster and balancer from updated serverToRegions
+        cluster = createMockBalancerClusterState(serverToRegions);
+        stochasticLoadBalancer = buildStochasticLoadBalancer(cluster, conf);
+      }
+      printClusterDistribution(cluster, actionsTaken);
+      isBalanced = true;
+      for (Function<BalancerClusterState, Boolean> condition : expectations) {
+        // Check if we've met all expectations for the candidate generator
+        if (!condition.apply(cluster)) {
+          isBalanced = false;
+          break;
+        }
+      }
+      if (isBalanced) { // Check if the balancer thinks we're done too
+        LOG.info("All balancer conditions passed. Checking if balancer thinks it's done.");
+        if (stochasticLoadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME, cluster)) {
+          LOG.info("Balancer would still like to run");
+          isBalanced = false;
+        } else {
+          LOG.info("Balancer is done");
+        }
+      }
+    }
+    // The message is labelled in seconds, so report whole seconds rather than whole minutes.
+    LOG.info("Balancing took {}sec", Duration.ofMillis(balancingMillis).getSeconds());
+  }
+
+  /**
+   * Logs how many regions each server holds, broken down per table for servers that hold any.
+   * @param cluster      cluster state to report on
+   * @param actionsTaken number of balancer actions applied so far, used only in the log header
+   */
+  static void printClusterDistribution(BalancerClusterState cluster, long actionsTaken) {
+    LOG.info("=== Cluster Distribution after {} balancer actions taken ===", actionsTaken);
+
+    for (int serverIndex = 0; serverIndex < cluster.numServers; serverIndex++) {
+      int[] hostedRegions = cluster.regionsPerServer[serverIndex];
+      int hostedCount = 0;
+      if (hostedRegions != null) {
+        hostedCount = hostedRegions.length;
+      }
+
+      LOG.info("Server {}: {} regions", cluster.servers[serverIndex].getServerName(), hostedCount);
+
+      if (hostedCount == 0) {
+        continue;
+      }
+
+      // Tally this server's regions per table.
+      Map<TableName, Integer> countsByTable = new HashMap<>();
+      for (int regionIndex : hostedRegions) {
+        countsByTable.merge(cluster.regions[regionIndex].getTable(), 1, Integer::sum);
+      }
+      for (Map.Entry<TableName, Integer> tally : countsByTable.entrySet()) {
+        LOG.info(" - Table {}: {} regions", tally.getKey(), tally.getValue());
+      }
+    }
+
+    LOG.info("===========================================");
+  }
+
+  /**
+   * Splits a server-to-regions assignment into one assignment per table. The tables are taken
+   * from the regions themselves, and every table's map contains every server, with an empty list
+   * for servers hosting none of that table's regions.
+   * @param serverToRegions The map of servers to their assigned regions.
+   * @return A map of tables to their server-to-region assignments.
+   */
+  public static Map<TableName, Map<ServerName, List<RegionInfo>>>
+    partitionRegionsByTable(Map<ServerName, List<RegionInfo>> serverToRegions) {
+
+    // Pass 1: discover every table that owns at least one region.
+    Set<TableName> knownTables = new HashSet<>();
+    serverToRegions.values()
+      .forEach(hosted -> hosted.forEach(region -> knownTables.add(region.getTable())));
+
+    // Pass 2: give each table an empty bucket for every server.
+    Map<TableName, Map<ServerName, List<RegionInfo>>> byTable = new HashMap<>();
+    for (TableName table : knownTables) {
+      Map<ServerName, List<RegionInfo>> emptyBuckets = new HashMap<>();
+      serverToRegions.keySet().forEach(server -> emptyBuckets.put(server, new ArrayList<>()));
+      byTable.put(table, emptyBuckets);
+    }
+
+    // Pass 3: drop each region into its table's bucket for the hosting server. Both lookups are
+    // guaranteed to succeed because of the two passes above.
+    serverToRegions.forEach((server, hosted) -> {
+      for (RegionInfo region : hosted) {
+        byTable.get(region.getTable()).get(server).add(region);
+      }
+    });
+
+    return byTable;
+  }
+
+  /**
+   * Creates a {@link StochasticLoadBalancer} wired to the shared mock master services, loads
+   * {@code conf} into it and initialises its cost functions from {@code cluster}.
+   * @param cluster cluster state used to initialise costs
+   * @param conf    configuration returned by the mock master and loaded into the balancer
+   * @return the configured balancer
+   */
+  static StochasticLoadBalancer buildStochasticLoadBalancer(BalancerClusterState cluster,
+    Configuration conf) {
+    // Point the shared mock at this run's configuration before handing it to the balancer.
+    when(MOCK_MASTER_SERVICES.getConfiguration()).thenReturn(conf);
+
+    StochasticLoadBalancer balancer =
+      new StochasticLoadBalancer(new DummyMetricsStochasticBalancer());
+    balancer.setMasterServices(MOCK_MASTER_SERVICES);
+    balancer.loadConf(conf);
+    balancer.initCosts(cluster);
+    return balancer;
+  }
+
+  /**
+   * Builds a {@link BalancerClusterState} from the given assignment, passing {@code null} for
+   * every other constructor argument.
+   * @param serverToRegions servers and the regions assigned to them
+   * @return the resulting cluster state
+   */
+  static BalancerClusterState
+    createMockBalancerClusterState(Map<ServerName, List<RegionInfo>> serverToRegions) {
+    BalancerClusterState clusterState =
+      new BalancerClusterState(serverToRegions, null, null, null, null);
+    return clusterState;
+  }
+
+  /**
+   * Checks that no server hosts two regions sharing the same {@link ReplicaKey}, i.e. the same
+   * table, start key and end key. Logs the first violation found, or a success message when
+   * there is none.
+   * @param cluster The current state of the cluster.
+   * @return true if all replicas are properly isolated, false otherwise.
+   */
+  static boolean areAllReplicasDistributed(BalancerClusterState cluster) {
+    for (int[] hostedRegions : cluster.regionsPerServer) {
+      // Servers without regions cannot violate anything.
+      if (hostedRegions != null && hostedRegions.length > 0) {
+        Set<ReplicaKey> keysOnServer = new HashSet<>();
+        for (int regionIndex : hostedRegions) {
+          RegionInfo hosted = cluster.regions[regionIndex];
+          boolean firstOfItsKind = keysOnServer.add(new ReplicaKey(hosted));
+          if (!firstOfItsKind) {
+            // Violation: Multiple replicas of the same region on the same server
+            LOG.warn("Replica isolation violated: one server hosts multiple replicas of key [{}].",
+              generateRegionKey(hosted));
+            return false;
+          }
+        }
+      }
+    }
+
+    LOG.info(
+      "Replica isolation validation passed: No server hosts multiple replicas of the same region.");
+    return true;
+  }
+
+  /**
+   * Renders a region as {@code table:startKey:endKey}, with both keys Base64-encoded so that
+   * arbitrary key bytes stay printable. Regions with the same table and key range render
+   * identically.
+   * @param regionInfo The RegionInfo object.
+   * @return A string representing the key of the region.
+   */
+  private static String generateRegionKey(RegionInfo regionInfo) {
+    Base64.Encoder encoder = Base64.getEncoder();
+    String table = regionInfo.getTable().getNameAsString();
+    String encodedStart = encoder.encodeToString(regionInfo.getStartKey());
+    String encodedEnd = encoder.encodeToString(regionInfo.getEndKey());
+    return String.join(":", table, encodedStart, encodedEnd);
+  }
+
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasTestConditional.java
similarity index 57%
copy from
hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
copy to
hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasTestConditional.java
index 56b473ae710..5a8fa2524fe 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasTestConditional.java
@@ -17,42 +17,23 @@
*/
package org.apache.hadoop.hbase.master.balancer;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
-/**
- * An action to move or swap a region
- */
-@InterfaceAudience.Private
-abstract class BalanceAction {
- enum Type {
- ASSIGN_REGION,
- MOVE_REGION,
- SWAP_REGIONS,
- NULL,
- }
-
- static final BalanceAction NULL_ACTION = new BalanceAction(Type.NULL) {
- };
-
- private final Type type;
-
- BalanceAction(Type type) {
- this.type = type;
- }
+public class DistributeReplicasTestConditional extends
DistributeReplicasConditional {
- /**
- * Returns an Action which would undo this action
- */
- BalanceAction undoAction() {
- return this;
+ static void enableConditionalReplicaDistributionForTest(Configuration conf) {
+ conf.set(BalancerConditionals.ADDITIONAL_CONDITIONALS_KEY,
+ DistributeReplicasTestConditional.class.getCanonicalName());
}
- Type getType() {
- return type;
+ public DistributeReplicasTestConditional(BalancerConditionals
balancerConditionals,
+ BalancerClusterState cluster) {
+ super(balancerConditionals, cluster);
}
@Override
- public String toString() {
- return type + ":";
+ public ValidationLevel getValidationLevel() {
+ // Mini-cluster tests can't validate at host/rack levels
+ return ValidationLevel.SERVER;
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java
index d658f7cfa16..dfacad1a747 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.balancer;
import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
/**
* Used for FavoredNode unit tests
@@ -27,7 +28,7 @@ public class LoadOnlyFavoredStochasticBalancer extends
FavoredStochasticBalancer
@Override
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
- createCandidateGenerators() {
+ createCandidateGenerators(Configuration conf) {
Map<Class<? extends CandidateGenerator>, CandidateGenerator> fnPickers =
new HashMap<>(1);
fnPickers.put(FavoredNodeLoadPicker.class, new FavoredNodeLoadPicker());
return fnPickers;
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerConditionals.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerConditionals.java
new file mode 100644
index 00000000000..884331f161a
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerConditionals.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Unit tests for how {@link BalancerConditionals} reacts to the additional-conditionals
+ * configuration key: absent, set to a conditional class, and set to an unrelated class.
+ */
+@Category({ SmallTests.class, MasterTests.class })
+public class TestBalancerConditionals extends BalancerTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBalancerConditionals.class);
+
+  private BalancerConditionals balancerConditionals;
+  private BalancerClusterState mockCluster;
+
+  @Before
+  public void setUp() {
+    balancerConditionals = BalancerConditionals.create();
+    mockCluster = mockCluster(new int[] { 0, 1, 2 });
+  }
+
+  /** Applies {@code conf} to the conditionals under test, then loads the mock cluster state. */
+  private void configureAndLoad(Configuration conf) {
+    balancerConditionals.setConf(conf);
+    balancerConditionals.loadClusterState(mockCluster);
+  }
+
+  @Test
+  public void testDefaultConfiguration() {
+    configureAndLoad(new Configuration());
+
+    int loadedConditionals = balancerConditionals.getConditionalClasses().size();
+    assertEquals("No conditionals should be loaded by default", 0, loadedConditionals);
+  }
+
+  @Test
+  public void testCustomConditionalsViaConfiguration() {
+    Configuration conf = new Configuration();
+    String conditionalClassName = DistributeReplicasConditional.class.getName();
+    conf.set(BalancerConditionals.ADDITIONAL_CONDITIONALS_KEY, conditionalClassName);
+
+    configureAndLoad(conf);
+
+    boolean skipsSloppyServers = balancerConditionals.shouldSkipSloppyServerEvaluation();
+    assertTrue("Custom conditionals should be loaded", skipsSloppyServers);
+  }
+
+  @Test
+  public void testInvalidCustomConditionalClass() {
+    Configuration conf = new Configuration();
+    conf.set(BalancerConditionals.ADDITIONAL_CONDITIONALS_KEY, "java.lang.String");
+
+    configureAndLoad(conf);
+
+    int loadedConditionals = balancerConditionals.getConditionalClasses().size();
+    assertEquals("Invalid classes should not be loaded as conditionals", 0, loadedConditionals);
+  }
+
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestLargeClusterBalancingConditionalReplicaDistribution.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestLargeClusterBalancingConditionalReplicaDistribution.java
new file mode 100644
index 00000000000..9e0a6f24e10
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestLargeClusterBalancingConditionalReplicaDistribution.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static
org.apache.hadoop.hbase.master.balancer.CandidateGeneratorTestUtil.runBalancerToExhaustion;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKeyCache;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+
+/**
+ * Runs the balancer over a large in-memory layout in which every region replica starts on a
+ * single server, with conditional replica distribution enabled for the test, and hands
+ * {@code CandidateGeneratorTestUtil::areAllReplicasDistributed} to
+ * {@code runBalancerToExhaustion} as the expectation to check.
+ */
+@Category({ MediumTests.class, MasterTests.class })
+public class TestLargeClusterBalancingConditionalReplicaDistribution {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestLargeClusterBalancingConditionalReplicaDistribution.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestLargeClusterBalancingConditionalReplicaDistribution.class);
+
+  private static final int NUM_SERVERS = 1000;
+  private static final int NUM_REGIONS = 20_000;
+  private static final int NUM_REPLICAS = 3;
+  private static final int NUM_TABLES = 100;
+
+  private static final ServerName[] servers = new ServerName[NUM_SERVERS];
+  private static final Map<ServerName, List<RegionInfo>> serverToRegions = new HashMap<>();
+
+  /**
+   * Builds the starting layout: NUM_SERVERS servers with empty region lists, then NUM_REGIONS key
+   * ranges with NUM_REPLICAS replicas each, all of which are placed on the first server.
+   */
+  @BeforeClass
+  public static void setup() {
+    // Initialize servers
+    for (int i = 0; i < NUM_SERVERS; i++) {
+      servers[i] = ServerName.valueOf("server" + i, i, System.currentTimeMillis());
+      serverToRegions.put(servers[i], new ArrayList<>());
+    }
+
+    // Create primary regions and their replicas
+    List<RegionInfo> allRegions = new ArrayList<>(NUM_REGIONS * NUM_REPLICAS);
+    for (int i = 0; i < NUM_REGIONS; i++) {
+      TableName tableName = getTableName(i);
+      // Define startKey and endKey for the region
+      byte[] startKey = Bytes.toBytes(i);
+      byte[] endKey = Bytes.toBytes(i + 1);
+
+      // Create NUM_REPLICAS replicas (replica ids 0..NUM_REPLICAS-1) for each key range
+      for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
+        RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey)
+          .setEndKey(endKey).setReplicaId(replicaId).build();
+        allRegions.add(regionInfo);
+      }
+    }
+
+    // Assign all regions to one server
+    serverToRegions.get(servers[0]).addAll(allRegions);
+  }
+
+  /** Maps a region index onto one of NUM_TABLES table names, cycling by modulo. */
+  private static TableName getTableName(int i) {
+    return TableName.valueOf("userTable" + i % NUM_TABLES);
+  }
+
+  /**
+   * Enables conditional replica distribution and the replica key cache, zeroes the two replica
+   * cost settings, and runs the balancer with the all-replicas-distributed expectation.
+   */
+  @Test
+  public void testReplicaDistribution() {
+    Configuration conf = new Configuration();
+    DistributeReplicasTestConditional.enableConditionalReplicaDistributionForTest(conf);
+    conf.setBoolean(ReplicaKeyCache.CACHE_REPLICA_KEYS_KEY, true);
+    conf.setInt(ReplicaKeyCache.REPLICA_KEY_CACHE_SIZE_KEY, Integer.MAX_VALUE);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 30_000);
+
+    // turn off replica cost functions
+    conf.setLong("hbase.master.balancer.stochastic.regionReplicaRackCostKey", 0);
+    conf.setLong("hbase.master.balancer.stochastic.regionReplicaHostCostKey", 0);
+
+    // NOTE(review): the meaning of the trailing 10.0f is defined by runBalancerToExhaustion.
+    runBalancerToExhaustion(conf, serverToRegions,
+      ImmutableSet.of(CandidateGeneratorTestUtil::areAllReplicasDistributed), 10.0f);
+    // Only replica distribution is configured and checked here, so the message says only that.
+    LOG.info("Region replicas are appropriately distributed across RegionServers.");
+  }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestReplicaDistributionBalancerConditional.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestReplicaDistributionBalancerConditional.java
new file mode 100644
index 00000000000..7807b07e74f
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestReplicaDistributionBalancerConditional.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static
org.apache.hadoop.hbase.master.balancer.BalancerConditionalsTestUtil.validateAssertionsWithRetries;
+
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ LargeTests.class, MasterTests.class })
+public class TestReplicaDistributionBalancerConditional {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+
HBaseClassTestRule.forClass(TestReplicaDistributionBalancerConditional.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestReplicaDistributionBalancerConditional.class);
+ private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
+ private static final int REPLICAS = 3;
+ private static final int NUM_SERVERS = REPLICAS;
+ private static final int REGIONS_PER_SERVER = 5;
+
+ @Before
+ public void setUp() throws Exception {
+ DistributeReplicasTestConditional
+
.enableConditionalReplicaDistributionForTest(TEST_UTIL.getConfiguration());
+ TEST_UTIL.getConfiguration()
+ .setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY,
true);
+ TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_BALANCER_PERIOD,
1000L);
+
TEST_UTIL.getConfiguration().setBoolean("hbase.master.balancer.stochastic.runMaxSteps",
true);
+
+ // turn off replica cost functions
+ TEST_UTIL.getConfiguration()
+ .setLong("hbase.master.balancer.stochastic.regionReplicaRackCostKey", 0);
+ TEST_UTIL.getConfiguration()
+ .setLong("hbase.master.balancer.stochastic.regionReplicaHostCostKey", 0);
+
+ TEST_UTIL.startMiniCluster(NUM_SERVERS);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testReplicaDistribution() throws Exception {
+ Connection connection = TEST_UTIL.getConnection();
+ Admin admin = connection.getAdmin();
+
+ // Create a "replicated_table" with region replicas
+ TableName replicatedTableName = TableName.valueOf("replicated_table");
+ TableDescriptor replicatedTableDescriptor =
+ TableDescriptorBuilder.newBuilder(replicatedTableName)
+
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("0")).build())
+ .setRegionReplication(REPLICAS).build();
+ admin.createTable(replicatedTableDescriptor,
+ BalancerConditionalsTestUtil.generateSplits(REGIONS_PER_SERVER *
NUM_SERVERS));
+
+ // Pause the balancer
+ admin.balancerSwitch(false, true);
+
+ // Collect all region replicas and place them on one RegionServer
+ List<RegionInfo> allRegions = admin.getRegions(replicatedTableName);
+ String targetServer =
+
TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName().getServerName();
+
+ for (RegionInfo region : allRegions) {
+ admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer));
+ }
+
+
BalancerConditionalsTestUtil.printRegionLocations(TEST_UTIL.getConnection());
+ validateAssertionsWithRetries(TEST_UTIL, false, () ->
BalancerConditionalsTestUtil
+ .validateReplicaDistribution(connection, replicatedTableName, false));
+
+ // Unpause the balancer and trigger balancing
+ admin.balancerSwitch(true, true);
+ admin.balance();
+
+ validateAssertionsWithRetries(TEST_UTIL, true, () ->
BalancerConditionalsTestUtil
+ .validateReplicaDistribution(connection, replicatedTableName, true));
+
BalancerConditionalsTestUtil.printRegionLocations(TEST_UTIL.getConnection());
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
index 960783a8467..188efa64dcd 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
@@ -254,7 +254,7 @@ public class TestStochasticLoadBalancerHeterogeneousCost
extends BalancerTestBas
}
@Override
- protected CandidateGenerator getRandomGenerator() {
+ protected CandidateGenerator getRandomGenerator(BalancerClusterState
cluster) {
return fairRandomCandidateGenerator;
}
}