This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit e893a38651785d4202cd647f8e1c67c0fc29b597 Author: Jiajun Wang <[email protected]> AuthorDate: Tue Sep 17 13:41:56 2019 -0700 Adjust the expected replica count according to fault zone count. (#476) The rebalancer should determine the expected replica count according to the fault zone instead of the node count only. --- .../rebalancer/waged/model/AssignableNode.java | 56 ++++++++++------------ .../waged/model/ClusterModelProvider.java | 28 ++++++----- .../waged/model/ClusterModelTestHelper.java | 3 +- .../rebalancer/waged/model/TestAssignableNode.java | 24 ++++------ .../rebalancer/waged/model/TestClusterModel.java | 3 +- .../waged/model/TestClusterModelProvider.java | 33 ++++++++----- 6 files changed, 76 insertions(+), 71 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java index 6966353..a3460fb 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java @@ -64,12 +64,10 @@ public class AssignableNode implements Comparable<AssignableNode> { * @param clusterConfig * @param instanceConfig * @param instanceName - * @param existingAssignment A collection of replicas that have been pre-allocated to the node. */ - AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName, - Collection<AssignableReplica> existingAssignment) { + AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName) { _instanceName = instanceName; - refresh(clusterConfig, instanceConfig, existingAssignment); + refresh(clusterConfig, instanceConfig); } private void reset() { @@ -88,10 +86,8 @@ public class AssignableNode implements Comparable<AssignableNode> { * subject to change. If the assumption is no longer true, this function should become private. * @param clusterConfig - the Cluster Config of the cluster where the node is located * @param instanceConfig - the Instance Config of the node - * @param existingAssignment - all the existing replicas that are current assigned to the node */ - private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig, - Collection<AssignableReplica> existingAssignment) { + private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig) { reset(); Map<String, Integer> instanceCapacity = fetchInstanceCapacity(clusterConfig, instanceConfig); @@ -101,8 +97,29 @@ public class AssignableNode implements Comparable<AssignableNode> { _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap(); _maxCapacity = instanceCapacity; _maxPartition = clusterConfig.getMaxPartitionsPerInstance(); + } + + /** + * This function should only be used to assign a set of new partitions that are not allocated on + * this node. + * Using this function avoids the overhead of updating capacity repeatedly. + */ + void assignNewBatch(Collection<AssignableReplica> replicas) { + Map<String, Integer> totalPartitionCapacity = new HashMap<>(); + for (AssignableReplica replica : replicas) { + addToAssignmentRecord(replica); + // increment the capacity requirement according to partition's capacity configuration. + for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) { + totalPartitionCapacity.compute(capacity.getKey(), + (key, totalValue) -> (totalValue == null) ? capacity.getValue() + : totalValue + capacity.getValue()); + } + } - assignNewBatch(existingAssignment); + // Update the global state after all single replications' calculation is done. + for (String key : totalPartitionCapacity.keySet()) { + updateCapacityAndUtilization(key, totalPartitionCapacity.get(key)); + } } /** @@ -315,29 +332,6 @@ public class AssignableNode implements Comparable<AssignableNode> { } /** - * This function should only be used to assign a set of new partitions that are not allocated on - * this node. - * Using this function avoids the overhead of updating capacity repeatedly. - */ - private void assignNewBatch(Collection<AssignableReplica> replicas) { - Map<String, Integer> totalPartitionCapacity = new HashMap<>(); - for (AssignableReplica replica : replicas) { - addToAssignmentRecord(replica); - // increment the capacity requirement according to partition's capacity configuration. - for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) { - totalPartitionCapacity.compute(capacity.getKey(), - (key, totalValue) -> (totalValue == null) ? capacity.getValue() - : totalValue + capacity.getValue()); - } - } - - // Update the global state after all single replications' calculation is done. - for (String key : totalPartitionCapacity.keySet()) { - updateCapacityAndUtilization(key, totalPartitionCapacity.get(key)); - } - } - - /** * @throws HelixException if the replica has already been assigned to the node. */ private void addToAssignmentRecord(AssignableReplica replica) { diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java index 3570164..20024c7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java @@ -62,10 +62,15 @@ public class ClusterModelProvider { Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment) { + // Construct all the assignable nodes and initialize with the allocated replicas. + Set<AssignableNode> assignableNodes = + parseAllNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(), + activeInstances); + // Generate replica objects for all the resource partitions. // <resource, replica set> Map<String, Set<AssignableReplica>> replicaMap = - parseAllReplicas(dataProvider, resourceMap, activeInstances.size()); + parseAllReplicas(dataProvider, resourceMap, assignableNodes); // Check if the replicas need to be reassigned. Map<String, Set<AssignableReplica>> allocatedReplicas = @@ -74,10 +79,9 @@ public class ClusterModelProvider { findToBeAssignedReplicas(replicaMap, clusterChanges, activeInstances, bestPossibleAssignment, allocatedReplicas); - // Construct all the assignable nodes and initialize with the allocated replicas. - Set<AssignableNode> assignableNodes = - parseAllNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(), - activeInstances, allocatedReplicas); + // Update the allocated replicas to the assignable nodes. + assignableNodes.stream().forEach(node -> node.assignNewBatch( + allocatedReplicas.getOrDefault(node.getInstanceName(), Collections.emptySet()))); // Construct and initialize cluster context. ClusterContext context = new ClusterContext( @@ -171,15 +175,13 @@ public class ClusterModelProvider { * @param clusterConfig The cluster configuration. * @param instanceConfigMap A map of all the instance configuration. * @param activeInstances All the instances that are online and enabled. - * @param allocatedReplicas A map of all the assigned replicas, which will not be reassigned during the rebalance. * @return A map of assignable node set, <InstanceName, node set>. */ private static Set<AssignableNode> parseAllNodes(ClusterConfig clusterConfig, - Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances, - Map<String, Set<AssignableReplica>> allocatedReplicas) { + Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances) { return activeInstances.stream().map( instanceName -> new AssignableNode(clusterConfig, instanceConfigMap.get(instanceName), - instanceName, allocatedReplicas.getOrDefault(instanceName, Collections.emptySet()))) + instanceName)) .collect(Collectors.toSet()); } @@ -188,11 +190,12 @@ public class ClusterModelProvider { * * @param dataProvider The cluster status cache that contains the current cluster status. * @param resourceMap All the valid resources that are managed by the rebalancer. + * @param assignableNodes All the active assignable nodes. * @return A map of assignable replica set, <ResourceName, replica set>. */ private static Map<String, Set<AssignableReplica>> parseAllReplicas( ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap, - int instanceCount) { + Set<AssignableNode> assignableNodes) { Map<String, Set<AssignableReplica>> totalReplicaMap = new HashMap<>(); ClusterConfig clusterConfig = dataProvider.getClusterConfig(); @@ -211,8 +214,11 @@ public class ClusterModelProvider { is.getStateModelDefRef(), resourceName)); } + int activeFaultZoneCount = + assignableNodes.stream().map(node -> node.getFaultZone()).collect(Collectors.toSet()) + .size(); Map<String, Integer> stateCountMap = - def.getStateCountMap(instanceCount, is.getReplicaCount(instanceCount)); + def.getStateCountMap(activeFaultZoneCount, is.getReplicaCount(assignableNodes.size())); for (String partition : is.getPartitionSet()) { for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) { diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java index 76f1141..08143c6 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java @@ -43,8 +43,7 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel { Set<AssignableNode> nodeSet = new HashSet<>(); testCache.getInstanceConfigMap().values().stream() .forEach(config -> nodeSet.add(new AssignableNode(testCache.getClusterConfig(), - testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName(), - Collections.emptyList()))); + testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName()))); return nodeSet; } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java index 92a6998..6975901 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java @@ -64,7 +64,8 @@ public class TestAssignableNode extends AbstractTestClusterModel { expectedCapacityMap.put("item3", 30); AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), - testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet); + testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId); + assignableNode.assignNewBatch(assignmentSet); Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment); Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4); Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005); @@ -167,8 +168,7 @@ public class TestAssignableNode extends AbstractTestClusterModel { ResourceControllerDataProvider testCache = setupClusterDataCache(); AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), - testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, - Collections.emptyList()); + testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId); AssignableReplica removingReplica = new AssignableReplica(testCache.getClusterConfig(), testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2) + "non-exist", "MASTER", 1); @@ -183,7 +183,8 @@ public class TestAssignableNode extends AbstractTestClusterModel { Set<AssignableReplica> assignmentSet = generateReplicas(testCache); AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), - testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet); + testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId); + assignableNode.assignNewBatch(assignmentSet); AssignableReplica duplicateReplica = new AssignableReplica(testCache.getClusterConfig(), testCache.getResourceConfig(_resourceNames.get(0)), _partitionNames.get(0), "SLAVE", 2); assignableNode.assign(duplicateReplica); @@ -206,8 +207,7 @@ public class TestAssignableNode extends AbstractTestClusterModel { when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap); new AssignableNode(testCache.getClusterConfig(), - testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, - Collections.emptyList()); + testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId); } @Test @@ -227,8 +227,7 @@ public class TestAssignableNode extends AbstractTestClusterModel { when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap); AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), - testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, - Collections.emptyList()); + testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId); Assert.assertEquals(assignableNode.getFaultZone(), "2/"); @@ -245,8 +244,7 @@ public class TestAssignableNode extends AbstractTestClusterModel { when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap); assignableNode = new AssignableNode(testCache.getClusterConfig(), - testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, - Collections.emptyList()); + testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId); Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/"); } @@ -259,8 +257,7 @@ public class TestAssignableNode extends AbstractTestClusterModel { InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId"); AssignableNode assignableNode = - new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId, - Collections.emptyList()); + new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId); Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap); } @@ -274,7 +271,6 @@ public class TestAssignableNode extends AbstractTestClusterModel { InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId"); testInstanceConfig.setInstanceCapacityMap(_capacityDataMap); - new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId, - Collections.emptyList()); + new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId); } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java index a45b729..5112413 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java @@ -43,8 +43,7 @@ public class TestClusterModel extends AbstractTestClusterModel { Set<AssignableNode> nodeSet = new HashSet<>(); testCache.getInstanceConfigMap().values().stream().forEach(config -> nodeSet.add( new AssignableNode(testCache.getClusterConfig(), - testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName(), - Collections.emptyList()))); + testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName()))); return nodeSet; } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java index 1ec92a9..ad608b6 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java @@ -19,6 +19,14 @@ package org.apache.helix.controller.rebalancer.waged.model; * under the License. */ +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + import org.apache.helix.HelixConstants; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; @@ -34,14 +42,6 @@ import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.when; @@ -111,7 +111,18 @@ public class TestClusterModelProvider extends AbstractTestClusterModel { Assert.assertEquals( clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName) .collect(Collectors.toSet()), _instances); - // Shall have 2 resources and 12 replicas + // Shall have 2 resources and 4 replicas, since all nodes are in the same fault zone. + Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2); + Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream() + .allMatch(replicaSet -> replicaSet.size() == 4)); + + // Adjust instance fault zone, so they have different fault zones. + testCache.getInstanceConfigMap().values().stream() + .forEach(config -> config.setZoneId(config.getInstanceName())); + clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream() + .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))), + _instances, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + // Shall have 2 resources and 12 replicas after fault zone adjusted. Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2); Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream() .allMatch(replicaSet -> replicaSet.size() == 12)); @@ -197,10 +208,10 @@ public class TestClusterModelProvider extends AbstractTestClusterModel { _instances, Collections.singletonMap(HelixConstants.ChangeType.RESOURCE_CONFIG, Collections.singleton(changedResourceName)), Collections.emptyMap(), bestPossibleAssignment); - // There should be no existing assignment for all the resource except for resource2. + // There should be no existing assignment for all the resource except for resource2 Assert.assertEquals(clusterModel.getContext().getAssignmentForFaultZoneMap().size(), 1); Map<String, Set<String>> resourceAssignmentMap = - clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testFaultZoneId); + clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testInstanceId); // Should be only resource2 in the map Assert.assertEquals(resourceAssignmentMap.size(), 1); for (String resource : _resourceNames) {
