This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit fe4a9bbdd00928b285eaa896c3a1d11a62299f8d
Author: xyuanlu <[email protected]>
AuthorDate: Fri Apr 23 21:42:23 2021 -0700

    Improve the WAGED.ConstraintBasedAlgorithm sorting logic to prioritize replica with larger impact (#1691)

    Improve the WAGED.ConstraintBasedAlgorithm sorting logic to prioritize replica with larger impact.
---
 .../constraints/ConstraintBasedAlgorithm.java      | 222 ++++++++++++++-------
 .../constraints/TestConstraintBasedAlgorithm.java  |  15 ++
 .../waged/model/AbstractTestClusterModel.java      |  42 ++++
 .../waged/model/ClusterModelTestHelper.java        |   2 +-
 .../WagedRebalancer/TestWagedRebalance.java        |   2 +-
 5 files changed, 213 insertions(+), 70 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 2f6c9cd..89730c6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -42,6 +42,7 @@ import org.apache.helix.model.ResourceAssignment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * The algorithm is based on a given set of constraints
  * - HardConstraint: Approve or deny the assignment given its condition, any assignment cannot
@@ -68,23 +69,38 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
     List<AssignableNode> nodes = new ArrayList<>(clusterModel.getAssignableNodes().values());
     Set<String> busyInstances =
         getBusyInstances(clusterModel.getContext().getBestPossibleAssignment().values());
-    // Sort the replicas so the input is stable for the greedy algorithm.
-    // For the other algorithm implementation, this sorting could be unnecessary.
-    for (AssignableReplica replica : getOrderedAssignableReplica(clusterModel)) {
+
+    // Compute overall utilization of the cluster. Capacity dimension -> total remaining capacity
+    Map<String, Integer> overallClusterRemainingCapacityMap =
+        computeOverallClusterRemainingCapacity(nodes);
+
+    // Create a wrapper for each AssignableReplica.
+    Set<AssignableReplicaWithScore> toBeAssignedReplicas =
+        clusterModel.getAssignableReplicaMap().values().stream()
+            .flatMap(replicas -> replicas.stream())
+            .map(replica -> new AssignableReplicaWithScore(replica, clusterModel))
+            .collect(Collectors.toSet());
+
+    while (!toBeAssignedReplicas.isEmpty()) {
+      AssignableReplica replica =
+          getNextAssignableReplica(toBeAssignedReplicas, overallClusterRemainingCapacityMap);
       Optional<AssignableNode> maybeBestNode =
           getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), busyInstances,
               optimalAssignment);
       // stop immediately if any replica cannot find best assignable node
-      if (optimalAssignment.hasAnyFailure()) {
-        String errorMessage = String
-            .format("Unable to find any available candidate node for partition %s; Fail reasons: %s",
+      if (!maybeBestNode.isPresent() || optimalAssignment.hasAnyFailure()) {
+        String errorMessage = String.format(
+            "Unable to find any available candidate node for partition %s; Fail reasons: %s",
             replica.getPartitionName(), optimalAssignment.getFailures());
         throw new HelixRebalanceException(errorMessage,
             HelixRebalanceException.Type.FAILED_TO_CALCULATE);
       }
-      maybeBestNode.ifPresent(node -> clusterModel
+      AssignableNode bestNode = maybeBestNode.get();
+      // Assign the replica and update the cluster model.
+      clusterModel
           .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(),
-              node.getInstanceName()));
+              bestNode.getInstanceName());
+      updateOverallClusterRemainingCapacity(overallClusterRemainingCapacityMap, replica);
     }
     optimalAssignment.updateAssignments(clusterModel);
     return optimalAssignment;
@@ -152,69 +168,139 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
         .collect(Collectors.toList());
   }
 
-  private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) {
-    Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
-    List<AssignableReplica> orderedAssignableReplicas =
-        replicasByResource.values().stream().flatMap(replicas -> replicas.stream())
-            .collect(Collectors.toList());
-
-    Map<String, ResourceAssignment> bestPossibleAssignment =
-        clusterModel.getContext().getBestPossibleAssignment();
-    Map<String, ResourceAssignment> baselineAssignment =
-        clusterModel.getContext().getBaselineAssignment();
-
-    Map<String, Integer> replicaHashCodeMap = orderedAssignableReplicas.parallelStream().collect(
-        Collectors.toMap(AssignableReplica::toString,
-            replica -> Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet()),
-            (hash1, hash2) -> hash2));
-
-    // 1. Sort according if the assignment exists in the best possible and/or baseline assignment
-    // 2. Sort according to the state priority. Note that prioritizing the top state is required.
-    // Or the greedy algorithm will unnecessarily shuffle the states between replicas.
-    // 3. Sort according to the resource/partition name.
-    orderedAssignableReplicas.sort((replica1, replica2) -> {
-      String resourceName1 = replica1.getResourceName();
-      String resourceName2 = replica2.getResourceName();
-      if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment
-          .containsKey(resourceName2)) {
-        if (baselineAssignment.containsKey(resourceName1) == baselineAssignment
-            .containsKey(resourceName2)) {
-          // If both assignment states have/not have the resource assignment the same,
-          // compare for additional dimensions.
-          int statePriority1 = replica1.getStatePriority();
-          int statePriority2 = replica2.getStatePriority();
-          if (statePriority1 == statePriority2) {
-            // If state priorities are the same, try to randomize the replicas order. Otherwise,
-            // the same replicas might always be moved in each rebalancing. This is because their
-            // placement calculating will always happen at the critical moment while the cluster is
-            // almost close to the expected utilization.
-            //
-            // Note that to ensure the algorithm is deterministic with the same inputs, do not use
-            // Random functions here. Use hashcode based on the cluster topology information to get
-            // a controlled randomized order is good enough.
-            Integer replicaHash1 = replicaHashCodeMap.get(replica1.toString());
-            Integer replicaHash2 = replicaHashCodeMap.get(replica2.toString());
-            if (!replicaHash1.equals(replicaHash2)) {
-              return replicaHash1.compareTo(replicaHash2);
-            } else {
-              // In case of hash collision, return order according to the name.
-              return replica1.toString().compareTo(replica2.toString());
-            }
-          } else {
-            // Note we shall prioritize the replica with a higher state priority,
-            // the smaller priority number means higher priority.
-            return statePriority1 - statePriority2;
-          }
-        } else {
-          // If the baseline assignment contains the assignment, prioritize the replica.
-          return baselineAssignment.containsKey(resourceName1) ? -1 : 1;
+  private Map<String, Integer> computeOverallClusterRemainingCapacity(List<AssignableNode> nodes) {
+    Map<String, Integer> utilizationMap = new HashMap<>();
+    for (AssignableNode node : nodes) {
+      for (String capacityKey : node.getMaxCapacity().keySet()) {
+        utilizationMap.compute(capacityKey,
+            (k, v) -> v == null ? node.getRemainingCapacity().get(capacityKey)
+                : v + node.getRemainingCapacity().get(capacityKey));
+      }
+    }
+    return utilizationMap;
+  }
+
+  /**
+   * Update the overall cluster remaining capacity map with the newly placed replica.
+   */
+  private void updateOverallClusterRemainingCapacity(
+      Map<String, Integer> overallClusterRemainingCapacityMap, AssignableReplica replica) {
+    for (Map.Entry<String, Integer> resourceUsage : replica.getCapacity().entrySet()) {
+      overallClusterRemainingCapacityMap.put(resourceUsage.getKey(),
+          overallClusterRemainingCapacityMap.get(resourceUsage.getKey()) - resourceUsage
+              .getValue());
+    }
+  }
+
+  private class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
+    private final AssignableReplica _replica;
+    private float _score = 0;
+    private final boolean _isInBestPossibleAssignment;
+    private final boolean _isInBaselineAssignment;
+    private final Integer _replicaHash;
+
+    AssignableReplicaWithScore(AssignableReplica replica, ClusterModel clusterModel) {
+      _replica = replica;
+      _isInBestPossibleAssignment = clusterModel.getContext().getBestPossibleAssignment()
+          .containsKey(replica.getResourceName());
+      _isInBaselineAssignment =
+          clusterModel.getContext().getBaselineAssignment().containsKey(replica.getResourceName());
+      _replicaHash = Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet());
+    }
+
+    public void computeScore(Map<String, Integer> overallClusterRemainingCapMap) {
+      float score = 0;
+      // score = SUM(weight * (resource_capacity/cluster_capacity)) where weight = 1/(1-total_util%);
+      // it could be simplified to "resource_capacity/cluster_remainingCapacity".
+      for (Map.Entry<String, Integer> resourceCapacity : _replica.getCapacity().entrySet()) {
+        if (resourceCapacity.getValue() == 0) {
+          continue;
+        }
+        score = (overallClusterRemainingCapMap.get(resourceCapacity.getKey()) == 0
+            || resourceCapacity.getValue() > (overallClusterRemainingCapMap
+            .get(resourceCapacity.getKey()))) ? Float.MAX_VALUE
+            : score + (float) resourceCapacity.getValue() / (overallClusterRemainingCapMap
+                .get(resourceCapacity.getKey()));
+        if (Float.compare(score, Float.MAX_VALUE) == 0) {
+          break;
         }
+      }
+      _score = score;
+    }
+
+    public AssignableReplica getAssignableReplica() {
+      return _replica;
+    }
+
+
+    @Override
+    public String toString() {
+      return _replica.toString();
+    }
+
+    @Override
+    public int compareTo(AssignableReplicaWithScore replica2) {
+      // 1. Sort according to whether the assignment exists in the best possible and/or baseline assignment
+      if (_isInBestPossibleAssignment != replica2._isInBestPossibleAssignment) {
+        // If the best possible assignment contains only one replica's assignment,
+        // prioritize the replica.
+        return _isInBestPossibleAssignment ? -1 : 1;
+      }
+
+      if (_isInBaselineAssignment != replica2._isInBaselineAssignment) {
+        // If the baseline assignment contains only one replica's assignment, prioritize the replica.
+        return _isInBaselineAssignment ? -1 : 1;
+      }
+
+      // 2. Sort according to the state priority. Otherwise, the greedy algorithm will unnecessarily
+      // shuffle the states between replicas.
+      int statePriority1 = _replica.getStatePriority();
+      int statePriority2 = replica2._replica.getStatePriority();
+      if (statePriority1 != statePriority2) {
+        // Note we shall prioritize the replica with a higher state priority;
+        // the smaller priority number means higher priority.
+        return statePriority1 - statePriority2;
+      }
+
+      // 3. Sort according to the replica impact based on the weight.
+      // So the greedy algorithm will place the replicas with larger impact first.
+      int result = Float.compare(replica2._score, _score);
+      if (result != 0) {
+        return result;
+      }
+
+      // 4. Sort according to the resource/partition name.
+      // If none of the above conditions makes a difference, try to randomize the replicas
+      // order.
+      // Otherwise, the same replicas might always be moved in each rebalancing. This is because
+      // their placement calculation will always happen at the critical moment while the cluster is
+      // almost close to the expected utilization.
+      //
+      // Note that to ensure the algorithm is deterministic with the same inputs, do not use
+      // Random functions here. Using a hashcode based on the cluster topology information to get
+      // a controlled randomized order is good enough.
+      if (!_replicaHash.equals(replica2._replicaHash)) {
+        return _replicaHash.compareTo(replica2._replicaHash);
       } else {
-        // If the best possible assignment contains the assignment, prioritize the replica.
-        return bestPossibleAssignment.containsKey(resourceName1) ? -1 : 1;
+        // In case of hash collision, return order according to the name.
+        return _replica.toString().compareTo(replica2.toString());
       }
-    });
-    return orderedAssignableReplicas;
+    }
+  }
+
+  private AssignableReplica getNextAssignableReplica(
+      Set<AssignableReplicaWithScore> allReplica,
+      Map<String, Integer> overallClusterRemainingCapMap) {
+    AssignableReplicaWithScore nextAssignableReplica = null;
+    // Compare every replica with the current candidate, and update the candidate if needed.
+    for (AssignableReplicaWithScore replica : allReplica) {
+      replica.computeScore(overallClusterRemainingCapMap);
+      if (nextAssignableReplica == null || replica.compareTo(nextAssignableReplica) < 0) {
+        nextAssignableReplica = replica;
+      }
+    }
+    allReplica.remove(nextAssignableReplica);
+    return nextAssignableReplica.getAssignableReplica();
   }
 
   /**
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
index 3954407..84aeb20 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
@@ -83,4 +83,19 @@ public class TestConstraintBasedAlgorithm {
           .containsKey(ClusterModelTestHelper.TEST_INSTANCE_ID_1));
     }));
   }
+
+  // Add capacity-related hard/soft constraints to test the sorting algorithm in ConstraintBasedAlgorithm.
+  @Test
+  public void testSortingByResourceCapacity() throws IOException, HelixRebalanceException {
+    HardConstraint nodeCapacityConstraint = new NodeCapacityConstraint();
+    SoftConstraint soft1 = new MaxCapacityUsageInstanceConstraint();
+    SoftConstraint soft2 = new InstancePartitionsCountConstraint();
+    ConstraintBasedAlgorithm algorithm =
+        new ConstraintBasedAlgorithm(ImmutableList.of(nodeCapacityConstraint),
+            ImmutableMap.of(soft1, 1f, soft2, 1f));
+    ClusterModel clusterModel = new ClusterModelTestHelper().getMultiNodeClusterModel();
+    OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel);
+
+    Assert.assertFalse(optimalAssignment.hasAnyFailure());
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index e3b346d..7b5d63a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -164,6 +164,7 @@ public abstract class AbstractTestClusterModel {
     testResourceConfigResource1.setPartitionCapacityMap(
         Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource1));
     when(testCache.getResourceConfig("Resource1")).thenReturn(testResourceConfigResource1);
+
     Map<String, Integer> capacityDataMapResource2 = new HashMap<>();
     capacityDataMapResource2.put("item1", 5);
     capacityDataMapResource2.put("item2", 10);
@@ -184,6 +185,47 @@ public abstract class AbstractTestClusterModel {
     return testCache;
   }
 
+  // Add another resource. When computing the assignment, the two smaller resources' Master
+  // partitions should be assigned to one instance, and the relatively larger resource's Master
+  // partition should be assigned to another.
+  // The sorting algorithm in ConstraintBasedAlgorithm should guarantee that these 2 smaller
+  // resources are placed after the larger one.
+  // This is the only way to accommodate all 6 partitions.
+  protected ResourceControllerDataProvider setupClusterDataCacheForNearFullUtil() throws IOException {
+    _resourceNames.add("Resource3");
+    _partitionNames.add("Partition5");
+    _partitionNames.add("Partition6");
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+
+    CurrentState testCurrentStateResource3 = Mockito.mock(CurrentState.class);
+    Map<String, String> partitionStateMap3 = new HashMap<>();
+    partitionStateMap3.put(_partitionNames.get(4), "MASTER");
+    partitionStateMap3.put(_partitionNames.get(5), "SLAVE");
+    when(testCurrentStateResource3.getResourceName()).thenReturn(_resourceNames.get(2));
+    when(testCurrentStateResource3.getPartitionStateMap()).thenReturn(partitionStateMap3);
+    when(testCurrentStateResource3.getStateModelDefRef()).thenReturn("MasterSlave");
+    when(testCurrentStateResource3.getState(_partitionNames.get(4))).thenReturn("MASTER");
+    when(testCurrentStateResource3.getState(_partitionNames.get(5))).thenReturn("SLAVE");
+    when(testCurrentStateResource3.getSessionId()).thenReturn(_sessionId);
+
+    Map<String, CurrentState> currentStatemap = testCache.getCurrentState(_testInstanceId, _sessionId);
+    currentStatemap.put(_resourceNames.get(2), testCurrentStateResource3);
+    when(testCache.getCurrentState(_testInstanceId, _sessionId)).thenReturn(currentStatemap);
+    when(testCache.getCurrentState(_testInstanceId, _sessionId, false)).thenReturn(currentStatemap);
+
+    Map<String, Integer> capacityDataMapResource3 = new HashMap<>();
+    capacityDataMapResource3.put("item1", 9);
+    capacityDataMapResource3.put("item2", 17);
+    ResourceConfig testResourceConfigResource3 = new ResourceConfig("Resource3");
+    testResourceConfigResource3.setPartitionCapacityMap(
+        Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource3));
+    when(testCache.getResourceConfig("Resource3")).thenReturn(testResourceConfigResource3);
+    Map<String, ResourceConfig> configMap = testCache.getResourceConfigMap();
+    configMap.put("Resource3", testResourceConfigResource3);
+    when(testCache.getResourceConfigMap()).thenReturn(configMap);
+    return testCache;
+  }
+
   /**
    * Generate the replica objects according to the provider information.
   */
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
index 1c1687e..02ab0f5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
@@ -48,7 +48,7 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
 
   public ClusterModel getMultiNodeClusterModel() throws IOException {
     initialize();
-    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    ResourceControllerDataProvider testCache = setupClusterDataCacheForNearFullUtil();
     InstanceConfig testInstanceConfig1 = createMockInstanceConfig(TEST_INSTANCE_ID_1);
     InstanceConfig testInstanceConfig2 = createMockInstanceConfig(TEST_INSTANCE_ID_2);
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index bba94fc..bb58d86 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -732,7 +732,6 @@ public class TestWagedRebalance extends ZkTestBase {
     for (String db : _allDBs) {
       _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
     }
-    _allDBs.clear();
     // waiting for all DB be dropped.
     ZkHelixClusterVerifier _clusterVerifier =
         new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
@@ -741,6 +740,7 @@
             .build();
     try {
       Assert.assertTrue(_clusterVerifier.verifyByPolling());
+      _allDBs.clear();
     } finally {
       _clusterVerifier.close();
    }
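
The ordering introduced by this commit can be summarized as follows: each pending replica is scored as SUM(weight * resource_capacity / cluster_capacity) with weight = 1/(1 - utilization); since 1 - utilization = remaining_capacity / cluster_capacity, each term simplifies to resource_capacity / cluster_remaining_capacity, and a replica that exceeds the remaining capacity of some dimension gets Float.MAX_VALUE so any capacity failure surfaces early. The standalone sketch below is illustrative only and not part of the commit; the class name, method name, and the remaining-capacity numbers (20/40) are hypothetical, while the replica capacities (5/10 and 9/17) match the test setup above.

import java.util.HashMap;
import java.util.Map;

// Standalone illustration of the simplified replica "impact" score used for ordering.
public class ReplicaImpact {

  // score = SUM over capacity dimensions of replicaCapacity / clusterRemainingCapacity.
  // A replica that does not fit in some dimension scores Float.MAX_VALUE so it is tried first.
  static float score(Map<String, Integer> replicaCapacity, Map<String, Integer> remainingCapacity) {
    float score = 0;
    for (Map.Entry<String, Integer> dim : replicaCapacity.entrySet()) {
      int needed = dim.getValue();
      if (needed == 0) {
        continue;
      }
      int remaining = remainingCapacity.getOrDefault(dim.getKey(), 0);
      if (remaining == 0 || needed > remaining) {
        return Float.MAX_VALUE;
      }
      score += (float) needed / remaining;
    }
    return score;
  }

  public static void main(String[] args) {
    // Hypothetical cluster-wide remaining capacity per dimension.
    Map<String, Integer> remaining = new HashMap<>();
    remaining.put("item1", 20);
    remaining.put("item2", 40);

    // Capacities matching Resource2 (5/10) and Resource3 (9/17) in the test setup.
    Map<String, Integer> small = new HashMap<>();
    small.put("item1", 5);
    small.put("item2", 10);

    Map<String, Integer> large = new HashMap<>();
    large.put("item1", 9);
    large.put("item2", 17);

    // Prints 0.5 for the small replica and 0.875 for the large one, so a descending
    // sort by score places the larger-impact replica first.
    System.out.println("small replica score = " + score(small, remaining));
    System.out.println("large replica score = " + score(large, remaining));
  }
}

Placing the larger-impact replica first is what allows the near-full-utilization cluster model in the new testSortingByResourceCapacity test to be assigned without failures.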
