This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer2 in repository https://gitbox.apache.org/repos/asf/helix.git
commit 86f1a000c2742ecca2a0dfec36f2f04525175dfc Author: Yi Wang <[email protected]> AuthorDate: Tue Sep 17 15:41:08 2019 -0700 Add the remaining implementation of ConstraintBasedAlgorithmFactory (#478) Implementation of ConstraintBasedAlgorithmFactory and the soft constraint weight model. Remove SoftConstraintWeightModel class. Get the rebalance preference and adjust the corresponding weight. Pass the preference keys instead of cluster config. --- .../rebalancer/waged/WagedRebalancer.java | 19 ++++--- .../constraints/ConstraintBasedAlgorithm.java | 21 +++++--- .../ConstraintBasedAlgorithmFactory.java | 38 ++++++++++---- .../constraints/SoftConstraintWeightModel.java | 58 ---------------------- .../stages/BestPossibleStateCalcStage.java | 5 +- .../constraints/TestConstraintBasedAlgorithm.java | 8 ++- 6 files changed, 58 insertions(+), 91 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 22cac7e..551239d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -20,7 +20,6 @@ package org.apache.helix.controller.rebalancer.waged; */ import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -29,7 +28,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import com.google.common.annotations.VisibleForTesting; import org.apache.helix.HelixConstants; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; @@ -42,6 +40,7 @@ import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment; import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; @@ -49,6 +48,9 @@ import org.apache.helix.model.ResourceAssignment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; + /** * Weight-Aware Globally-Even Distribute Rebalancer. * @see <a @@ -62,10 +64,8 @@ public class WagedRebalancer { // When any of the following change happens, the rebalancer needs to do a global rebalance which // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline. private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES = - Collections - .unmodifiableSet(new HashSet<>(Arrays.asList(HelixConstants.ChangeType.RESOURCE_CONFIG, - HelixConstants.ChangeType.CLUSTER_CONFIG, - HelixConstants.ChangeType.INSTANCE_CONFIG))); + ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG, + HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG); // The cluster change detector is a stateful object. // Make it static to avoid unnecessary reinitialization. private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL = @@ -79,13 +79,12 @@ public class WagedRebalancer { private final RebalanceAlgorithm _rebalanceAlgorithm; // ------------------------------------------------------------------------------------// - public WagedRebalancer(HelixManager helixManager) { + public WagedRebalancer(HelixManager helixManager, + Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) { this( // TODO init the metadata store according to their requirement when integrate, // or change to final static method if possible. - new AssignmentMetadataStore(helixManager), - // TODO parse the cluster setting - ConstraintBasedAlgorithmFactory.getInstance(), + new AssignmentMetadataStore(helixManager), ConstraintBasedAlgorithmFactory.getInstance(preferences), // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output. // Mapping calculator will translate the best possible assignment into the applicable state // mapping based on the current states. diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java index 479fb78..89a3f29 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java @@ -53,14 +53,12 @@ import com.google.common.collect.Maps; class ConstraintBasedAlgorithm implements RebalanceAlgorithm { private static final Logger LOG = LoggerFactory.getLogger(ConstraintBasedAlgorithm.class); private final List<HardConstraint> _hardConstraints; - private final List<SoftConstraint> _softConstraints; - private final SoftConstraintWeightModel _softConstraintsWeightModel; + private final Map<SoftConstraint, Float> _softConstraints; ConstraintBasedAlgorithm(List<HardConstraint> hardConstraints, - List<SoftConstraint> softConstraints, SoftConstraintWeightModel softConstraintWeightModel) { + Map<SoftConstraint, Float> softConstraints) { _hardConstraints = hardConstraints; _softConstraints = softConstraints; - _softConstraintsWeightModel = softConstraintWeightModel; } @Override @@ -115,13 +113,22 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm { } Function<AssignableNode, Float> calculatePoints = - (candidateNode) -> _softConstraintsWeightModel.getSumOfScores(_softConstraints.stream() - .collect(Collectors.toMap(Function.identity(), softConstraint -> softConstraint - .getAssignmentNormalizedScore(candidateNode, replica, clusterContext)))); + (candidateNode) -> getAssignmentNormalizedScore(candidateNode, replica, clusterContext); return candidateNodes.stream().max(Comparator.comparing(calculatePoints)); } + private float getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + float sum = 0; + for (Map.Entry<SoftConstraint, Float> softConstraintEntry : _softConstraints.entrySet()) { + SoftConstraint softConstraint = softConstraintEntry.getKey(); + float weight = softConstraintEntry.getValue(); + sum += weight * softConstraint.getAssignmentNormalizedScore(node, replica, clusterContext); + } + return sum; + } + private List<String> convertFailureReasons(List<HardConstraint> hardConstraints) { return hardConstraints.stream().map(HardConstraint::getDescription) .collect(Collectors.toList()); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java index 895fa61..8568444 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java @@ -19,23 +19,41 @@ package org.apache.helix.controller.rebalancer.waged.constraints; * under the License. */ -import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm; import org.apache.helix.model.ClusterConfig; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +/** + * The factory class to create an instance of {@link ConstraintBasedAlgorithm} + */ public class ConstraintBasedAlgorithmFactory { - // TODO: the parameter comes from cluster config, will tune how these 2 integers will change the - // soft constraint weight model - public static RebalanceAlgorithm getInstance() { - // TODO initialize constraints, depending on constraints implementations PRs - List<HardConstraint> hardConstraints = new ArrayList<>(); - List<SoftConstraint> softConstraints = new ArrayList<>(); - SoftConstraintWeightModel softConstraintWeightModel = new SoftConstraintWeightModel(); + public static RebalanceAlgorithm getInstance( + Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) { + List<HardConstraint> hardConstraints = + ImmutableList.of(new FaultZoneAwareConstraint(), new NodeCapacityConstraint(), + new ReplicaActivateConstraint(), new NodeMaxPartitionLimitConstraint(), + new ValidGroupTagConstraint(), new SamePartitionOnInstanceConstraint()); + + int evennessPreference = + preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1); + int movementPreference = + preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1); + float evennessRatio = (float) evennessPreference / (evennessPreference + movementPreference); + float movementRatio = (float) movementPreference / (evennessPreference + movementPreference); + + Map<SoftConstraint, Float> softConstraints = ImmutableMap.<SoftConstraint, Float> builder() + .put(new PartitionMovementConstraint(), movementRatio) + .put(new InstancePartitionsCountConstraint(), 0.3f * evennessRatio) + .put(new ResourcePartitionAntiAffinityConstraint(), 0.1f * evennessRatio) + .put(new ResourceTopStateAntiAffinityConstraint(), 0.1f * evennessRatio) + .put(new MaxCapacityUsageInstanceConstraint(), 0.5f * evennessRatio).build(); - return new ConstraintBasedAlgorithm(hardConstraints, softConstraints, - softConstraintWeightModel); + return new ConstraintBasedAlgorithm(hardConstraints, softConstraints); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java deleted file mode 100644 index 953005c..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java +++ /dev/null @@ -1,58 +0,0 @@ -package org.apache.helix.controller.rebalancer.waged.constraints; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Map; - -import com.google.common.collect.ImmutableMap; - -/** - * The class retrieves the offline model that defines the relative importance of soft constraints. - */ -class SoftConstraintWeightModel { - private static Map<Class, Float> MODEL; - - // TODO either define the weights in property files or zookeeper node or static human input - SoftConstraintWeightModel() { - - } - - static { - // TODO update the weight - MODEL = ImmutableMap.<Class, Float> builder().put(InstancePartitionsCountConstraint.class, 1.0f) - .build(); - } - - /** - * Get the sum of normalized scores, given calculated scores map of soft constraints - * @param originScoresMap The origin scores by soft constraints - * @return The sum of soft constraints scores - */ - float getSumOfScores(Map<SoftConstraint, Float> originScoresMap) { - float sum = 0; - for (Map.Entry<SoftConstraint, Float> softConstraintScoreEntry : originScoresMap.entrySet()) { - SoftConstraint softConstraint = softConstraintScoreEntry.getKey(); - float weight = MODEL.get(softConstraint.getClass()); - sum += softConstraintScoreEntry.getValue() * weight; - } - - return sum; - } -} diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index ba4da88..806ef85 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -40,6 +40,7 @@ import org.apache.helix.controller.rebalancer.Rebalancer; import org.apache.helix.controller.rebalancer.SemiAutoRebalancer; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.MaintenanceSignal; @@ -121,7 +122,9 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { // configured to use the WAGED rebalancer. // For the other resources, the legacy rebalancers will be triggered in the next step. Map<String, IdealState> newIdealStates = new HashMap<>(); - WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager); + Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = cache.getClusterConfig() + .getGlobalRebalancePreference(); + WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences); try { newIdealStates .putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput)); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java index 0e61eb3..b2deaef 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java @@ -34,6 +34,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; public class TestConstraintBasedAlgorithm { private ConstraintBasedAlgorithm _algorithm; @@ -42,12 +43,11 @@ public class TestConstraintBasedAlgorithm { public void beforeMethod() { HardConstraint mockHardConstraint = mock(HardConstraint.class); SoftConstraint mockSoftConstraint = mock(SoftConstraint.class); - SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class); when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(false); when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f); _algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint), - ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel); + ImmutableMap.of(mockSoftConstraint, 1f)); } @Test(expectedExceptions = HelixRebalanceException.class) @@ -60,12 +60,10 @@ public class TestConstraintBasedAlgorithm { public void testCalculateWithValidAssignment() throws IOException, HelixRebalanceException { HardConstraint mockHardConstraint = mock(HardConstraint.class); SoftConstraint mockSoftConstraint = mock(SoftConstraint.class); - SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class); when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(true); when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f); - when(mockSoftConstraintWeightModel.getSumOfScores(any())).thenReturn(1.0f); _algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint), - ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel); + ImmutableMap.of(mockSoftConstraint, 1f)); ClusterModel clusterModel = new ClusterModelTestHelper().getDefaultClusterModel(); OptimalAssignment optimalAssignment = _algorithm.calculate(clusterModel);
