This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedImprove in repository https://gitbox.apache.org/repos/asf/helix.git
commit 89877b8f7fe2ce8ca6471a425edd77500c9458db Author: Neal Sun <[email protected]> AuthorDate: Mon Mar 8 16:44:31 2021 -0800 New PartitionMovementConstraint and BaselineInfluenceConstraint for Waged (#1658) This PR splits PartitionMovementConstraint into separate constraints that control baseline convergence and best possible movements respectively. Co-authored-by: Neal Sun <[email protected]> --- .../AbstractPartitionMovementConstraint.java | 86 ++++++++++++++++ .../constraints/BaselineInfluenceConstraint.java | 50 ++++++++++ .../ConstraintBasedAlgorithmFactory.java | 39 ++++++-- .../constraints/PartitionMovementConstraint.java | 83 ++------------- .../java/org/apache/helix/model/ClusterConfig.java | 31 ++++-- .../TestPartitionMovementConstraint.java | 111 +++++++-------------- .../org/apache/helix/model/TestClusterConfig.java | 23 ++++- 7 files changed, 253 insertions(+), 170 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java new file mode 100644 index 0000000..913e042 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java @@ -0,0 +1,86 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.Map; + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; +import org.apache.helix.model.Partition; +import org.apache.helix.model.ResourceAssignment; + +/** + * Evaluate the proposed assignment according to the potential partition movements cost. + * The cost is evaluated based on the difference between the old assignment and the new assignment. + * Any change from the old assignment will increase the partition movements cost, so that the + * evaluated score will become lower. + */ +abstract class AbstractPartitionMovementConstraint extends SoftConstraint { + protected static final double MAX_SCORE = 1f; + protected static final double MIN_SCORE = 0f; + + private static final double STATE_TRANSITION_COST_FACTOR = 0.5; + + AbstractPartitionMovementConstraint() { + super(MAX_SCORE, MIN_SCORE); + } + + /** + * @return MAX_SCORE if the proposed assignment completely matches the previous assignment. + * StateTransitionCostFactor if the proposed assignment's allocation matches the + * previous assignment but state does not match. + * MIN_SCORE if the proposed assignment completely doesn't match the previous one. + */ + @Override + protected abstract double getAssignmentScore(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext); + + protected Map<String, String> getStateMap(AssignableReplica replica, + Map<String, ResourceAssignment> assignment) { + String resourceName = replica.getResourceName(); + String partitionName = replica.getPartitionName(); + if (assignment == null || !assignment.containsKey(resourceName)) { + return Collections.emptyMap(); + } + return assignment.get(resourceName).getReplicaMap(new Partition(partitionName)); + } + + protected double calculateAssignmentScore(String nodeName, String state, + Map<String, String> instanceToStateMap) { + if (instanceToStateMap.containsKey(nodeName)) { + // The score when the proposed allocation partially matches the assignment plan but will + // require a state transition. + double scoreWithStateTransitionCost = + MIN_SCORE + (MAX_SCORE - MIN_SCORE) * STATE_TRANSITION_COST_FACTOR; + // if state matches, no state transition required for the proposed assignment; if state does + // not match, then the proposed assignment requires state transition. + return state.equals(instanceToStateMap.get(nodeName)) ? MAX_SCORE + : scoreWithStateTransitionCost; + } + return MIN_SCORE; + } + + @Override + protected NormalizeFunction getNormalizeFunction() { + return (score) -> score; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java new file mode 100644 index 0000000..5e3fcd2 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java @@ -0,0 +1,50 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; + + +/** + * Evaluate the proposed assignment according to the potential partition movements cost based on + * the baseline assignment's influence. + * This constraint promotes movements for evenness. If best possible doesn't exist, baseline will be + * used to restrict movements, so this constraint should give no score in that case. + */ +public class BaselineInfluenceConstraint extends AbstractPartitionMovementConstraint { + @Override + protected double getAssignmentScore(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + Map<String, String> bestPossibleAssignment = + getStateMap(replica, clusterContext.getBestPossibleAssignment()); + if (bestPossibleAssignment.isEmpty()) { + return getMinScore(); + } + + Map<String, String> baselineAssignment = + getStateMap(replica, clusterContext.getBaselineAssignment()); + return calculateAssignmentScore(node.getInstanceName(), replica.getReplicaState(), + baselineAssignment); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java index 33aa6c8..032c7b5 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java @@ -39,12 +39,17 @@ public class ConstraintBasedAlgorithmFactory { { // The default setting put(PartitionMovementConstraint.class.getSimpleName(), 2f); + put(BaselineInfluenceConstraint.class.getSimpleName(), 0.5f); put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f); put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f); put(TopStateMaxCapacityUsageInstanceConstraint.class.getSimpleName(), 3f); put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 6f); } }; + // The weight for BaselineInfluenceConstraint used when we are forcing a baseline converge. This + // number, multiplied by the max score returned by BaselineInfluenceConstraint, must be greater + // than the total maximum sum of all other constraints, in order to overpower other constraints. + private static final float FORCE_BASELINE_CONVERGE_WEIGHT = 10000f; static { Properties properties = @@ -61,23 +66,37 @@ public class ConstraintBasedAlgorithmFactory { new ReplicaActivateConstraint(), new NodeMaxPartitionLimitConstraint(), new ValidGroupTagConstraint(), new SamePartitionOnInstanceConstraint()); - int evennessPreference = - preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1); - int movementPreference = - preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1); + int evennessPreference = preferences + .getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, + ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE + .get(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS)); + int movementPreference = preferences + .getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, + ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE + .get(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT)); + boolean forceBaselineConverge = preferences + .getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE, 0) + > 0; List<SoftConstraint> softConstraints = ImmutableList - .of(new PartitionMovementConstraint(), new InstancePartitionsCountConstraint(), - new ResourcePartitionAntiAffinityConstraint(), + .of(new PartitionMovementConstraint(), new BaselineInfluenceConstraint(), + new InstancePartitionsCountConstraint(), new ResourcePartitionAntiAffinityConstraint(), new TopStateMaxCapacityUsageInstanceConstraint(), new MaxCapacityUsageInstanceConstraint()); Map<SoftConstraint, Float> softConstraintsWithWeight = Maps.toMap(softConstraints, key -> { - String name = key.getClass().getSimpleName(); - float weight = MODEL.get(name); - return name.equals(PartitionMovementConstraint.class.getSimpleName()) ? - movementPreference * weight : evennessPreference * weight; + if (key instanceof BaselineInfluenceConstraint && forceBaselineConverge) { + return FORCE_BASELINE_CONVERGE_WEIGHT; + } + + float weight = MODEL.get(key.getClass().getSimpleName()); + // Note that BaselineInfluenceConstraint is a constraint that promotes movement for evenness, + // and is therefore controlled by the evenness preference. Only PartitionMovementConstraint + // contributes to less movement. + return key instanceof PartitionMovementConstraint ? movementPreference * weight + : evennessPreference * weight; }); + return new ConstraintBasedAlgorithm(hardConstraints, softConstraintsWithWeight); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java index 351e33d..08c135d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java @@ -19,47 +19,20 @@ package org.apache.helix.controller.rebalancer.waged.constraints; * under the License. */ -import java.util.Collections; import java.util.Map; import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; -import org.apache.helix.model.Partition; -import org.apache.helix.model.ResourceAssignment; + /** - * Evaluate the proposed assignment according to the potential partition movements cost. - * The cost is evaluated based on the difference between the old assignment and the new assignment. - * In detail, we consider the following two previous assignments as the base. - * - Baseline assignment that is calculated regardless of the node state (online/offline). - * - Previous Best Possible assignment. - * Any change to these two assignments will increase the partition movements cost, so that the - * evaluated score will become lower. + * Evaluate the proposed assignment according to the potential partition movements cost based on + * the previous best possible assignment. + * The previous best possible assignment is the sole reference; if it's missing, it means the + * replica belongs to a newly added resource, so baseline assignment should be used instead. */ -class PartitionMovementConstraint extends SoftConstraint { - private static final double MAX_SCORE = 1f; - private static final double MIN_SCORE = 0f; - // The scale factor to adjust score when the proposed allocation partially matches the assignment - // plan but will require a state transition (with partition movement). - // TODO: these factors will be tuned based on user's preference - private static final double STATE_TRANSITION_COST_FACTOR = 0.5; - private static final double MOVEMENT_COST_FACTOR = 0.25; - - PartitionMovementConstraint() { - super(MAX_SCORE, MIN_SCORE); - } - - /** - * @return 1 if the proposed assignment completely matches the previous best possible assignment - * (or baseline assignment if the replica is newly added). - * STATE_TRANSITION_COST_FACTOR if the proposed assignment's allocation matches the - * previous Best Possible assignment (or baseline assignment if the replica is newly - * added) but state does not match. - * MOVEMENT_COST_FACTOR if the proposed assignment's allocation matches the baseline - * assignment only, but not matches the previous best possible assignment. - * 0 if the proposed assignment is a pure random movement. - */ +public class PartitionMovementConstraint extends AbstractPartitionMovementConstraint { @Override protected double getAssignmentScore(AssignableNode node, AssignableReplica replica, ClusterContext clusterContext) { @@ -71,48 +44,10 @@ class PartitionMovementConstraint extends SoftConstraint { String state = replica.getReplicaState(); if (bestPossibleAssignment.isEmpty()) { - // If bestPossibleAssignment of the replica is empty, indicating this is a new replica. - // Then the baseline is the only reference. + // if best possible is missing, it means the replica belongs to a newly added resource, so + // baseline assignment should be used instead. return calculateAssignmentScore(nodeName, state, baselineAssignment); - } else { - // Else, for minimizing partition movements or state transitions, prioritize the proposed - // assignment that matches the previous Best Possible assignment. - double score = calculateAssignmentScore(nodeName, state, bestPossibleAssignment); - // If no Best Possible assignment matches, check the baseline assignment. - if (score == 0 && baselineAssignment.containsKey(nodeName)) { - // Although not desired, the proposed assignment that matches the baseline is still better - // than a random movement. So try to evaluate the score with the MOVEMENT_COST_FACTOR - // punishment. - score = MOVEMENT_COST_FACTOR; - } - return score; - } - } - - private Map<String, String> getStateMap(AssignableReplica replica, - Map<String, ResourceAssignment> assignment) { - String resourceName = replica.getResourceName(); - String partitionName = replica.getPartitionName(); - if (assignment == null || !assignment.containsKey(resourceName)) { - return Collections.emptyMap(); } - return assignment.get(resourceName).getReplicaMap(new Partition(partitionName)); - } - - private double calculateAssignmentScore(String nodeName, String state, - Map<String, String> instanceToStateMap) { - if (instanceToStateMap.containsKey(nodeName)) { - return state.equals(instanceToStateMap.get(nodeName)) ? - 1 : // if state matches, no state transition required for the proposed assignment - STATE_TRANSITION_COST_FACTOR; // if state does not match, - // then the proposed assignment requires state transition. - } - return 0; - } - - @Override - protected NormalizeFunction getNormalizeFunction() { - // PartitionMovementConstraint already scale the score properly. - return (score) -> score; + return calculateAssignmentScore(nodeName, state, bestPossibleAssignment); } } diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index 492ac7f..ccb1684 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -138,8 +138,10 @@ public class ClusterConfig extends HelixProperty { } public enum GlobalRebalancePreferenceKey { + // EVENNESS and LESS_MOVEMENT must be both specified EVENNESS, - LESS_MOVEMENT + LESS_MOVEMENT, + FORCE_BASELINE_CONVERGE, } private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40; @@ -160,7 +162,8 @@ public class ClusterConfig extends HelixProperty { DEFAULT_GLOBAL_REBALANCE_PREFERENCE = ImmutableMap.<GlobalRebalancePreferenceKey, Integer>builder() .put(GlobalRebalancePreferenceKey.EVENNESS, 1) - .put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1).build(); + .put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1) + .put(GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE, 0).build(); private final static int MAX_REBALANCE_PREFERENCE = 10; private final static int MIN_REBALANCE_PREFERENCE = 0; public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true; @@ -862,14 +865,22 @@ public class ClusterConfig extends HelixProperty { /** * Set the global rebalancer's assignment preference. - * @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight. - * The ratio of the configured weights will determine the rebalancer's behavior. + * @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weights. + * The weights will determine the rebalancer's behavior. Note that + * GlobalRebalancePreferenceKey.EVENNESS and + * GlobalRebalancePreferenceKey.LESS_MOVEMENT must be both specified or not + * specified, or an exception will be thrown. * If null, the preference item will be removed from the config. */ public void setGlobalRebalancePreference(Map<GlobalRebalancePreferenceKey, Integer> preference) { if (preference == null) { _record.getMapFields().remove(ClusterConfigProperty.REBALANCE_PREFERENCE.name()); } else { + if (preference.containsKey(GlobalRebalancePreferenceKey.EVENNESS) != preference + .containsKey(GlobalRebalancePreferenceKey.LESS_MOVEMENT)) { + throw new IllegalArgumentException("GlobalRebalancePreferenceKey.EVENNESS and " + + "GlobalRebalancePreferenceKey.LESS_MOVEMENT must be both specified or not specified"); + } Map<String, String> preferenceMap = new HashMap<>(); preference.entrySet().stream().forEach(entry -> { if (entry.getValue() > MAX_REBALANCE_PREFERENCE @@ -893,11 +904,15 @@ public class ClusterConfig extends HelixProperty { if (preferenceStrMap != null && !preferenceStrMap.isEmpty()) { Map<GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>(); for (GlobalRebalancePreferenceKey key : GlobalRebalancePreferenceKey.values()) { - if (!preferenceStrMap.containsKey(key.name())) { - // If any key is not configured with a value, return the default config. - return DEFAULT_GLOBAL_REBALANCE_PREFERENCE; + if (preferenceStrMap.containsKey(key.name())) { + preference.put(key, Integer.parseInt(preferenceStrMap.get(key.name()))); } - preference.put(key, Integer.parseInt(preferenceStrMap.get(key.name()))); + } + // In case this map is set incorrectly, check for both attributes to ensure strong pairing + if (preference.containsKey(GlobalRebalancePreferenceKey.EVENNESS) != preference + .containsKey(GlobalRebalancePreferenceKey.LESS_MOVEMENT)) { + preference.remove(GlobalRebalancePreferenceKey.EVENNESS); + preference.remove(GlobalRebalancePreferenceKey.LESS_MOVEMENT); } return preference; } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java index d36f629..16c1994 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java @@ -42,7 +42,8 @@ public class TestPartitionMovementConstraint { private AssignableNode _testNode; private AssignableReplica _testReplica; private ClusterContext _clusterContext; - private SoftConstraint _constraint = new PartitionMovementConstraint(); + private SoftConstraint _baselineInfluenceConstraint = new BaselineInfluenceConstraint(); + private SoftConstraint _partitionMovementConstraint = new PartitionMovementConstraint(); @BeforeMethod public void init() { @@ -58,42 +59,33 @@ public class TestPartitionMovementConstraint { public void testGetAssignmentScoreWhenBestPossibleBaselineMissing() { when(_clusterContext.getBaselineAssignment()).thenReturn(Collections.emptyMap()); when(_clusterContext.getBestPossibleAssignment()).thenReturn(Collections.emptyMap()); - double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext); - double normalizedScore = - _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext); - Assert.assertEquals(score, 0.0); - Assert.assertEquals(normalizedScore, 0.0); + + verifyScore(_baselineInfluenceConstraint, _testNode, _testReplica, _clusterContext, 0.0, 0.0); + verifyScore(_partitionMovementConstraint, _testNode, _testReplica, _clusterContext, 0.0, 0.0); } @Test - public void testGetAssignmentScoreWhenBestPossibleBaselineSame() { + public void testGetAssignmentScoreWhenBestPossibleMissing() { ResourceAssignment mockResourceAssignment = mock(ResourceAssignment.class); when(mockResourceAssignment.getReplicaMap(new Partition(PARTITION))) .thenReturn(ImmutableMap.of(INSTANCE, "Master")); Map<String, ResourceAssignment> assignmentMap = ImmutableMap.of(RESOURCE, mockResourceAssignment); when(_clusterContext.getBaselineAssignment()).thenReturn(assignmentMap); - when(_clusterContext.getBestPossibleAssignment()).thenReturn(assignmentMap); + when(_clusterContext.getBestPossibleAssignment()).thenReturn(Collections.emptyMap()); // when the calculated states are both equal to the replica's current state when(_testReplica.getReplicaState()).thenReturn("Master"); - double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext); - double normalizedScore = - _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext); + verifyScore(_baselineInfluenceConstraint, _testNode, _testReplica, _clusterContext, 0.0, 0.0); + verifyScore(_partitionMovementConstraint, _testNode, _testReplica, _clusterContext, 1.0, 1.0); - Assert.assertEquals(score, 1.0); - Assert.assertEquals(normalizedScore, 1.0); // when the calculated states are both different from the replica's current state when(_testReplica.getReplicaState()).thenReturn("Slave"); - score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext); - normalizedScore = - _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext); - - Assert.assertEquals(score, 0.5); - Assert.assertEquals(normalizedScore, 0.5); + verifyScore(_baselineInfluenceConstraint, _testNode, _testReplica, _clusterContext, 0.0, 0.0); + verifyScore(_partitionMovementConstraint, _testNode, _testReplica, _clusterContext, 0.5, 0.5); } @Test - public void testGetAssignmentScoreWhenBestPossibleBaselineOpposite() { + public void testGetAssignmentScore() { String instanceNameA = INSTANCE + "A"; String instanceNameB = INSTANCE + "B"; String instanceNameC = INSTANCE + "C"; @@ -110,72 +102,45 @@ public class TestPartitionMovementConstraint { when(_clusterContext.getBaselineAssignment()) .thenReturn(ImmutableMap.of(RESOURCE, baselineResourceAssignment)); - // when the replica's state matches with best possible + // when the replica's state matches with best possible, allocation matches with baseline when(testAssignableNode.getInstanceName()).thenReturn(instanceNameA); when(_testReplica.getReplicaState()).thenReturn("Master"); - double score = - _constraint.getAssignmentScore(testAssignableNode, _testReplica, _clusterContext); - double normalizedScore = - _constraint.getAssignmentNormalizedScore(testAssignableNode, _testReplica, _clusterContext); - Assert.assertEquals(score, 1.0); - Assert.assertEquals(normalizedScore, 1.0); - - // when the replica's allocation matches with best possible + verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext, + 0.5, 0.5); + verifyScore(_partitionMovementConstraint, testAssignableNode, _testReplica, _clusterContext, + 1.0, 1.0); + + // when the replica's allocation matches with best possible only when(testAssignableNode.getInstanceName()).thenReturn(instanceNameB); when(_testReplica.getReplicaState()).thenReturn("Master"); - score = _constraint.getAssignmentScore(testAssignableNode, _testReplica, _clusterContext); - normalizedScore = - _constraint.getAssignmentNormalizedScore(testAssignableNode, _testReplica, _clusterContext); - Assert.assertEquals(score, 0.5); - Assert.assertEquals(normalizedScore, 0.5); + verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext, + 0.0, 0.0); + verifyScore(_partitionMovementConstraint, testAssignableNode, _testReplica, _clusterContext, + 0.5, 0.5); // when the replica's state matches with baseline only when(testAssignableNode.getInstanceName()).thenReturn(instanceNameC); when(_testReplica.getReplicaState()).thenReturn("Master"); - score = _constraint.getAssignmentScore(testAssignableNode, _testReplica, _clusterContext); - normalizedScore = - _constraint.getAssignmentNormalizedScore(testAssignableNode, _testReplica, _clusterContext); - // The calculated score is lower than previous value cause the replica's state matches with - // best possible is preferred - Assert.assertEquals(score, 0.25); - Assert.assertEquals(normalizedScore, 0.25); + verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext, + 1.0, 1.0); + verifyScore(_partitionMovementConstraint, testAssignableNode, _testReplica, _clusterContext, + 0.0, 0.0); // when the replica's allocation matches with baseline only when(testAssignableNode.getInstanceName()).thenReturn(instanceNameC); when(_testReplica.getReplicaState()).thenReturn("Slave"); - score = _constraint.getAssignmentScore(testAssignableNode, _testReplica, _clusterContext); - normalizedScore = - _constraint.getAssignmentNormalizedScore(testAssignableNode, _testReplica, _clusterContext); - // The calculated score is lower than previous value cause the replica's state matches with - // best possible is preferred - Assert.assertEquals(score, 0.25); - Assert.assertEquals(normalizedScore, 0.25); + verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext, + 0.5, 0.5); + verifyScore(_partitionMovementConstraint, testAssignableNode, _testReplica, _clusterContext, + 0.0, 0.0); } - @Test - public void testGetAssignmentScoreWhenBestPossibleMissing() { - ResourceAssignment mockResourceAssignment = mock(ResourceAssignment.class); - when(mockResourceAssignment.getReplicaMap(new Partition(PARTITION))) - .thenReturn(ImmutableMap.of(INSTANCE, "Master")); - Map<String, ResourceAssignment> assignmentMap = - ImmutableMap.of(RESOURCE, mockResourceAssignment); - when(_clusterContext.getBaselineAssignment()).thenReturn(assignmentMap); - when(_clusterContext.getBestPossibleAssignment()).thenReturn(Collections.emptyMap()); - // when the calculated states are both equal to the replica's current state - when(_testReplica.getReplicaState()).thenReturn("Master"); - double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext); - double normalizedScore = - _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext); - - Assert.assertEquals(score, 1.0); - Assert.assertEquals(normalizedScore, 1.0); - // when the calculated states are both different from the replica's current state - when(_testReplica.getReplicaState()).thenReturn("Slave"); - score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext); - normalizedScore = - _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext); - - Assert.assertEquals(score, 0.5); - Assert.assertEquals(normalizedScore, 0.5); + private static void verifyScore(SoftConstraint constraint, AssignableNode node, + AssignableReplica replica, ClusterContext clusterContext, double expectedScore, + double expectedNormalizedScore) { + double score = constraint.getAssignmentScore(node, replica, clusterContext); + double normalizedScore = constraint.getAssignmentNormalizedScore(node, replica, clusterContext); + Assert.assertEquals(score, expectedScore); + Assert.assertEquals(normalizedScore, expectedNormalizedScore); } } diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java index 8d6b0a2..3690ca4 100644 --- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java +++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java @@ -122,13 +122,17 @@ public class TestClusterConfig { ClusterConfig testConfig = new ClusterConfig("testId"); Assert.assertEquals(testConfig.getGlobalRebalancePreference(), ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE); + } - Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>(); - preference.put(EVENNESS, 5); - testConfig.setGlobalRebalancePreference(preference); + @Test + public void testGetRebalancePreferenceMissingKey() { + ClusterConfig testConfig = new ClusterConfig("testId"); + Map<String, String> preference = new HashMap<>(); + preference.put(EVENNESS.name(), String.valueOf(5)); + testConfig.getRecord() + .setMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name(), preference); - Assert.assertEquals(testConfig.getGlobalRebalancePreference(), - ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE); + Assert.assertEquals(testConfig.getGlobalRebalancePreference(), Collections.emptyMap()); } @Test @@ -171,6 +175,15 @@ public class TestClusterConfig { testConfig.setGlobalRebalancePreference(preference); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void testSetRebalancePreferenceMissingKey() { + Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>(); + preference.put(EVENNESS, 1); + + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.setGlobalRebalancePreference(preference); + } + @Test public void testGetInstanceCapacityMap() { Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", 3);
