This is an automated email from the ASF dual-hosted git repository. rmattingly pushed a commit to branch HBASE-28513-branch-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit f006a74a66f462b6b8b11d4d8bb049ac062e9e01 Author: Ray Mattingly <rmattin...@apache.org> AuthorDate: Mon Feb 24 14:29:06 2025 -0500 HBASE-28513 The StochasticLoadBalancer should support discrete evaluations (#6651) Co-authored-by: Ray Mattingly <rmattin...@hubspot.com> Signed-off-by: Nick Dimiduk <ndimi...@apache.org> --- .../hbase/master/balancer/AssignRegionAction.java | 10 + .../hbase/master/balancer/BalanceAction.java | 36 ++- .../master/balancer/BalancerClusterState.java | 91 +++++++- .../master/balancer/BalancerConditionals.java | 212 +++++++++++++++++ .../hbase/master/balancer/BaseLoadBalancer.java | 12 +- .../master/balancer/CacheAwareLoadBalancer.java | 2 +- .../hbase/master/balancer/CandidateGenerator.java | 2 + .../hadoop/hbase/master/balancer/CostFunction.java | 7 + .../DistributeReplicasCandidateGenerator.java | 115 ++++++++++ .../balancer/DistributeReplicasConditional.java | 97 ++++++++ .../master/balancer/FavoredStochasticBalancer.java | 4 +- .../hbase/master/balancer/MoveBatchAction.java | 77 +++++++ .../hbase/master/balancer/MoveRegionAction.java | 10 + .../master/balancer/RegionPlanConditional.java | 133 +++++++++++ .../RegionPlanConditionalCandidateGenerator.java | 113 ++++++++++ .../balancer/SlopFixingCandidateGenerator.java | 105 +++++++++ .../master/balancer/StochasticLoadBalancer.java | 135 ++++++++--- .../hbase/master/balancer/SwapRegionsAction.java | 13 ++ .../hbase/master/balancer/replicas/ReplicaKey.java | 54 +++++ .../master/balancer/replicas/ReplicaKeyCache.java | 93 ++++++++ .../balancer/BalancerConditionalsTestUtil.java | 221 ++++++++++++++++++ .../balancer/CandidateGeneratorTestUtil.java | 250 +++++++++++++++++++++ .../DistributeReplicasTestConditional.java} | 41 +--- .../LoadOnlyFavoredStochasticBalancer.java | 3 +- .../master/balancer/TestBalancerConditionals.java | 83 +++++++ ...terBalancingConditionalReplicaDistribution.java | 113 ++++++++++ ...TestReplicaDistributionBalancerConditional.java | 120 ++++++++++ 
...estStochasticLoadBalancerHeterogeneousCost.java | 2 +- 28 files changed, 2083 insertions(+), 71 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java index c99ae092d77..8a79b64142e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java @@ -17,9 +17,13 @@ */ package org.apache.hadoop.hbase.master.balancer; +import java.util.List; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; + @InterfaceAudience.Private class AssignRegionAction extends BalanceAction { private final int region; @@ -46,6 +50,12 @@ class AssignRegionAction extends BalanceAction { throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED); } + @Override + List<RegionPlan> toRegionPlans(BalancerClusterState cluster) { + return ImmutableList + .of(new RegionPlan(cluster.regions[getRegion()], null, cluster.servers[getServer()])); + } + @Override public String toString() { return getType() + ": " + region + ":" + server; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java index 56b473ae710..a65b5253907 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.master.balancer; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.hbase.master.RegionPlan; import 
org.apache.yetus.audience.InterfaceAudience; /** @@ -28,11 +31,11 @@ abstract class BalanceAction { ASSIGN_REGION, MOVE_REGION, SWAP_REGIONS, + MOVE_BATCH, NULL, } - static final BalanceAction NULL_ACTION = new BalanceAction(Type.NULL) { - }; + static final BalanceAction NULL_ACTION = new NullBalanceAction(); private final Type type; @@ -43,16 +46,39 @@ abstract class BalanceAction { /** * Returns an Action which would undo this action */ - BalanceAction undoAction() { - return this; - } + abstract BalanceAction undoAction(); + + /** + * Returns the Action represented as RegionPlans + */ + abstract List<RegionPlan> toRegionPlans(BalancerClusterState cluster); Type getType() { return type; } + long getStepCount() { + return 1; + } + @Override public String toString() { return type + ":"; } + + private static final class NullBalanceAction extends BalanceAction { + private NullBalanceAction() { + super(Type.NULL); + } + + @Override + BalanceAction undoAction() { + return this; + } + + @Override + List<RegionPlan> toRegionPlans(BalancerClusterState cluster) { + return Collections.emptyList(); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java index b857055fb3a..67755fc317c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java @@ -26,6 +26,9 @@ import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.agrona.collections.Hashing; import org.agrona.collections.Int2IntCounterMap; import org.apache.hadoop.hbase.HDFSBlocksDistribution; @@ -39,6 +42,8 @@ import org.apache.yetus.audience.InterfaceAudience; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; + /** * An efficient array based implementation similar to ClusterState for keeping the status of the * cluster in terms of region assignment and distribution. LoadBalancers, such as @@ -123,6 +128,15 @@ class BalancerClusterState { // Maps regionName -> oldServerName -> cache ratio of the region on the old server Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap; + private final Supplier<List<Integer>> shuffledServerIndicesSupplier = + Suppliers.memoizeWithExpiration(() -> { + Collection<Integer> serverIndices = serversToIndex.values(); + List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices); + Collections.shuffle(shuffledServerIndices); + return shuffledServerIndices; + }, 5, TimeUnit.SECONDS); + private long stopRequestedAt = Long.MAX_VALUE; + static class DefaultRackManager extends RackManager { @Override public String getRack(ServerName server) { @@ -728,8 +742,25 @@ class BalancerClusterState { regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); break; + case MOVE_BATCH: + assert action instanceof MoveBatchAction : action.getClass(); + MoveBatchAction mba = (MoveBatchAction) action; + for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) { + Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex); + regionsPerServer[serverIndex] = + removeRegions(regionsPerServer[serverIndex], regionsToRemove); + } + for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) { + Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex); + regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd); + } + for (MoveRegionAction moveRegionAction : mba.getMoveActions()) { + regionMoved(moveRegionAction.getRegion(), moveRegionAction.getFromServer(), + 
moveRegionAction.getToServer()); + } + break; default: - throw new RuntimeException("Uknown action:" + action.getType()); + throw new RuntimeException("Unknown action:" + action.getType()); } } @@ -891,6 +922,52 @@ class BalancerClusterState { return newRegions; } + int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) { + // Calculate the size of the new regions array + int newSize = regions.length - regionIndicesToRemove.size(); + if (newSize < 0) { + throw new IllegalStateException( + "Region indices mismatch: more regions to remove than in the regions array"); + } + + int[] newRegions = new int[newSize]; + int newIndex = 0; + + // Copy only the regions not in the removal set + for (int region : regions) { + if (!regionIndicesToRemove.contains(region)) { + newRegions[newIndex++] = region; + } + } + + // If the newIndex is smaller than newSize, some regions were missing from the input array + if (newIndex != newSize) { + throw new IllegalStateException("Region indices mismatch: some regions in the removal " + + "set were not found in the regions array"); + } + + return newRegions; + } + + int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) { + int[] newRegions = new int[regions.length + regionIndicesToAdd.size()]; + + // Copy the existing regions to the new array + System.arraycopy(regions, 0, newRegions, 0, regions.length); + + // Add the new regions at the end of the array + int newIndex = regions.length; + for (int regionIndex : regionIndicesToAdd) { + newRegions[newIndex++] = regionIndex; + } + + return newRegions; + } + + List<Integer> getShuffledServerIndices() { + return shuffledServerIndicesSupplier.get(); + } + int[] addRegionSorted(int[] regions, int regionIndex) { int[] newRegions = new int[regions.length + 1]; int i = 0; @@ -990,6 +1067,18 @@ class BalancerClusterState { this.numMovedRegions = numMovedRegions; } + public int getMaxReplicas() { + return maxReplicas; + } + + void setStopRequestedAt(long stopRequestedAt) { + 
this.stopRequestedAt = stopRequestedAt; + } + + long getStopRequestedAt() { + return stopRequestedAt; + } + @Override public String toString() { StringBuilder desc = new StringBuilder("Cluster={servers=["); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionals.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionals.java new file mode 100644 index 00000000000..6ad09519e82 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionals.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import java.lang.reflect.Constructor; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKeyCache; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; + +/** + * Balancer conditionals supplement cost functions in the {@link StochasticLoadBalancer}. Cost + * functions are insufficient and difficult to work with when making discrete decisions; this is + * because they operate on a continuous scale, and each cost function's multiplier affects the + * relative importance of every other cost function. So it is difficult to meaningfully and clearly + * value many aspects of your region distribution via cost functions alone. Conditionals allow you + * to very clearly define discrete rules that your balancer would ideally follow. To clarify, a + * conditional violation will not block a region assignment because we would prefer to have uptime + * than have perfectly intentional balance. But conditionals allow you to, for example, define that + * a region's primary and secondary should not live on the same rack. Another example, conditionals + * make it easy to define that system tables will ideally be isolated on their own RegionServer + * (without needing to manage distinct RegionServer groups). 
+ */ +@InterfaceAudience.Private +final class BalancerConditionals implements Configurable { + + private static final Logger LOG = LoggerFactory.getLogger(BalancerConditionals.class); + + public static final String DISTRIBUTE_REPLICAS_KEY = + "hbase.master.balancer.stochastic.conditionals.distributeReplicas"; + public static final boolean DISTRIBUTE_REPLICAS_DEFAULT = false; + + public static final String ADDITIONAL_CONDITIONALS_KEY = + "hbase.master.balancer.stochastic.additionalConditionals"; + + private Set<Class<? extends RegionPlanConditional>> conditionalClasses = Collections.emptySet(); + private Set<RegionPlanConditional> conditionals = Collections.emptySet(); + private Configuration conf; + + static BalancerConditionals create() { + return new BalancerConditionals(); + } + + private BalancerConditionals() { + } + + boolean shouldRunBalancer(BalancerClusterState cluster) { + return isConditionalBalancingEnabled() && conditionals.stream() + .map(RegionPlanConditional::getCandidateGenerators).flatMap(Collection::stream) + .map(generator -> generator.getWeight(cluster)).anyMatch(weight -> weight > 0); + } + + Set<Class<? 
extends RegionPlanConditional>> getConditionalClasses() { + return Set.copyOf(conditionalClasses); + } + + Collection<RegionPlanConditional> getConditionals() { + return conditionals; + } + + boolean isReplicaDistributionEnabled() { + return conditionalClasses.stream() + .anyMatch(DistributeReplicasConditional.class::isAssignableFrom); + } + + boolean shouldSkipSloppyServerEvaluation() { + return isConditionalBalancingEnabled(); + } + + boolean isConditionalBalancingEnabled() { + return !conditionalClasses.isEmpty(); + } + + void clearConditionalWeightCaches() { + conditionals.stream().map(RegionPlanConditional::getCandidateGenerators) + .flatMap(Collection::stream) + .forEach(RegionPlanConditionalCandidateGenerator::clearWeightCache); + } + + void loadClusterState(BalancerClusterState cluster) { + conditionals = conditionalClasses.stream().map(clazz -> createConditional(clazz, cluster)) + .filter(Objects::nonNull).collect(Collectors.toSet()); + } + + /** + * Indicates whether the action is good for our conditional compliance. 
+ * @param cluster The cluster state + * @param action The proposed action + * @return -1 if conditionals improve, 0 if neutral, 1 if conditionals degrade + */ + int getViolationCountChange(BalancerClusterState cluster, BalanceAction action) { + // Cluster is in pre-move state, so figure out the proposed violations + boolean isViolatingPost = isViolating(cluster, action); + cluster.doAction(action); + + // Cluster is in post-move state, so figure out the original violations + BalanceAction undoAction = action.undoAction(); + boolean isViolatingPre = isViolating(cluster, undoAction); + + // Reset cluster + cluster.doAction(undoAction); + + if (isViolatingPre && isViolatingPost) { + return 0; + } else if (!isViolatingPre && isViolatingPost) { + return 1; + } else { + return -1; + } + } + + /** + * Check if the proposed action violates conditionals + * @param cluster The cluster state + * @param action The proposed action + */ + boolean isViolating(BalancerClusterState cluster, BalanceAction action) { + conditionals.forEach(conditional -> conditional.setClusterState(cluster)); + if (conditionals.isEmpty()) { + return false; + } + List<RegionPlan> regionPlans = action.toRegionPlans(cluster); + for (RegionPlan regionPlan : regionPlans) { + if (isViolating(regionPlan)) { + return true; + } + } + return false; + } + + private boolean isViolating(RegionPlan regionPlan) { + for (RegionPlanConditional conditional : conditionals) { + if (conditional.isViolating(regionPlan)) { + return true; + } + } + return false; + } + + private RegionPlanConditional createConditional(Class<? extends RegionPlanConditional> clazz, + BalancerClusterState cluster) { + if (cluster == null) { + cluster = new BalancerClusterState(Collections.emptyMap(), null, null, null, null); + } + try { + Constructor<? 
extends RegionPlanConditional> ctor = + clazz.getDeclaredConstructor(BalancerConditionals.class, BalancerClusterState.class); + return ReflectionUtils.instantiate(clazz.getName(), ctor, this, cluster); + } catch (NoSuchMethodException e) { + LOG.warn("Cannot find constructor with Configuration and " + + "BalancerClusterState parameters for class '{}': {}", clazz.getName(), e.getMessage()); + } + return null; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + ImmutableSet.Builder<Class<? extends RegionPlanConditional>> conditionalClasses = + ImmutableSet.builder(); + + boolean distributeReplicas = + conf.getBoolean(DISTRIBUTE_REPLICAS_KEY, DISTRIBUTE_REPLICAS_DEFAULT); + if (distributeReplicas) { + conditionalClasses.add(DistributeReplicasConditional.class); + } + + Class<?>[] classes = conf.getClasses(ADDITIONAL_CONDITIONALS_KEY); + for (Class<?> clazz : classes) { + if (!RegionPlanConditional.class.isAssignableFrom(clazz)) { + LOG.warn("Class {} is not a RegionPlanConditional", clazz.getName()); + continue; + } + conditionalClasses.add(clazz.asSubclass(RegionPlanConditional.class)); + } + this.conditionalClasses = conditionalClasses.build(); + ReplicaKeyCache.getInstance().setConf(conf); + loadClusterState(null); + } + + @Override + public Configuration getConf() { + return conf; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 07cd5892086..fac0d82fe01 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -77,6 +77,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer { public static final boolean DEFAULT_HBASE_MASTER_LOADBALANCE_BYTABLE = false; + public static final String REGIONS_SLOP_KEY = "hbase.regions.slop"; + 
public static final float REGIONS_SLOP_DEFAULT = 0.2f; + protected static final int MIN_SERVER_BALANCE = 2; private volatile boolean stopped = false; @@ -256,7 +259,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer { float average = cs.getLoadAverage(); // for logging int floor = (int) Math.floor(average * (1 - slop)); int ceiling = (int) Math.ceil(average * (1 + slop)); - if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) { + int maxLoad = cs.getMaxLoad(); + int minLoad = cs.getMinLoad(); + if (!(maxLoad > ceiling || minLoad < floor)) { NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad(); if (LOG.isTraceEnabled()) { // If nothing to balance, then don't say anything unless trace-level logging. @@ -549,7 +554,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } protected float getDefaultSlop() { - return 0.2f; + return REGIONS_SLOP_DEFAULT; } private RegionLocationFinder createRegionLocationFinder(Configuration conf) { @@ -560,9 +565,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } protected void loadConf(Configuration conf) { - this.slop = conf.getFloat("hbase.regions.slop", getDefaultSlop()); + this.slop = conf.getFloat(REGIONS_SLOP_KEY, getDefaultSlop()); this.rackManager = new RackManager(getConf()); - this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(conf); useRegionFinder = conf.getBoolean("hbase.master.balancer.uselocality", true); if (useRegionFinder) { regionFinder = createRegionLocationFinder(conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java index 385ed564c7d..d651cd49d1d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java @@ -69,7 +69,7 @@ public class CacheAwareLoadBalancer extends StochasticLoadBalancer { @Override protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> - createCandidateGenerators() { + createCandidateGenerators(Configuration conf) { Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators = new HashMap<>(2); candidateGenerators.put(CacheAwareSkewnessCandidateGenerator.class, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java index d9245495e20..642e8162fff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java @@ -28,6 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private abstract class CandidateGenerator { + protected static final double MAX_WEIGHT = 1.0; + abstract BalanceAction generate(BalancerClusterState cluster); /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CostFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CostFunction.java index 1dcd4580b1a..ee2fc2b6a5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CostFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CostFunction.java @@ -76,6 +76,13 @@ abstract class CostFunction { regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); break; + case MOVE_BATCH: + MoveBatchAction mba = (MoveBatchAction) action; + for (MoveRegionAction moveRegionAction : mba.getMoveActions()) { + regionMoved(moveRegionAction.getRegion(), 
moveRegionAction.getFromServer(), + moveRegionAction.getToServer()); + } + break; default: throw new RuntimeException("Uknown action:" + action.getType()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasCandidateGenerator.java new file mode 100644 index 00000000000..38fbcc4a0fb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasCandidateGenerator.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.apache.hadoop.hbase.master.balancer.DistributeReplicasConditional.getReplicaKey; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CandidateGenerator to distribute colocated replicas across different servers. 
+ */ +@InterfaceAudience.Private +final class DistributeReplicasCandidateGenerator extends RegionPlanConditionalCandidateGenerator { + + private static final Logger LOG = + LoggerFactory.getLogger(DistributeReplicasCandidateGenerator.class); + private static final int BATCH_SIZE = 100_000; + + DistributeReplicasCandidateGenerator(BalancerConditionals balancerConditionals) { + super(balancerConditionals); + } + + @Override + BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing) { + return generateCandidate(cluster, isWeighing, false); + } + + BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing, + boolean isForced) { + if (cluster.getMaxReplicas() < cluster.numRacks) { + LOG.trace("Skipping replica distribution as there are not enough racks to distribute them."); + return BalanceAction.NULL_ACTION; + } + + // Iterate through shuffled servers to find colocated replicas + boolean foundColocatedReplicas = false; + List<MoveRegionAction> moveRegionActions = new ArrayList<>(); + List<Integer> shuffledServerIndices = cluster.getShuffledServerIndices(); + for (int sourceIndex : shuffledServerIndices) { + if ( + moveRegionActions.size() >= BATCH_SIZE + || EnvironmentEdgeManager.currentTime() > cluster.getStopRequestedAt() + ) { + break; + } + int[] serverRegions = cluster.regionsPerServer[sourceIndex]; + Set<ReplicaKey> replicaKeys = new HashSet<>(serverRegions.length); + for (int regionIndex : serverRegions) { + ReplicaKey replicaKey = getReplicaKey(cluster.regions[regionIndex]); + if (replicaKeys.contains(replicaKey)) { + foundColocatedReplicas = true; + if (isWeighing) { + // If weighing, fast exit with an actionable move + return getAction(sourceIndex, regionIndex, pickOtherRandomServer(cluster, sourceIndex), + -1); + } + // If not weighing, pick a good move + for (int i = 0; i < cluster.numServers; i++) { + // Randomize destination ordering so we aren't overloading one destination + int destinationIndex = 
pickOtherRandomServer(cluster, sourceIndex); + if (destinationIndex == sourceIndex) { + continue; + } + MoveRegionAction possibleAction = + new MoveRegionAction(regionIndex, sourceIndex, destinationIndex); + if (isForced) { + return possibleAction; + } + if (willBeAccepted(cluster, possibleAction)) { + cluster.doAction(possibleAction); // Update cluster state to reflect move + moveRegionActions.add(possibleAction); + break; + } + } + } else { + replicaKeys.add(replicaKey); + } + } + } + + if (!moveRegionActions.isEmpty()) { + return batchMovesAndResetClusterState(cluster, moveRegionActions); + } + // If no colocated replicas are found, return NULL_ACTION + if (foundColocatedReplicas) { + LOG.warn("Could not find a place to put a colocated replica! We will force a move."); + return generateCandidate(cluster, isWeighing, true); + } + LOG.trace("No colocated replicas found. No balancing action required."); + return BalanceAction.NULL_ACTION; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasConditional.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasConditional.java new file mode 100644 index 00000000000..2cd27615e5f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasConditional.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.List; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey; +import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKeyCache; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; + +/** + * If enabled, this class will help the balancer ensure that replicas aren't placed on the same + * servers or racks as their primary. 
Configure this via + * {@link BalancerConditionals#DISTRIBUTE_REPLICAS_KEY} + */ +@InterfaceAudience.Private +public class DistributeReplicasConditional extends RegionPlanConditional { + + private final List<RegionPlanConditionalCandidateGenerator> candidateGenerators; + + public DistributeReplicasConditional(BalancerConditionals balancerConditionals, + BalancerClusterState cluster) { + super(balancerConditionals.getConf(), cluster); + Configuration conf = balancerConditionals.getConf(); + float slop = + conf.getFloat(BaseLoadBalancer.REGIONS_SLOP_KEY, BaseLoadBalancer.REGIONS_SLOP_DEFAULT); + this.candidateGenerators = + ImmutableList.of(new DistributeReplicasCandidateGenerator(balancerConditionals), + new SlopFixingCandidateGenerator(balancerConditionals, slop)); + } + + @Override + public ValidationLevel getValidationLevel() { + return ValidationLevel.SERVER_HOST_RACK; + } + + @Override + List<RegionPlanConditionalCandidateGenerator> getCandidateGenerators() { + return candidateGenerators; + } + + @Override + boolean isViolatingServer(RegionPlan regionPlan, Set<RegionInfo> serverRegions) { + return checkViolation(regionPlan.getRegionInfo(), getReplicaKey(regionPlan.getRegionInfo()), + serverRegions); + } + + @Override + boolean isViolatingHost(RegionPlan regionPlan, Set<RegionInfo> hostRegions) { + return checkViolation(regionPlan.getRegionInfo(), getReplicaKey(regionPlan.getRegionInfo()), + hostRegions); + } + + @Override + boolean isViolatingRack(RegionPlan regionPlan, Set<RegionInfo> rackRegions) { + return checkViolation(regionPlan.getRegionInfo(), getReplicaKey(regionPlan.getRegionInfo()), + rackRegions); + } + + private boolean checkViolation(RegionInfo movingRegion, ReplicaKey movingReplicaKey, + Set<RegionInfo> destinationRegions) { + for (RegionInfo regionInfo : destinationRegions) { + if (regionInfo.equals(movingRegion)) { + continue; + } + if (getReplicaKey(regionInfo).equals(movingReplicaKey)) { + return true; + } + } + return false; + } + + static 
ReplicaKey getReplicaKey(RegionInfo regionInfo) { + return ReplicaKeyCache.getInstance().getReplicaKey(regionInfo); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java index db4c7c95b65..98ad3beac8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java @@ -81,7 +81,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer @Override protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> - createCandidateGenerators() { + createCandidateGenerators(Configuration conf) { Map<Class<? extends CandidateGenerator>, CandidateGenerator> fnPickers = new HashMap<>(2); fnPickers.put(FavoredNodeLoadPicker.class, new FavoredNodeLoadPicker()); fnPickers.put(FavoredNodeLocalityPicker.class, new FavoredNodeLocalityPicker()); @@ -90,7 +90,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer /** Returns any candidate generator in random */ @Override - protected CandidateGenerator getRandomGenerator() { + protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) { Class<? 
extends CandidateGenerator> clazz = shuffledGeneratorClasses.get() .get(ThreadLocalRandom.current().nextInt(candidateGenerators.size())); return candidateGenerators.get(clazz); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveBatchAction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveBatchAction.java new file mode 100644 index 00000000000..e7ea3ed15e1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveBatchAction.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; +import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps; + +@InterfaceAudience.Private +public class MoveBatchAction extends BalanceAction { + private final List<MoveRegionAction> moveActions; + + MoveBatchAction(List<MoveRegionAction> moveActions) { + super(Type.MOVE_BATCH); + this.moveActions = moveActions; + } + + @Override + BalanceAction undoAction() { + List<MoveRegionAction> undoMoves = new ArrayList<>(getMoveActions().size()); + for (int i = getMoveActions().size() - 1; i >= 0; i--) { + MoveRegionAction move = getMoveActions().get(i); + undoMoves + .add(new MoveRegionAction(move.getRegion(), move.getToServer(), move.getFromServer())); + } + return new MoveBatchAction(undoMoves); + } + + @Override + List<RegionPlan> toRegionPlans(BalancerClusterState cluster) { + List<RegionPlan> mbRegionPlans = new ArrayList<>(getMoveActions().size()); + for (MoveRegionAction moveRegionAction : getMoveActions()) { + mbRegionPlans.add(new RegionPlan(cluster.regions[moveRegionAction.getRegion()], + cluster.servers[moveRegionAction.getFromServer()], + cluster.servers[moveRegionAction.getToServer()])); + } + return mbRegionPlans; + } + + @Override + long getStepCount() { + return moveActions.size(); + } + + public HashMultimap<Integer, Integer> getServerToRegionsToRemove() { + return moveActions.stream().collect(Multimaps.toMultimap(MoveRegionAction::getFromServer, + MoveRegionAction::getRegion, HashMultimap::create)); + } + + public HashMultimap<Integer, Integer> getServerToRegionsToAdd() { + return moveActions.stream().collect(Multimaps.toMultimap(MoveRegionAction::getToServer, + MoveRegionAction::getRegion, HashMultimap::create)); + } + + List<MoveRegionAction> 
getMoveActions() { + return moveActions; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java index 547c9c5b28e..9798e9cebe8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java @@ -17,8 +17,12 @@ */ package org.apache.hadoop.hbase.master.balancer; +import java.util.List; +import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; + @InterfaceAudience.Private class MoveRegionAction extends BalanceAction { private final int region; @@ -49,6 +53,12 @@ class MoveRegionAction extends BalanceAction { return new MoveRegionAction(region, toServer, fromServer); } + @Override + List<RegionPlan> toRegionPlans(BalancerClusterState cluster) { + return ImmutableList.of(new RegionPlan(cluster.regions[getRegion()], + cluster.servers[getFromServer()], cluster.servers[getToServer()])); + } + @Override public String toString() { return getType() + ": " + region + ":" + fromServer + " -> " + toServer; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditional.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditional.java new file mode 100644 index 00000000000..8de371d341c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditional.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +@InterfaceStability.Evolving +public abstract class RegionPlanConditional { + private static final Logger LOG = LoggerFactory.getLogger(RegionPlanConditional.class); + private BalancerClusterState cluster; + + RegionPlanConditional(Configuration conf, BalancerClusterState cluster) { + this.cluster = cluster; + } + + public enum ValidationLevel { + /** + * Just check the server. + */ + SERVER, + /** + * Check the server and the host. + */ + SERVER_HOST, + /** + * Check the server, host, and rack. + */ + SERVER_HOST_RACK + } + + void setClusterState(BalancerClusterState cluster) { + this.cluster = cluster; + } + + /** + * Returns a {@link ValidationLevel} that is appropriate for this conditional. 
+ * @return the validation level + */ + abstract ValidationLevel getValidationLevel(); + + /** + * Get the candidate generator(s) for this conditional. This can be useful to provide the balancer + * with hints that will appease your conditional. Your conditionals will be triggered in order. + * @return the candidate generator for this conditional + */ + abstract List<RegionPlanConditionalCandidateGenerator> getCandidateGenerators(); + + /** + * Check if the conditional is violated by the given region plan. + * @param regionPlan the region plan to check + * @return true if the conditional is violated + */ + boolean isViolating(RegionPlan regionPlan) { + if (regionPlan == null) { + return false; + } + int destinationServerIdx = cluster.serversToIndex.get(regionPlan.getDestination().getAddress()); + + // Check Server + int[] destinationRegionIndices = cluster.regionsPerServer[destinationServerIdx]; + Set<RegionInfo> serverRegions = Arrays.stream(cluster.regionsPerServer[destinationServerIdx]) + .mapToObj(idx -> cluster.regions[idx]).collect(Collectors.toSet()); + for (int regionIdx : destinationRegionIndices) { + serverRegions.add(cluster.regions[regionIdx]); + } + if (isViolatingServer(regionPlan, serverRegions)) { + return true; + } + + if (getValidationLevel() == ValidationLevel.SERVER) { + return false; + } + + // Check Host + int hostIdx = cluster.serverIndexToHostIndex[destinationServerIdx]; + Set<RegionInfo> hostRegions = Arrays.stream(cluster.regionsPerHost[hostIdx]) + .mapToObj(idx -> cluster.regions[idx]).collect(Collectors.toSet()); + if (isViolatingHost(regionPlan, hostRegions)) { + return true; + } + + if (getValidationLevel() == ValidationLevel.SERVER_HOST) { + return false; + } + + // Check Rack + int rackIdx = cluster.serverIndexToRackIndex[destinationServerIdx]; + Set<RegionInfo> rackRegions = Arrays.stream(cluster.regionsPerRack[rackIdx]) + .mapToObj(idx -> cluster.regions[idx]).collect(Collectors.toSet()); + if (isViolatingRack(regionPlan, 
rackRegions)) { + return true; + } + + return false; + } + + abstract boolean isViolatingServer(RegionPlan regionPlan, Set<RegionInfo> destinationRegions); + + boolean isViolatingHost(RegionPlan regionPlan, Set<RegionInfo> destinationRegions) { + return false; + } + + boolean isViolatingRack(RegionPlan regionPlan, Set<RegionInfo> destinationRegions) { + return false; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditionalCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditionalCandidateGenerator.java new file mode 100644 index 00000000000..f8274841f72 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditionalCandidateGenerator.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import java.time.Duration; +import java.util.List; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class RegionPlanConditionalCandidateGenerator extends CandidateGenerator { + + private static final Logger LOG = + LoggerFactory.getLogger(RegionPlanConditionalCandidateGenerator.class); + + private static final Duration WEIGHT_CACHE_TTL = Duration.ofMinutes(1); + private long lastWeighedAt = -1; + private double lastWeight = 0.0; + + private final BalancerConditionals balancerConditionals; + + RegionPlanConditionalCandidateGenerator(BalancerConditionals balancerConditionals) { + this.balancerConditionals = balancerConditionals; + } + + BalancerConditionals getBalancerConditionals() { + return this.balancerConditionals; + } + + /** + * Generates a balancing action to appease the conditional. + * @param cluster Current state of the cluster. + * @param isWeighing Flag indicating if the generator is being used for weighing. + * @return A BalanceAction, or NULL_ACTION if no action is needed. + */ + abstract BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing); + + @Override + BalanceAction generate(BalancerClusterState cluster) { + BalanceAction balanceAction = generateCandidate(cluster, false); + if (!willBeAccepted(cluster, balanceAction)) { + LOG.debug("Generated action is not widely accepted by all conditionals. " + + "Likely we are finding our way out of a deadlock. 
balanceAction={}", balanceAction); + } + return balanceAction; + } + + MoveBatchAction batchMovesAndResetClusterState(BalancerClusterState cluster, + List<MoveRegionAction> moves) { + MoveBatchAction batchAction = new MoveBatchAction(moves); + undoBatchAction(cluster, batchAction); + return batchAction; + } + + boolean willBeAccepted(BalancerClusterState cluster, BalanceAction action) { + BalancerConditionals balancerConditionals = getBalancerConditionals(); + if (balancerConditionals == null) { + return true; + } + return !balancerConditionals.isViolating(cluster, action); + } + + void undoBatchAction(BalancerClusterState cluster, MoveBatchAction batchAction) { + for (int i = batchAction.getMoveActions().size() - 1; i >= 0; i--) { + MoveRegionAction action = batchAction.getMoveActions().get(i); + cluster.doAction(action.undoAction()); + } + } + + void clearWeightCache() { + lastWeighedAt = -1; + } + + double getWeight(BalancerClusterState cluster) { + boolean hasCandidate = false; + + // Candidate generation is expensive, so for re-weighing generators we will cache + // the value for a bit + if (EnvironmentEdgeManager.currentTime() - lastWeighedAt < WEIGHT_CACHE_TTL.toMillis()) { + return lastWeight; + } else { + hasCandidate = generateCandidate(cluster, true) != BalanceAction.NULL_ACTION; + lastWeighedAt = EnvironmentEdgeManager.currentTime(); + } + + if (hasCandidate) { + // If this generator has something to do, then it's important + lastWeight = CandidateGenerator.MAX_WEIGHT; + } else { + lastWeight = 0; + } + return lastWeight; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SlopFixingCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SlopFixingCandidateGenerator.java new file mode 100644 index 00000000000..070e4903394 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SlopFixingCandidateGenerator.java @@ -0,0 +1,105 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.hbase.ServerName; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A simple candidate generator that attempts to move regions from the most-loaded servers to the + * least-loaded servers. 
+ */ +@InterfaceAudience.Private +final class SlopFixingCandidateGenerator extends RegionPlanConditionalCandidateGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(SlopFixingCandidateGenerator.class); + + private final float slop; + + SlopFixingCandidateGenerator(BalancerConditionals balancerConditionals, float slop) { + super(balancerConditionals); + this.slop = slop; + } + + @Override + BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing) { + ClusterLoadState cs = new ClusterLoadState(cluster.clusterState); + float average = cs.getLoadAverage(); + int ceiling = (int) Math.ceil(average * (1 + slop)); + Set<Integer> sloppyServerIndices = new HashSet<>(); + for (int i = 0; i < cluster.numServers; i++) { + int regionCount = cluster.regionsPerServer[i].length; + if (regionCount > ceiling) { + sloppyServerIndices.add(i); + } + } + + if (sloppyServerIndices.isEmpty()) { + LOG.trace("No action to take because no sloppy servers exist."); + return BalanceAction.NULL_ACTION; + } + + List<MoveRegionAction> moves = new ArrayList<>(); + Set<ServerAndLoad> fixedServers = new HashSet<>(); + for (int sourceServer : sloppyServerIndices) { + for (int regionIdx : cluster.regionsPerServer[sourceServer]) { + boolean regionFoundMove = false; + for (ServerAndLoad serverAndLoad : cs.getServersByLoad().keySet()) { + ServerName destinationServer = serverAndLoad.getServerName(); + int destinationServerIdx = cluster.serversToIndex.get(destinationServer.getAddress()); + int regionsOnDestination = cluster.regionsPerServer[destinationServerIdx].length; + if (regionsOnDestination < average) { + MoveRegionAction move = + new MoveRegionAction(regionIdx, sourceServer, destinationServerIdx); + if (willBeAccepted(cluster, move)) { + if (isWeighing) { + // Fast exit for weighing candidate + return move; + } + moves.add(move); + cluster.doAction(move); + regionFoundMove = true; + break; + } + } else { + fixedServers.add(serverAndLoad); + } + } + 
fixedServers.forEach(s -> cs.getServersByLoad().remove(s)); + fixedServers.clear(); + if (!regionFoundMove) { + LOG.debug("Could not find a destination for region {} from server {}.", regionIdx, + sourceServer); + } + if (cluster.regionsPerServer[sourceServer].length <= ceiling) { + break; + } + } + } + + MoveBatchAction batch = new MoveBatchAction(moves); + undoBatchAction(cluster, batch); + return batch; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index fca4ef95207..42784ea4440 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -54,7 +54,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; /** @@ -192,6 +191,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return shuffled; }, 5, TimeUnit.SECONDS); + private final BalancerConditionals balancerConditionals = BalancerConditionals.create(); + /** * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its * default MetricsBalancer @@ -244,16 +245,24 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> - createCandidateGenerators() { - Map<Class<? 
extends CandidateGenerator>, CandidateGenerator> candidateGenerators = - new HashMap<>(5); - candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator()); - candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator()); - candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator); - candidateGenerators.put(RegionReplicaCandidateGenerator.class, - new RegionReplicaCandidateGenerator()); - candidateGenerators.put(RegionReplicaRackCandidateGenerator.class, - new RegionReplicaRackCandidateGenerator()); + createCandidateGenerators(Configuration conf) { + balancerConditionals.setConf(conf); + Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators; + if (balancerConditionals.isReplicaDistributionEnabled()) { + candidateGenerators = new HashMap<>(3); + candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator()); + candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator()); + candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator); + } else { + candidateGenerators = new HashMap<>(5); + candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator()); + candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator()); + candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator); + candidateGenerators.put(RegionReplicaCandidateGenerator.class, + new RegionReplicaCandidateGenerator()); + candidateGenerators.put(RegionReplicaRackCandidateGenerator.class, + new RegionReplicaRackCandidateGenerator()); + } return candidateGenerators; } @@ -288,7 +297,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { localityCost = new ServerLocalityCostFunction(conf); rackLocalityCost = new RackLocalityCostFunction(conf); - this.candidateGenerators = createCandidateGenerators(); + balancerConditionals.setConf(conf); 
+ this.candidateGenerators = createCandidateGenerators(conf); regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); @@ -377,6 +387,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } private boolean areSomeRegionReplicasColocatedOnHost(BalancerClusterState c) { + if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) { + // This check is unnecessary without replicas, or with conditional replica distribution + // The balancer will auto-run if conditional replica distribution candidates are available + return false; + } if (c.numHosts >= c.maxReplicas) { regionReplicaHostCostFunction.prepare(c); double hostCost = Math.abs(regionReplicaHostCostFunction.cost()); @@ -390,6 +405,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } private boolean areSomeRegionReplicasColocatedOnRack(BalancerClusterState c) { + if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) { + // This check is unnecessary without replicas, or with conditional replica distribution + // The balancer will auto-run if conditional replica distribution candidates are available + return false; + } if (c.numRacks >= c.maxReplicas) { regionReplicaRackCostFunction.prepare(c); double rackCost = Math.abs(regionReplicaRackCostFunction.cost()); @@ -441,6 +461,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return true; } + if (balancerConditionals.shouldRunBalancer(cluster)) { + LOG.info("Running balancer because conditional candidate generators have important moves"); + return true; + } + double total = 0.0; float localSumMultiplier = 0; // in case this.sumMultiplier is not initialized for (CostFunction c : costFunctions) { @@ -470,14 +495,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } LOG.info( "{} - skipping load balancing because weighted average imbalance={} <= " - + "threshold({}). 
If you want more aggressive balancing, either lower " + + "threshold({}) and conditionals do not have opinionated move candidates. " + + "If you want more aggressive balancing, either lower " + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative " + "multiplier(s) of the specific cost function(s). functionCost={}", isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier, minCostNeedBalance, minCostNeedBalance, functionCost()); } else { - LOG.info("{} - Calculating plan. may take up to {}ms to complete.", - isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime); + LOG.info( + "{} - Calculating plan. may take up to {}ms to complete. currentCost={}, targetCost={}", + isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime, total, + minCostNeedBalance); } return !balanced; } @@ -485,7 +513,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { @RestrictedApi(explanation = "Should only be called in tests", link = "", allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState cluster) { - CandidateGenerator generator = getRandomGenerator(); + CandidateGenerator generator = getRandomGenerator(cluster); return Pair.newPair(generator, generator.generate(cluster)); } @@ -494,8 +522,20 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * selecting a candidate generator is proportional to the share of cost of all cost functions * among all cost functions that benefit from it. 
*/ - protected CandidateGenerator getRandomGenerator() { - Preconditions.checkState(!candidateGenerators.isEmpty(), "No candidate generators available."); + protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) { + // Prefer conditional generators if they have moves to make + if (balancerConditionals.isConditionalBalancingEnabled()) { + for (RegionPlanConditional conditional : balancerConditionals.getConditionals()) { + List<RegionPlanConditionalCandidateGenerator> generators = + conditional.getCandidateGenerators(); + for (RegionPlanConditionalCandidateGenerator generator : generators) { + if (generator.getWeight(cluster) > 0) { + return generator; + } + } + } + } + List<Class<? extends CandidateGenerator>> generatorClasses = shuffledGeneratorClasses.get(); List<Double> partialSums = new ArrayList<>(generatorClasses.size()); double sum = 0.0; @@ -583,8 +623,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { rackManager, regionCacheRatioOnOldServerMap); long startTime = EnvironmentEdgeManager.currentTime(); + cluster.setStopRequestedAt(startTime + maxRunningTime); initCosts(cluster); + balancerConditionals.loadClusterState(cluster); + balancerConditionals.clearConditionalWeightCaches(); + float localSumMultiplier = 0; for (CostFunction c : costFunctions) { if (c.isNeeded()) { @@ -632,6 +676,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { final String initFunctionTotalCosts = totalCostsPerFunc(); // Perform a stochastic walk to see if we can get a good fit. long step; + boolean planImprovedConditionals = false; Map<Class<? extends CandidateGenerator>, Long> generatorToStepCount = new HashMap<>(); Map<Class<? 
extends CandidateGenerator>, Long> generatorToApprovedActionCount = new HashMap<>(); for (step = 0; step < computedMaxSteps; step++) { @@ -643,16 +688,53 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { continue; } - cluster.doAction(action); + int conditionalViolationsChange = 0; + boolean isViolatingConditionals = false; + boolean moveImprovedConditionals = false; + // Only check conditionals if they are enabled + if (balancerConditionals.isConditionalBalancingEnabled()) { + // Always accept a conditional generator output. Sometimes conditional generators + // may need to make controversial moves in order to break what would otherwise + // be a deadlocked situation. + // Otherwise, for normal moves, evaluate the action. + if (RegionPlanConditionalCandidateGenerator.class.isAssignableFrom(generator.getClass())) { + conditionalViolationsChange = -1; + } else { + conditionalViolationsChange = + balancerConditionals.getViolationCountChange(cluster, action); + isViolatingConditionals = balancerConditionals.isViolating(cluster, action); + } + moveImprovedConditionals = conditionalViolationsChange < 0; + if (moveImprovedConditionals) { + planImprovedConditionals = true; + } + } + + // Change state and evaluate costs + try { + cluster.doAction(action); + } catch (IllegalStateException | ArrayIndexOutOfBoundsException e) { + LOG.warn( + "Generator {} produced invalid action! " + + "Debug your candidate generator as this is likely a bug, " + + "and may cause a balancer deadlock. {}", + generator.getClass().getSimpleName(), action, e); + continue; + } updateCostsAndWeightsWithAction(cluster, action); - generatorToStepCount.merge(generator.getClass(), 1L, Long::sum); + generatorToStepCount.merge(generator.getClass(), action.getStepCount(), Long::sum); newCost = computeCost(cluster, currentCost); - // Should this be kept? 
- if (newCost < currentCost) { + boolean conditionalsSimilarCostsImproved = + (newCost < currentCost && conditionalViolationsChange == 0 && !isViolatingConditionals); + // Our first priority is to reduce conditional violations + // Our second priority is to reduce balancer cost + // change, regardless of cost change + if (moveImprovedConditionals || conditionalsSimilarCostsImproved) { currentCost = newCost; - generatorToApprovedActionCount.merge(generator.getClass(), 1L, Long::sum); + generatorToApprovedActionCount.merge(generator.getClass(), action.getStepCount(), + Long::sum); // save for JMX curOverallCost = currentCost; @@ -665,7 +747,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { updateCostsAndWeightsWithAction(cluster, undoAction); } - if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) { + if (EnvironmentEdgeManager.currentTime() > cluster.getStopRequestedAt()) { break; } } @@ -682,7 +764,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { metricsBalancer.balanceCluster(endTime - startTime); - if (initCost > currentCost) { + if (planImprovedConditionals || (initCost > currentCost)) { updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); plans = createRegionPlans(cluster); LOG.info( @@ -697,7 +779,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } LOG.info( "Could not find a better moving plan. 
Tried {} different configurations in " - + "{} ms, and did not find anything with an imbalance score less than {}", + + "{} ms, and did not find anything with an imbalance score less than {} " + + "and could not improve conditional violations", step, endTime - startTime, initCost / sumMultiplier); return null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java index 6f83d2bc930..c99de022f03 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java @@ -17,8 +17,12 @@ */ package org.apache.hadoop.hbase.master.balancer; +import java.util.List; +import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; + @InterfaceAudience.Private public class SwapRegionsAction extends BalanceAction { private final int fromServer; @@ -55,6 +59,15 @@ public class SwapRegionsAction extends BalanceAction { return new SwapRegionsAction(fromServer, toRegion, toServer, fromRegion); } + @Override + List<RegionPlan> toRegionPlans(BalancerClusterState cluster) { + return ImmutableList.of( + new RegionPlan(cluster.regions[getFromRegion()], cluster.servers[getFromServer()], + cluster.servers[getToServer()]), + new RegionPlan(cluster.regions[getToRegion()], cluster.servers[getToServer()], + cluster.servers[getFromServer()])); + } + @Override public String toString() { return getType() + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKey.java new file mode 100644 index 
00000000000..88cfa82dcc6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKey.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer.replicas; + +import java.util.Arrays; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public final class ReplicaKey { + private final TableName tableName; + private final byte[] start; + private final byte[] stop; + + public ReplicaKey(RegionInfo regionInfo) { + this.tableName = regionInfo.getTable(); + this.start = regionInfo.getStartKey(); + this.stop = regionInfo.getEndKey(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ReplicaKey other)) { + return false; + } + return Arrays.equals(this.start, other.start) && Arrays.equals(this.stop, other.stop) + && this.tableName.equals(other.tableName); + } + + @Override + public int hashCode() { + return new 
HashCodeBuilder().append(tableName).append(start).append(stop).toHashCode(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKeyCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKeyCache.java new file mode 100644 index 00000000000..a40e5f9a2f2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKeyCache.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer.replicas; + +import java.time.Duration; +import java.util.function.Supplier; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; +import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; + +@InterfaceAudience.Private +public final class ReplicaKeyCache implements Configurable { + /** + * ReplicaKey creation is expensive if you have lots of regions. If your HMaster has adequate + * memory, and you would like balancing to be faster, then you can turn on this flag to cache + * ReplicaKey objects. + */ + public static final String CACHE_REPLICA_KEYS_KEY = + "hbase.replica.distribution.conditional.cacheReplicaKeys"; + public static final boolean CACHE_REPLICA_KEYS_DEFAULT = false; + + /** + * If memory is available, then set this to a value greater than your region count to maximize + * replica distribution performance. + */ + public static final String REPLICA_KEY_CACHE_SIZE_KEY = + "hbase.replica.distribution.conditional.replicaKeyCacheSize"; + public static final int REPLICA_KEY_CACHE_SIZE_DEFAULT = 1000; + + private static final Supplier<ReplicaKeyCache> INSTANCE = Suppliers.memoize(ReplicaKeyCache::new); + + private volatile LoadingCache<RegionInfo, ReplicaKey> replicaKeyCache = null; + + private Configuration conf; + + public static ReplicaKeyCache getInstance() { + return INSTANCE.get(); + } + + private ReplicaKeyCache() { + } + + /** + * Returns the ReplicaKey for the given region, taken from the cache when one is configured and + * built directly otherwise. + */ + public ReplicaKey getReplicaKey(RegionInfo regionInfo) { + // Read the volatile field once: setConf() may null it between a null check and the lookup + LoadingCache<RegionInfo, ReplicaKey> cache = replicaKeyCache; + return cache == null + ? 
new ReplicaKey(regionInfo) + : cache.getUnchecked(regionInfo); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + boolean cacheKeys = conf.getBoolean(CACHE_REPLICA_KEYS_KEY, CACHE_REPLICA_KEYS_DEFAULT); + if (cacheKeys && replicaKeyCache == null) { + int replicaKeyCacheSize = + conf.getInt(REPLICA_KEY_CACHE_SIZE_KEY, REPLICA_KEY_CACHE_SIZE_DEFAULT); + replicaKeyCache = CacheBuilder.newBuilder().maximumSize(replicaKeyCacheSize) + .expireAfterAccess(Duration.ofMinutes(30)).build(new CacheLoader<RegionInfo, ReplicaKey>() { + @Override + public ReplicaKey load(RegionInfo regionInfo) { + return new ReplicaKey(regionInfo); + } + }); + } else if (!cacheKeys) { + replicaKeyCache = null; + } + } + + @Override + public Configuration getConf() { + return conf; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionalsTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionalsTestUtil.java new file mode 100644 index 00000000000..8a7169b0930 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionalsTestUtil.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.quotas.QuotaUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; + +public final class BalancerConditionalsTestUtil { + + private static final Logger LOG = LoggerFactory.getLogger(BalancerConditionalsTestUtil.class); + + private BalancerConditionalsTestUtil() { + } + + static byte[][] generateSplits(int numRegions) { + byte[][] splitKeys = new byte[numRegions - 1][]; + for (int i = 0; i < numRegions - 1; i++) { + splitKeys[i] = + Bytes.toBytes(String.format("%09d", (i + 1) * (Integer.MAX_VALUE / numRegions))); + } + return splitKeys; + } + + static void printRegionLocations(Connection connection) throws IOException { + Admin admin = connection.getAdmin(); + + // Get all table names in the cluster + Set<TableName> tableNames = admin.listTableDescriptors(true).stream() + .map(TableDescriptor::getTableName).collect(Collectors.toSet()); + + // Group regions by server + Map<ServerName, Map<TableName, List<RegionInfo>>> serverToRegions = + 
admin.getClusterMetrics().getLiveServerMetrics().keySet().stream() + .collect(Collectors.toMap(server -> server, server -> { + try { + return listRegionsByTable(connection, server, tableNames); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + + // Pretty print region locations + StringBuilder regionLocationOutput = new StringBuilder(); + regionLocationOutput.append("Pretty printing region locations...\n"); + serverToRegions.forEach((server, tableRegions) -> { + regionLocationOutput.append("Server: " + server.getServerName() + "\n"); + tableRegions.forEach((table, regions) -> { + if (regions.isEmpty()) { + return; + } + regionLocationOutput.append(" Table: " + table.getNameAsString() + "\n"); + regions.forEach(region -> regionLocationOutput + .append(String.format(" Region: %s, start: %s, end: %s, replica: %s\n", + region.getEncodedName(), Bytes.toString(region.getStartKey()), + Bytes.toString(region.getEndKey()), region.getReplicaId()))); + }); + }); + LOG.info(regionLocationOutput.toString()); + } + + /** + * Groups the regions hosted on {@code server} by table, with one entry per name in + * {@code tableNames}. + */ + private static Map<TableName, List<RegionInfo>> listRegionsByTable(Connection connection, + ServerName server, Set<TableName> tableNames) throws IOException { + // Fetch the server's regions once instead of once per table, and close the Admin afterwards + List<RegionInfo> allRegions; + try (Admin admin = connection.getAdmin()) { + allRegions = admin.getRegions(server); + } + + // Find regions for each table + return tableNames.stream().collect(Collectors.toMap(tableName -> tableName, + tableName -> allRegions.stream().filter(region -> region.getTable().equals(tableName)) + .collect(Collectors.toList()))); + } + + static void validateReplicaDistribution(Connection connection, TableName tableName, + boolean shouldBeDistributed) { + Map<ServerName, List<RegionInfo>> serverToRegions = null; + try { + serverToRegions = connection.getRegionLocator(tableName).getAllRegionLocations().stream() + .collect(Collectors.groupingBy(location -> location.getServerName(), 
+ Collectors.mapping(location -> location.getRegion(), Collectors.toList()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + + if (shouldBeDistributed) { + // Ensure no server hosts more than one replica of any region + for (Map.Entry<ServerName, List<RegionInfo>> serverAndRegions : serverToRegions.entrySet()) { + List<RegionInfo> regionInfos = serverAndRegions.getValue(); + // byte[] uses identity equals/hashCode, so track start keys by their printable content + Set<String> startKeys = new HashSet<>(); + for (RegionInfo regionInfo : regionInfos) { + // each region should have a distinct start key + String startKey = Bytes.toStringBinary(regionInfo.getStartKey()); + assertFalse( + "Each region should have its own start key, " + + "demonstrating it is not a replica of any others on this host", + startKeys.contains(startKey)); + startKeys.add(startKey); + } + } + } else { + // Ensure all replicas are on the same server + assertEquals("All regions should share one server", 1, serverToRegions.size()); + } + } + + static void validateRegionLocations(Map<TableName, Set<ServerName>> tableToServers, + TableName productTableName, boolean shouldBeBalanced) { + ServerName metaServer = + tableToServers.get(TableName.META_TABLE_NAME).stream().findFirst().orElseThrow(); + ServerName quotaServer = + tableToServers.get(QuotaUtil.QUOTA_TABLE_NAME).stream().findFirst().orElseThrow(); + Set<ServerName> productServers = tableToServers.get(productTableName); + + if (shouldBeBalanced) { + for (ServerName server : productServers) { + assertNotEquals("Meta table and product table should not share servers", server, + metaServer); + assertNotEquals("Quota table and product table should not share servers", server, + quotaServer); + } + assertNotEquals("The meta server and quotas server should be different", metaServer, + quotaServer); + } else { + for (ServerName server : productServers) { + assertEquals("Meta table and product table must share servers", server, metaServer); + assertEquals("Quota table and product table must share servers", server, quotaServer); + } + assertEquals("The meta 
server and quotas server must be the same", metaServer, quotaServer); + } + } + + static Map<TableName, Set<ServerName>> getTableToServers(Connection connection, + Set<TableName> tableNames) { + return tableNames.stream().collect(Collectors.toMap(t -> t, t -> { + try { + return connection.getRegionLocator(t).getAllRegionLocations().stream() + .map(HRegionLocation::getServerName).collect(Collectors.toSet()); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + + @FunctionalInterface + interface AssertionRunnable { + void run() throws AssertionError; + } + + static void validateAssertionsWithRetries(HBaseTestingUtil testUtil, boolean runBalancerOnFailure, + AssertionRunnable assertion) { + validateAssertionsWithRetries(testUtil, runBalancerOnFailure, ImmutableSet.of(assertion)); + } + + /** + * Runs all {@code assertions}, retrying for up to 50 attempts while any of them throws an + * {@link AssertionError}, and returns as soon as one attempt passes. + */ + static void validateAssertionsWithRetries(HBaseTestingUtil testUtil, boolean runBalancerOnFailure, + Set<AssertionRunnable> assertions) { + int maxAttempts = 50; + for (int i = 0; i < maxAttempts; i++) { + try { + for (AssertionRunnable assertion : assertions) { + assertion.run(); + } + // All assertions passed on this attempt, so there is nothing left to retry + return; + } catch (AssertionError e) { + if (i == maxAttempts - 1) { + throw e; + } + try { + LOG.warn("Failed to validate region locations. 
Will retry", e); + Thread.sleep(1000); + BalancerConditionalsTestUtil.printRegionLocations(testUtil.getConnection()); + if (runBalancerOnFailure) { + testUtil.getAdmin().balance(); + } + Thread.sleep(1000); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/CandidateGeneratorTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/CandidateGeneratorTestUtil.java new file mode 100644 index 00000000000..116ee4fc657 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/CandidateGeneratorTestUtil.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.MAX_RUNNING_TIME_KEY; +import static org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class CandidateGeneratorTestUtil { + + private static final Logger LOG = LoggerFactory.getLogger(CandidateGeneratorTestUtil.class); + + private CandidateGeneratorTestUtil() { + } + + static void runBalancerToExhaustion(Configuration conf, + Map<ServerName, List<RegionInfo>> serverToRegions, + Set<Function<BalancerClusterState, Boolean>> expectations, float targetMaxBalancerCost) { + // Do the full plan. 
We're testing with a lot of regions + conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true); + conf.setLong(MAX_RUNNING_TIME_KEY, 15000); + + conf.setFloat(MIN_COST_NEED_BALANCE_KEY, targetMaxBalancerCost); + + BalancerClusterState cluster = createMockBalancerClusterState(serverToRegions); + StochasticLoadBalancer stochasticLoadBalancer = buildStochasticLoadBalancer(cluster, conf); + printClusterDistribution(cluster, 0); + int balancerRuns = 0; + int actionsTaken = 0; + long balancingMillis = 0; + boolean isBalanced = false; + while (!isBalanced) { + balancerRuns++; + if (balancerRuns > 1000) { + throw new RuntimeException("Balancer failed to find balance & meet expectations"); + } + long start = System.currentTimeMillis(); + List<RegionPlan> regionPlans = + stochasticLoadBalancer.balanceCluster(partitionRegionsByTable(serverToRegions)); + balancingMillis += System.currentTimeMillis() - start; + actionsTaken++; + if (regionPlans != null) { + // Apply all plans to serverToRegions + for (RegionPlan rp : regionPlans) { + ServerName source = rp.getSource(); + ServerName dest = rp.getDestination(); + RegionInfo region = rp.getRegionInfo(); + + // Update serverToRegions + serverToRegions.get(source).remove(region); + serverToRegions.get(dest).add(region); + actionsTaken++; + } + + // Now rebuild cluster and balancer from updated serverToRegions + cluster = createMockBalancerClusterState(serverToRegions); + stochasticLoadBalancer = buildStochasticLoadBalancer(cluster, conf); + } + printClusterDistribution(cluster, actionsTaken); + isBalanced = true; + for (Function<BalancerClusterState, Boolean> condition : expectations) { + // Check if we've met all expectations for the candidate generator + if (!condition.apply(cluster)) { + isBalanced = false; + break; + } + } + if (isBalanced) { // Check if the balancer thinks we're done too + LOG.info("All balancer conditions passed. 
Checking if balancer thinks it's done."); + if (stochasticLoadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME, cluster)) { + LOG.info("Balancer would still like to run"); + isBalanced = false; + } else { + LOG.info("Balancer is done"); + } + } + } + // Report whole seconds so the value matches the "sec" unit in the message + LOG.info("Balancing took {}sec", Duration.ofMillis(balancingMillis).getSeconds()); + } + + /** + * Prints the current cluster distribution of regions per table per server + */ + static void printClusterDistribution(BalancerClusterState cluster, long actionsTaken) { + LOG.info("=== Cluster Distribution after {} balancer actions taken ===", actionsTaken); + + for (int i = 0; i < cluster.numServers; i++) { + int[] regions = cluster.regionsPerServer[i]; + int regionCount = (regions == null) ? 0 : regions.length; + + LOG.info("Server {}: {} regions", cluster.servers[i].getServerName(), regionCount); + + if (regionCount > 0) { + Map<TableName, Integer> tableRegionCounts = new HashMap<>(); + + for (int regionIndex : regions) { + RegionInfo regionInfo = cluster.regions[regionIndex]; + TableName tableName = regionInfo.getTable(); + tableRegionCounts.put(tableName, tableRegionCounts.getOrDefault(tableName, 0) + 1); + } + + tableRegionCounts + .forEach((table, count) -> LOG.info(" - Table {}: {} regions", table, count)); + } + } + + LOG.info("==========================================="); + } + + /** + * Partitions the given serverToRegions map by table. The tables are derived from the RegionInfo + * objects found in serverToRegions. + * @param serverToRegions The map of servers to their assigned regions. + * @return A map of tables to their server-to-region assignments. 
+ */ + public static Map<TableName, Map<ServerName, List<RegionInfo>>> + partitionRegionsByTable(Map<ServerName, List<RegionInfo>> serverToRegions) { + + // First, gather all tables from the regions + Set<TableName> allTables = new HashSet<>(); + for (List<RegionInfo> regions : serverToRegions.values()) { + for (RegionInfo region : regions) { + allTables.add(region.getTable()); + } + } + + Map<TableName, Map<ServerName, List<RegionInfo>>> tablesToServersToRegions = new HashMap<>(); + + // Initialize each table with all servers mapped to empty lists + for (TableName table : allTables) { + Map<ServerName, List<RegionInfo>> serverMap = new HashMap<>(); + for (ServerName server : serverToRegions.keySet()) { + serverMap.put(server, new ArrayList<>()); + } + tablesToServersToRegions.put(table, serverMap); + } + + // Distribute regions to their respective tables + for (Map.Entry<ServerName, List<RegionInfo>> serverAndRegions : serverToRegions.entrySet()) { + ServerName server = serverAndRegions.getKey(); + List<RegionInfo> regions = serverAndRegions.getValue(); + + for (RegionInfo region : regions) { + TableName regionTable = region.getTable(); + // Now we know for sure regionTable is in allTables + Map<ServerName, List<RegionInfo>> tableServerMap = + tablesToServersToRegions.get(regionTable); + tableServerMap.get(server).add(region); + } + } + + return tablesToServersToRegions; + } + + static StochasticLoadBalancer buildStochasticLoadBalancer(BalancerClusterState cluster, + Configuration conf) { + StochasticLoadBalancer stochasticLoadBalancer = + new StochasticLoadBalancer(new DummyMetricsStochasticBalancer()); + stochasticLoadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf)); + stochasticLoadBalancer.loadConf(conf); + stochasticLoadBalancer.initCosts(cluster); + return stochasticLoadBalancer; + } + + static BalancerClusterState + createMockBalancerClusterState(Map<ServerName, List<RegionInfo>> serverToRegions) { + return new 
BalancerClusterState(serverToRegions, null, null, null, null); + } + + /** + * Validates that each replica is isolated from its others. Ensures that no server hosts more than + * one replica of the same region (i.e., regions with identical start and end keys). + * @param cluster The current state of the cluster. + * @return true if all replicas are properly isolated, false otherwise. + */ + static boolean areAllReplicasDistributed(BalancerClusterState cluster) { + // Iterate over each server + for (int[] regionsPerServer : cluster.regionsPerServer) { + if (regionsPerServer == null || regionsPerServer.length == 0) { + continue; // Skip empty servers + } + + Set<ReplicaKey> foundKeys = new HashSet<>(); + for (int regionIndex : regionsPerServer) { + RegionInfo regionInfo = cluster.regions[regionIndex]; + ReplicaKey replicaKey = new ReplicaKey(regionInfo); + if (foundKeys.contains(replicaKey)) { + // Violation: Multiple replicas of the same region on the same server + LOG.warn("Replica isolation violated: one server hosts multiple replicas of key [{}].", + generateRegionKey(regionInfo)); + return false; + } + + foundKeys.add(replicaKey); + } + } + + LOG.info( + "Replica isolation validation passed: No server hosts multiple replicas of the same region."); + return true; + } + + /** + * Generates a unique key for a region based on its start and end keys. This method ensures that + * regions with identical start and end keys have the same key. + * @param regionInfo The RegionInfo object. + * @return A string representing the unique key of the region. 
+ */ + private static String generateRegionKey(RegionInfo regionInfo) { + // Using Base64 encoding for byte arrays to ensure uniqueness and readability + String startKey = Base64.getEncoder().encodeToString(regionInfo.getStartKey()); + String endKey = Base64.getEncoder().encodeToString(regionInfo.getEndKey()); + + return regionInfo.getTable().getNameAsString() + ":" + startKey + ":" + endKey; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasTestConditional.java similarity index 57% copy from hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java copy to hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasTestConditional.java index 56b473ae710..5a8fa2524fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasTestConditional.java @@ -17,42 +17,23 @@ */ package org.apache.hadoop.hbase.master.balancer; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; -/** - * An action to move or swap a region - */ -@InterfaceAudience.Private -abstract class BalanceAction { - enum Type { - ASSIGN_REGION, - MOVE_REGION, - SWAP_REGIONS, - NULL, - } - - static final BalanceAction NULL_ACTION = new BalanceAction(Type.NULL) { - }; - - private final Type type; - - BalanceAction(Type type) { - this.type = type; - } +public class DistributeReplicasTestConditional extends DistributeReplicasConditional { - /** - * Returns an Action which would undo this action - */ - BalanceAction undoAction() { - return this; + static void enableConditionalReplicaDistributionForTest(Configuration conf) { + conf.set(BalancerConditionals.ADDITIONAL_CONDITIONALS_KEY, + 
DistributeReplicasTestConditional.class.getCanonicalName()); } - Type getType() { - return type; + public DistributeReplicasTestConditional(BalancerConditionals balancerConditionals, + BalancerClusterState cluster) { + super(balancerConditionals, cluster); } @Override - public String toString() { - return type + ":"; + public ValidationLevel getValidationLevel() { + // Mini-cluster tests can't validate at host/rack levels + return ValidationLevel.SERVER; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java index d658f7cfa16..dfacad1a747 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadOnlyFavoredStochasticBalancer.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.balancer; import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.conf.Configuration; /** * Used for FavoredNode unit tests @@ -27,7 +28,7 @@ public class LoadOnlyFavoredStochasticBalancer extends FavoredStochasticBalancer @Override protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> - createCandidateGenerators() { + createCandidateGenerators(Configuration conf) { Map<Class<? 
extends CandidateGenerator>, CandidateGenerator> fnPickers = new HashMap<>(1); fnPickers.put(FavoredNodeLoadPicker.class, new FavoredNodeLoadPicker()); return fnPickers; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerConditionals.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerConditionals.java new file mode 100644 index 00000000000..884331f161a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerConditionals.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class, MasterTests.class }) +public class TestBalancerConditionals extends BalancerTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBalancerConditionals.class); + + private BalancerConditionals balancerConditionals; + private BalancerClusterState mockCluster; + + @Before + public void setUp() { + balancerConditionals = BalancerConditionals.create(); + mockCluster = mockCluster(new int[] { 0, 1, 2 }); + } + + @Test + public void testDefaultConfiguration() { + Configuration conf = new Configuration(); + balancerConditionals.setConf(conf); + balancerConditionals.loadClusterState(mockCluster); + + assertEquals("No conditionals should be loaded by default", 0, + balancerConditionals.getConditionalClasses().size()); + } + + @Test + public void testCustomConditionalsViaConfiguration() { + Configuration conf = new Configuration(); + conf.set(BalancerConditionals.ADDITIONAL_CONDITIONALS_KEY, + DistributeReplicasConditional.class.getName()); + + balancerConditionals.setConf(conf); + balancerConditionals.loadClusterState(mockCluster); + + assertTrue("Custom conditionals should be loaded", + balancerConditionals.shouldSkipSloppyServerEvaluation()); + } + + @Test + public void testInvalidCustomConditionalClass() { + Configuration conf = new Configuration(); + conf.set(BalancerConditionals.ADDITIONAL_CONDITIONALS_KEY, "java.lang.String"); + + balancerConditionals.setConf(conf); + 
balancerConditionals.loadClusterState(mockCluster); + + assertEquals("Invalid classes should not be loaded as conditionals", 0, + balancerConditionals.getConditionalClasses().size()); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestLargeClusterBalancingConditionalReplicaDistribution.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestLargeClusterBalancingConditionalReplicaDistribution.java new file mode 100644 index 00000000000..e6cec7045e7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestLargeClusterBalancingConditionalReplicaDistribution.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.apache.hadoop.hbase.master.balancer.CandidateGeneratorTestUtil.runBalancerToExhaustion;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKeyCache;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs the balancer on a large simulated cluster whose region replicas all start on one server.
+ */
+@Category({ MediumTests.class, MasterTests.class })
+public class TestLargeClusterBalancingConditionalReplicaDistribution {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestLargeClusterBalancingConditionalReplicaDistribution.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestLargeClusterBalancingConditionalReplicaDistribution.class);
+
+  private static final int NUM_SERVERS = 1000;
+  private static final int NUM_REGIONS = 20_000;
+  private static final int NUM_REPLICAS = 3;
+  private static final int NUM_TABLES = 100;
+
+  private static final ServerName[] servers = new ServerName[NUM_SERVERS];
+  private static final Map<ServerName, List<RegionInfo>> serverToRegions = new HashMap<>();
+
+  @BeforeClass
+  public static void setup() {
+    // Initialize servers, each starting with an empty region list
+    for (int i = 0; i < NUM_SERVERS; i++) {
+      servers[i] = ServerName.valueOf("server" + i, i, System.currentTimeMillis());
+      serverToRegions.put(servers[i], new ArrayList<>());
+    }
+
+    // Create primary regions and their replicas
+    List<RegionInfo> allRegions = new ArrayList<>();
+    for (int i = 0; i < NUM_REGIONS; i++) {
+      TableName tableName = getTableName(i);
+      // Define startKey and endKey for the region
+      byte[] startKey = Bytes.toBytes(i);
+      byte[] endKey = Bytes.toBytes(i + 1);
+
+      // Create NUM_REPLICAS replicas (ids 0..NUM_REPLICAS-1) sharing this key range
+      for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
+        RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey)
+          .setEndKey(endKey).setReplicaId(replicaId).build();
+        allRegions.add(regionInfo);
+      }
+    }
+
+    // Assign all regions to one server
+    serverToRegions.get(servers[0]).addAll(allRegions);
+  }
+
+  private static TableName getTableName(int i) {
+    return TableName.valueOf("userTable" + i % NUM_TABLES);
+  }
+
+  @Test
+  public void testReplicaDistribution() {
+    Configuration conf = new Configuration();
+    DistributeReplicasTestConditional.enableConditionalReplicaDistributionForTest(conf);
+    conf.setBoolean(ReplicaKeyCache.CACHE_REPLICA_KEYS_KEY, true);
+    conf.setInt(ReplicaKeyCache.REPLICA_KEY_CACHE_SIZE_KEY, Integer.MAX_VALUE);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 30_000);
+
+    // turn off replica cost functions
+    conf.setLong("hbase.master.balancer.stochastic.regionReplicaRackCostKey", 0);
+    conf.setLong("hbase.master.balancer.stochastic.regionReplicaHostCostKey", 0);
+
+    runBalancerToExhaustion(conf, serverToRegions,
+      Set.of(CandidateGeneratorTestUtil::areAllReplicasDistributed), 10.0f);
+    LOG.info("Region replicas are appropriately distributed across RegionServers.");
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestReplicaDistributionBalancerConditional.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestReplicaDistributionBalancerConditional.java
new file mode 100644
index 00000000000..0aae720f838
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestReplicaDistributionBalancerConditional.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.apache.hadoop.hbase.master.balancer.BalancerConditionalsTestUtil.validateAssertionsWithRetries;
+
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Moves every region of a replicated table onto one RegionServer of a mini cluster, then checks
+ * replica placement again once the balancer has run with the replica conditional enabled.
+ */
+@Category({ LargeTests.class, MasterTests.class })
+public class TestReplicaDistributionBalancerConditional {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicaDistributionBalancerConditional.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestReplicaDistributionBalancerConditional.class);
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static final int REPLICAS = 3;
+  private static final int NUM_SERVERS = REPLICAS;
+  private static final int REGIONS_PER_SERVER = 5;
+
+  @Before
+  public void setUp() throws Exception {
+    DistributeReplicasTestConditional
+      .enableConditionalReplicaDistributionForTest(TEST_UTIL.getConfiguration());
+    TEST_UTIL.getConfiguration()
+      .setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_BALANCER_PERIOD, 1000L);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
+
+    // turn off replica cost functions
+    TEST_UTIL.getConfiguration()
+      .setLong("hbase.master.balancer.stochastic.regionReplicaRackCostKey", 0);
+    TEST_UTIL.getConfiguration()
+      .setLong("hbase.master.balancer.stochastic.regionReplicaHostCostKey", 0);
+
+    TEST_UTIL.startMiniCluster(NUM_SERVERS);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testReplicaDistribution() throws Exception {
+    Connection connection = TEST_UTIL.getConnection();
+    TableName replicatedTableName = TableName.valueOf("replicated_table");
+    TableDescriptor replicatedTableDescriptor =
+      TableDescriptorBuilder.newBuilder(replicatedTableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("0")).build())
+        .setRegionReplication(REPLICAS).build();
+
+    // Admin is Closeable: try-with-resources releases it even if an assertion below throws
+    try (Admin admin = connection.getAdmin()) {
+      // Create a "replicated_table" with region replicas
+      admin.createTable(replicatedTableDescriptor,
+        BalancerConditionalsTestUtil.generateSplits(REGIONS_PER_SERVER * NUM_SERVERS));
+
+      // Pause the balancer
+      admin.balancerSwitch(false, true);
+
+      // Collect all region replicas and place them on one RegionServer
+      List<RegionInfo> allRegions = admin.getRegions(replicatedTableName);
+      String targetServer =
+        TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName().getServerName();
+      for (RegionInfo region : allRegions) {
+        admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer));
+      }
+
+      BalancerConditionalsTestUtil.printRegionLocations(connection);
+      validateAssertionsWithRetries(TEST_UTIL, false, () -> BalancerConditionalsTestUtil
+        .validateReplicaDistribution(connection, replicatedTableName, false));
+
+      // Unpause the balancer and trigger balancing
+      admin.balancerSwitch(true, true);
+      admin.balance();
+
+      validateAssertionsWithRetries(TEST_UTIL, true, () -> BalancerConditionalsTestUtil
+        .validateReplicaDistribution(connection, replicatedTableName, true));
+      BalancerConditionalsTestUtil.printRegionLocations(connection);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
index 960783a8467..188efa64dcd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
@@ -254,7 +254,7 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
     }
 
     @Override
-    protected CandidateGenerator getRandomGenerator() {
+    protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) {
       return fairRandomCandidateGenerator;
     }
   }