This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedImprove in repository https://gitbox.apache.org/repos/asf/helix.git
commit 5db7301f6eceed841f0fd44d0c7cc860cae2bc7f Author: Neal Sun <[email protected]> AuthorDate: Mon Mar 1 13:40:04 2021 -0800 Add TopStateUsage constraint to Waged (#1652) Add new top state weight-based constraint to Waged to ensure top state weight evenness. Co-authored-by: Neal Sun <[email protected]> --- .../ConstraintBasedAlgorithmFactory.java | 7 +-- .../MaxCapacityUsageInstanceConstraint.java | 3 +- ...opStateMaxCapacityUsageInstanceConstraint.java} | 19 ++++--- .../rebalancer/waged/model/AssignableNode.java | 61 ++++++++++++++++------ .../rebalancer/waged/model/ClusterContext.java | 36 ++++++++++--- .../stages/CurrentStateComputationStage.java | 2 +- .../TestMaxCapacityUsageInstanceConstraint.java | 2 +- ...opStateMaxCapacityUsageInstanceConstraint.java} | 12 +++-- .../rebalancer/waged/model/TestAssignableNode.java | 12 +++-- .../rebalancer/waged/model/TestClusterContext.java | 4 ++ .../WagedRebalancer/TestWagedRebalance.java | 10 ++++ 11 files changed, 123 insertions(+), 45 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java index 934bfa7..33aa6c8 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java @@ -41,8 +41,8 @@ public class ConstraintBasedAlgorithmFactory { put(PartitionMovementConstraint.class.getSimpleName(), 2f); put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f); put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f); - put(ResourceTopStateAntiAffinityConstraint.class.getSimpleName(), 3f); - put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 5f); + put(TopStateMaxCapacityUsageInstanceConstraint.class.getSimpleName(), 3f); + put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 6f); } }; @@ -69,7 +69,8 @@ public class ConstraintBasedAlgorithmFactory { List<SoftConstraint> softConstraints = ImmutableList .of(new PartitionMovementConstraint(), new InstancePartitionsCountConstraint(), new ResourcePartitionAntiAffinityConstraint(), - new ResourceTopStateAntiAffinityConstraint(), new MaxCapacityUsageInstanceConstraint()); + new TopStateMaxCapacityUsageInstanceConstraint(), + new MaxCapacityUsageInstanceConstraint()); Map<SoftConstraint, Float> softConstraintsWithWeight = Maps.toMap(softConstraints, key -> { String name = key.getClass().getSimpleName(); float weight = MODEL.get(name); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java index 8f41f5e..7d74c26 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java @@ -36,7 +36,8 @@ class MaxCapacityUsageInstanceConstraint extends UsageSoftConstraint { protected double getAssignmentScore(AssignableNode node, AssignableReplica replica, ClusterContext clusterContext) { float estimatedMaxUtilization = clusterContext.getEstimatedMaxUtilization(); - float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity()); + float projectedHighestUtilization = + node.getGeneralProjectedHighestUtilization(replica.getCapacity()); return computeUtilizationScore(estimatedMaxUtilization, projectedHighestUtilization); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java similarity index 69% copy from helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java copy to helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java index 8f41f5e..1454253 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/MaxCapacityUsageInstanceConstraint.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/TopStateMaxCapacityUsageInstanceConstraint.java @@ -23,20 +23,25 @@ import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; + /** - * The constraint evaluates the score by checking the max used capacity key out of all the capacity - * keys. + * Evaluate the proposed assignment according to the top state resource usage on the instance. * The higher the maximum usage value for the capacity key, the lower the score will be, implying * that it is that much less desirable to assign anything on the given node. * It is a greedy approach since it evaluates only on the most used capacity key. */ -class MaxCapacityUsageInstanceConstraint extends UsageSoftConstraint { - +class TopStateMaxCapacityUsageInstanceConstraint extends UsageSoftConstraint { @Override protected double getAssignmentScore(AssignableNode node, AssignableReplica replica, ClusterContext clusterContext) { - float estimatedMaxUtilization = clusterContext.getEstimatedMaxUtilization(); - float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity()); - return computeUtilizationScore(estimatedMaxUtilization, projectedHighestUtilization); + if (!replica.isReplicaTopState()) { + // For non top state replica, this constraint is not applicable. + // So return zero on any assignable node candidate. + return 0; + } + float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization(); + float projectedHighestUtilization = + node.getTopStateProjectedHighestUtilization(replica.getCapacity()); + return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java index d3d014d..aae2328 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java @@ -62,6 +62,7 @@ public class AssignableNode implements Comparable<AssignableNode> { private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap; // A map of <capacity key, capacity value> that tracks the current available node capacity private Map<String, Integer> _remainingCapacity; + private Map<String, Integer> _remainingTopStateCapacity; /** * Update the node with a ClusterDataCache. This resets the current assignment and recalculates @@ -81,6 +82,7 @@ public class AssignableNode implements Comparable<AssignableNode> { // make a copy of max capacity _maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity); _remainingCapacity = new HashMap<>(instanceCapacity); + _remainingTopStateCapacity = new HashMap<>(instanceCapacity); _maxPartition = clusterConfig.getMaxPartitionsPerInstance(); _currentAssignedReplicaMap = new HashMap<>(); } @@ -92,12 +94,18 @@ public class AssignableNode implements Comparable<AssignableNode> { * Using this function avoids the overhead of updating capacity repeatedly. */ void assignInitBatch(Collection<AssignableReplica> replicas) { + Map<String, Integer> totalTopStatePartitionCapacity = new HashMap<>(); Map<String, Integer> totalPartitionCapacity = new HashMap<>(); for (AssignableReplica replica : replicas) { // TODO: the exception could occur in the middle of for loop and the previous added records cannot be reverted addToAssignmentRecord(replica); // increment the capacity requirement according to partition's capacity configuration. for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) { + if (replica.isReplicaTopState()) { + totalTopStatePartitionCapacity.compute(capacity.getKey(), + (key, totalValue) -> (totalValue == null) ? capacity.getValue() + : totalValue + capacity.getValue()); + } totalPartitionCapacity.compute(capacity.getKey(), (key, totalValue) -> (totalValue == null) ? capacity.getValue() : totalValue + capacity.getValue()); @@ -105,9 +113,8 @@ public class AssignableNode implements Comparable<AssignableNode> { } // Update the global state after all single replications' calculation is done. - for (String capacityKey : totalPartitionCapacity.keySet()) { - updateRemainingCapacity(capacityKey, totalPartitionCapacity.get(capacityKey)); - } + updateRemainingCapacity(totalTopStatePartitionCapacity, _remainingTopStateCapacity, false); + updateRemainingCapacity(totalPartitionCapacity, _remainingCapacity, false); } /** @@ -116,8 +123,10 @@ public class AssignableNode implements Comparable<AssignableNode> { */ void assign(AssignableReplica assignableReplica) { addToAssignmentRecord(assignableReplica); - assignableReplica.getCapacity().entrySet().stream() - .forEach(capacity -> updateRemainingCapacity(capacity.getKey(), capacity.getValue())); + updateRemainingCapacity(assignableReplica.getCapacity(), _remainingCapacity, false); + if (assignableReplica.isReplicaTopState()) { + updateRemainingCapacity(assignableReplica.getCapacity(), _remainingTopStateCapacity, false); + } } /** @@ -146,8 +155,10 @@ public class AssignableNode implements Comparable<AssignableNode> { } AssignableReplica removedReplica = partitionMap.remove(partitionName); - removedReplica.getCapacity().entrySet().stream() - .forEach(entry -> updateRemainingCapacity(entry.getKey(), -1 * entry.getValue())); + updateRemainingCapacity(removedReplica.getCapacity(), _remainingCapacity, true); + if (removedReplica.isReplicaTopState()) { + updateRemainingCapacity(removedReplica.getCapacity(), _remainingTopStateCapacity, true); + } } /** @@ -228,11 +239,30 @@ public class AssignableNode implements Comparable<AssignableNode> { * @param newUsage the proposed new additional capacity usage. * @return The highest utilization number of the node among all the capacity category. */ - public float getProjectedHighestUtilization(Map<String, Integer> newUsage) { + public float getGeneralProjectedHighestUtilization(Map<String, Integer> newUsage) { + return getProjectedHighestUtilization(newUsage, _remainingCapacity); + } + + /** + * Return the most concerning capacity utilization number for evenly partition assignment. + * The method dynamically calculates the projected highest utilization number among all the + * capacity categories assuming the new capacity usage is added to the node. + * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}. Then this call shall + * return 0.9. + * This function returns projected highest utilization for only top state partitions. + * @param newUsage the proposed new additional capacity usage. + * @return The highest utilization number of the node among all the capacity category. + */ + public float getTopStateProjectedHighestUtilization(Map<String, Integer> newUsage) { + return getProjectedHighestUtilization(newUsage, _remainingTopStateCapacity); + } + + private float getProjectedHighestUtilization(Map<String, Integer> newUsage, + Map<String, Integer> remainingCapacity) { float highestCapacityUtilization = 0; for (String capacityKey : _maxAllowedCapacity.keySet()) { float capacityValue = _maxAllowedCapacity.get(capacityKey); - float utilization = (capacityValue - _remainingCapacity.get(capacityKey) + newUsage + float utilization = (capacityValue - remainingCapacity.get(capacityKey) + newUsage .getOrDefault(capacityKey, 0)) / capacityValue; highestCapacityUtilization = Math.max(highestCapacityUtilization, utilization); } @@ -311,13 +341,12 @@ public class AssignableNode implements Comparable<AssignableNode> { } } - private void updateRemainingCapacity(String capacityKey, int usage) { - if (!_remainingCapacity.containsKey(capacityKey)) { - //if the capacityKey belongs to replicas does not exist in the instance's capacity, - // it will be treated as if it has unlimited capacity of that capacityKey - return; - } - _remainingCapacity.put(capacityKey, _remainingCapacity.get(capacityKey) - usage); + private void updateRemainingCapacity(Map<String, Integer> usedCapacity, Map<String, Integer> remainingCapacity, + boolean isRelease) { + int multiplier = isRelease ? -1 : 1; + // if the used capacity key does not exist in the node's capacity, ignore it + usedCapacity.forEach((capacityKey, capacityValue) -> remainingCapacity.compute(capacityKey, + (key, value) -> value == null ? null : value - multiplier * capacityValue)); } /** diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java index 46392c9..5bfd4d0 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java @@ -43,6 +43,8 @@ public class ClusterContext { private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>(); // This estimation helps to ensure global resource usage evenness. private final float _estimatedMaxUtilization; + // This estimation helps to ensure global resource top state usage evenness. + private final float _estimatedTopStateMaxUtilization; // map{zoneName : map{resourceName : set(partitionNames)}} private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>(); @@ -63,6 +65,7 @@ public class ClusterContext { int totalReplicas = 0; int totalTopStateReplicas = 0; Map<String, Integer> totalUsage = new HashMap<>(); + Map<String, Integer> totalTopStateUsage = new HashMap<>(); Map<String, Integer> totalCapacity = new HashMap<>(); for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream() @@ -77,6 +80,9 @@ public class ClusterContext { for (AssignableReplica replica : entry.getValue()) { if (replica.isReplicaTopState()) { totalTopStateReplicas += 1; + replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalTopStateUsage + .compute(capacityEntry.getKey(), (k, v) -> (v == null) ? capacityEntry.getValue() + : (v + capacityEntry.getValue()))); } replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalUsage .compute(capacityEntry.getKey(), @@ -87,18 +93,15 @@ public class ClusterContext { capacityEntry -> totalCapacity.compute(capacityEntry.getKey(), (k, v) -> (v == null) ? capacityEntry.getValue() : (v + capacityEntry.getValue())))); + // TODO: these variables correspond to one constraint each, and may become unnecessary if the + // constraints are not used. A better design is to make them pluggable. if (totalCapacity.isEmpty()) { // If no capacity is configured, we treat the cluster as fully utilized. _estimatedMaxUtilization = 1f; + _estimatedTopStateMaxUtilization = 1f; } else { - float estimatedMaxUsage = 0; - for (String capacityKey : totalCapacity.keySet()) { - int maxCapacity = totalCapacity.get(capacityKey); - int usage = totalUsage.getOrDefault(capacityKey, 0); - float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity; - estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization); - } - _estimatedMaxUtilization = estimatedMaxUsage; + _estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, totalUsage); + _estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, totalTopStateUsage); } _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount); _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount); @@ -135,6 +138,10 @@ public class ClusterContext { return _estimatedMaxUtilization; } + public float getEstimatedTopStateMaxUtilization() { + return _estimatedTopStateMaxUtilization; + } + public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) { return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap()) .getOrDefault(resourceName, Collections.emptySet()); @@ -169,4 +176,17 @@ public class ClusterContext { // partitions. The later scenario is what we want to achieve. return (int) Math.floor((float) replicaCount / instanceCount); } + + private float estimateMaxUtilization(Map<String, Integer> totalCapacity, + Map<String, Integer> totalUsage) { + float estimatedMaxUsage = 0; + for (String capacityKey : totalCapacity.keySet()) { + int maxCapacity = totalCapacity.get(capacityKey); + int usage = totalUsage.getOrDefault(capacityKey, 0); + float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity; + estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization); + } + + return estimatedMaxUsage; + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 49e5d8f..bda56ba 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -298,7 +298,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage { for (AssignableNode node : clusterModel.getAssignableNodes().values()) { String instanceName = node.getInstanceName(); // There is no new usage adding to this node, so an empty map is passed in. - double usage = node.getProjectedHighestUtilization(Collections.emptyMap()); + double usage = node.getGeneralProjectedHighestUtilization(Collections.emptyMap()); clusterStatusMonitor .updateInstanceCapacityStatus(instanceName, usage, node.getMaxCapacity()); } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java index 5d52cb7..f08371a 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java @@ -45,7 +45,7 @@ public class TestMaxCapacityUsageInstanceConstraint { @Test public void testGetNormalizedScore() { - when(_testNode.getProjectedHighestUtilization(anyMap())).thenReturn(0.8f); + when(_testNode.getGeneralProjectedHighestUtilization(anyMap())).thenReturn(0.8f); when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f); double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext); // Convert to float so as to compare with equal. diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java similarity index 82% copy from helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java copy to helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java index 5d52cb7..947d0a1 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestMaxCapacityUsageInstanceConstraint.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestTopStateMaxCapacityUsageInstanceConstraint.java @@ -30,11 +30,12 @@ import static org.mockito.Matchers.anyMap; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class TestMaxCapacityUsageInstanceConstraint { + +public class TestTopStateMaxCapacityUsageInstanceConstraint { private AssignableReplica _testReplica; private AssignableNode _testNode; private ClusterContext _clusterContext; - private final SoftConstraint _constraint = new MaxCapacityUsageInstanceConstraint(); + private final SoftConstraint _constraint = new TopStateMaxCapacityUsageInstanceConstraint(); @BeforeMethod public void setUp() { @@ -45,11 +46,12 @@ public class TestMaxCapacityUsageInstanceConstraint { @Test public void testGetNormalizedScore() { - when(_testNode.getProjectedHighestUtilization(anyMap())).thenReturn(0.8f); - when(_clusterContext.getEstimatedMaxUtilization()).thenReturn(1f); + when(_testReplica.isReplicaTopState()).thenReturn(true); + when(_testNode.getTopStateProjectedHighestUtilization(anyMap())).thenReturn(0.8f); + when(_clusterContext.getEstimatedTopStateMaxUtilization()).thenReturn(1f); double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext); // Convert to float so as to compare with equal. - Assert.assertEquals((float) score,0.8f); + Assert.assertEquals((float) score, 0.8f); double normalizedScore = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext); Assert.assertTrue(normalizedScore > 0.99); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java index 0245ffa..4570efd 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java @@ -68,8 +68,10 @@ public class TestAssignableNode extends AbstractTestClusterModel { assignableNode.assignInitBatch(assignmentSet); Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment); Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4); - Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP), + Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP), 16.0 / 20.0, 0.005); + Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP), + 8.0 / 20.0, 0.005); Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap); Assert.assertEquals(assignableNode.getMaxPartition(), 5); Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags); @@ -109,8 +111,10 @@ public class TestAssignableNode extends AbstractTestClusterModel { Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment); Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 3); - Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP), + Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP), 11.0 / 20.0, 0.005); + Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP), + 3.0 / 20.0, 0.005); Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap); Assert.assertEquals(assignableNode.getMaxPartition(), 5); Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags); @@ -143,8 +147,10 @@ public class TestAssignableNode extends AbstractTestClusterModel { Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment); Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4); - Assert.assertEquals(assignableNode.getProjectedHighestUtilization(Collections.EMPTY_MAP), + Assert.assertEquals(assignableNode.getGeneralProjectedHighestUtilization(Collections.EMPTY_MAP), 16.0 / 20.0, 0.005); + Assert.assertEquals(assignableNode.getTopStateProjectedHighestUtilization(Collections.EMPTY_MAP), + 3.0 / 20.0, 0.005); Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap); Assert.assertEquals(assignableNode.getMaxPartition(), 5); Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java index 6b2787c..7171755 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java @@ -68,6 +68,10 @@ public class TestClusterContext extends AbstractTestClusterModel { .addPartitionToFaultZone(_testFaultZoneId, replica.getResourceName(), replica.getPartitionName())); Assert.assertEquals(context.getAssignmentForFaultZoneMap(), expectedFaultZoneMap); + // Capacity with "item1" key is the highest utilized. Among 4 partitions, their weights are + // 3, 5, 3, 5, so a total of 16/20 is used; the 2 master partitions have 3, 5, so 8/20 used. + Assert.assertEquals(context.getEstimatedMaxUtilization(), 16.0 / 20.0, 0.005); + Assert.assertEquals(context.getEstimatedTopStateMaxUtilization(), 8.0 / 20.0, 0.005); // release expectedFaultZoneMap.get(_testFaultZoneId).get(_resourceNames.get(0)) diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java index 80c63bc..bba94fc 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java @@ -123,6 +123,16 @@ public class TestWagedRebalance extends ZkTestBase { return super.getBestPossibleAssignment(); } }; + + // Set test instance capacity and partition weights + HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + ClusterConfig clusterConfig = + dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig()); + String testCapacityKey = "TestCapacityKey"; + clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey)); + clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 100)); + clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1)); + dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(), clusterConfig); } protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount) {
