This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit b75544371c145d221aa84d8348dfdba422c67155 Author: Yi Wang <[email protected]> AuthorDate: Sat Sep 28 00:49:13 2019 -0700 Separate AssignableNode properties by Immutable and Mutable (#485) Make AssignableNode properties different by Immutable and Mutable - It helps detect any wrong usage of these properties early --- .../waged/constraints/NodeCapacityConstraint.java | 2 +- .../rebalancer/waged/model/AssignableNode.java | 119 ++++++++++----------- .../waged/model/ClusterModelProvider.java | 2 +- .../constraints/TestNodeCapacityConstraint.java | 4 +- .../rebalancer/waged/model/TestAssignableNode.java | 15 ++- 5 files changed, 66 insertions(+), 76 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java index 5fc2faf..827d6ce 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java @@ -30,7 +30,7 @@ class NodeCapacityConstraint extends HardConstraint { @Override boolean isAssignmentValid(AssignableNode node, AssignableReplica replica, ClusterContext clusterContext) { - Map<String, Integer> nodeCapacity = node.getCurrentCapacity(); + Map<String, Integer> nodeCapacity = node.getRemainingCapacity(); Map<String, Integer> replicaCapacity = replica.getCapacity(); for (String key : replicaCapacity.keySet()) { diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java index 20de6da..2a68e15 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -35,6 +34,10 @@ import org.apache.helix.model.InstanceConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + /** * This class represents a possible allocation of the replication. * Note that any usage updates to the AssignableNode are not thread safe. @@ -42,39 +45,25 @@ import org.slf4j.LoggerFactory; public class AssignableNode implements Comparable<AssignableNode> { private static final Logger LOG = LoggerFactory.getLogger(AssignableNode.class.getName()); - // basic node information + // Immutable Instance Properties private final String _instanceName; - private Set<String> _instanceTags; - private String _faultZone; - private Map<String, List<String>> _disabledPartitionsMap; - private Map<String, Integer> _maxCapacity; - private int _maxPartition; // maximum number of the partitions that can be assigned to the node. - + private final String _faultZone; + // maximum number of the partitions that can be assigned to the instance. + private final int _maxPartition; + private final ImmutableSet<String> _instanceTags; + private final ImmutableMap<String, List<String>> _disabledPartitionsMap; + private final ImmutableMap<String, Integer> _maxAllowedCapacity; + + // Mutable (Dynamic) Instance Properties // A map of <resource name, <partition name, replica>> that tracks the replicas assigned to the // node. private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap; // A map of <capacity key, capacity value> that tracks the current available node capacity - private Map<String, Integer> _currentCapacityMap; + private Map<String, Integer> _remainingCapacity; // The maximum capacity utilization (0.0 - 1.0) across all the capacity categories. private float _highestCapacityUtilization; /** - * @param clusterConfig - * @param instanceConfig - * @param instanceName - */ - AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName) { - _instanceName = instanceName; - refresh(clusterConfig, instanceConfig); - } - - private void reset() { - _currentAssignedReplicaMap = new HashMap<>(); - _currentCapacityMap = new HashMap<>(); - _highestCapacityUtilization = 0; - } - - /** * Update the node with a ClusterDataCache. This resets the current assignment and recalculates * currentCapacity. * NOTE: While this is required to be used in the constructor, this can also be used when the @@ -82,29 +71,31 @@ public class AssignableNode implements Comparable<AssignableNode> { * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and * ResourceConfig could * subject to change. If the assumption is no longer true, this function should become private. - * @param clusterConfig - the Cluster Config of the cluster where the node is located - * @param instanceConfig - the Instance Config of the node */ - private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig) { - reset(); - + AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName) { + _instanceName = instanceName; Map<String, Integer> instanceCapacity = fetchInstanceCapacity(clusterConfig, instanceConfig); - _currentCapacityMap.putAll(instanceCapacity); _faultZone = computeFaultZone(clusterConfig, instanceConfig); - _instanceTags = new HashSet<>(instanceConfig.getTags()); - _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap(); - _maxCapacity = instanceCapacity; + _instanceTags = ImmutableSet.copyOf(instanceConfig.getTags()); + _disabledPartitionsMap = ImmutableMap.copyOf(instanceConfig.getDisabledPartitionsMap()); + // make a copy of max capacity + _maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity); + _remainingCapacity = new HashMap<>(instanceCapacity); _maxPartition = clusterConfig.getMaxPartitionsPerInstance(); + _currentAssignedReplicaMap = new HashMap<>(); + _highestCapacityUtilization = 0f; } /** * This function should only be used to assign a set of new partitions that are not allocated on - * this node. + * this node. It's because the any exception could occur at the middle of batch assignment and the + * previous finished assignment cannot be reverted * Using this function avoids the overhead of updating capacity repeatedly. */ - void assignNewBatch(Collection<AssignableReplica> replicas) { + void assignInitBatch(Collection<AssignableReplica> replicas) { Map<String, Integer> totalPartitionCapacity = new HashMap<>(); for (AssignableReplica replica : replicas) { + // TODO: the exception could occur in the middle of for loop and the previous added records cannot be reverted addToAssignmentRecord(replica); // increment the capacity requirement according to partition's capacity configuration. for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) { @@ -115,8 +106,8 @@ public class AssignableNode implements Comparable<AssignableNode> { } // Update the global state after all single replications' calculation is done. - for (String key : totalPartitionCapacity.keySet()) { - updateCapacityAndUtilization(key, totalPartitionCapacity.get(key)); + for (String capacityKey : totalPartitionCapacity.keySet()) { + updateCapacityAndUtilization(capacityKey, totalPartitionCapacity.get(capacityKey)); } } @@ -127,7 +118,7 @@ public class AssignableNode implements Comparable<AssignableNode> { void assign(AssignableReplica assignableReplica) { addToAssignmentRecord(assignableReplica); assignableReplica.getCapacity().entrySet().stream() - .forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue())); + .forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue())); } /** @@ -218,8 +209,16 @@ public class AssignableNode implements Comparable<AssignableNode> { /** * @return The current available capacity. */ - public Map<String, Integer> getCurrentCapacity() { - return _currentCapacityMap; + public Map<String, Integer> getRemainingCapacity() { + return _remainingCapacity; + } + + /** + * @return A map of <capacity category, capacity number> that describes the max capacity of the + * node. + */ + public Map<String, Integer> getMaxCapacity() { + return _maxAllowedCapacity; } /** @@ -228,7 +227,6 @@ public class AssignableNode implements Comparable<AssignableNode> { * categories. * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}. Then this call shall * return 0.9. - * * @return The highest utilization number of the node among all the capacity category. */ public float getHighestCapacityUtilization() { @@ -260,14 +258,6 @@ public class AssignableNode implements Comparable<AssignableNode> { } /** - * @return A map of <capacity category, capacity number> that describes the max capacity of the - * node. - */ - public Map<String, Integer> getMaxCapacity() { - return _maxCapacity; - } - - /** * @return The max partition count that are allowed to be allocated on the node. */ public int getMaxPartition() { @@ -294,14 +284,15 @@ public class AssignableNode implements Comparable<AssignableNode> { if (topologyStr == null || faultZoneType == null) { LOG.debug("Topology configuration is not complete. Topology define: {}, Fault Zone Type: {}", topologyStr, faultZoneType); - // Use the instance name, or the deprecated ZoneId field (if exists) as the default fault zone. + // Use the instance name, or the deprecated ZoneId field (if exists) as the default fault + // zone. String zoneId = instanceConfig.getZoneId(); return zoneId == null ? instanceConfig.getInstanceName() : zoneId; } else { // Get the fault zone information from the complete topology definition. String[] topologyDef = topologyStr.trim().split("/"); - if (topologyDef.length == 0 || - Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) { + if (topologyDef.length == 0 + || Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) { throw new HelixException( "The configured topology definition is empty or does not contain the fault zone type."); } @@ -350,22 +341,22 @@ public class AssignableNode implements Comparable<AssignableNode> { } } - private void updateCapacityAndUtilization(String capacityKey, int valueToSubtract) { - if (_currentCapacityMap.containsKey(capacityKey)) { - int newCapacity = _currentCapacityMap.get(capacityKey) - valueToSubtract; - _currentCapacityMap.put(capacityKey, newCapacity); - // For the purpose of constraint calculation, the max utilization cannot be larger than 100%. - float utilization = Math.min( - (float) (_maxCapacity.get(capacityKey) - newCapacity) / _maxCapacity.get(capacityKey), 1); - _highestCapacityUtilization = Math.max(_highestCapacityUtilization, utilization); + private void updateCapacityAndUtilization(String capacityKey, int usage) { + if (!_remainingCapacity.containsKey(capacityKey)) { + //if the capacityKey belongs to replicas does not exist in the instance's capacity, + // it will be treated as if it has unlimited capacity of that capacityKey + return; } - // else if the capacityKey does not exist in the capacity map, this method essentially becomes - // a NOP; in other words, this node will be treated as if it has unlimited capacity. + int newCapacity = _remainingCapacity.get(capacityKey) - usage; + _remainingCapacity.put(capacityKey, newCapacity); + // For the purpose of constraint calculation, the max utilization cannot be larger than 100%. + float utilization = Math.min((float) (_maxAllowedCapacity.get(capacityKey) - newCapacity) + / _maxAllowedCapacity.get(capacityKey), 1); + _highestCapacityUtilization = Math.max(_highestCapacityUtilization, utilization); } /** * Get and validate the instance capacity from instance config. - * * @throws HelixException if any required capacity key is not configured in the instance config. */ private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig, diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java index 2b53422..276b998 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java @@ -80,7 +80,7 @@ public class ClusterModelProvider { bestPossibleAssignment, allocatedReplicas); // Update the allocated replicas to the assignable nodes. - assignableNodes.stream().forEach(node -> node.assignNewBatch( + assignableNodes.stream().forEach(node -> node.assignInitBatch( allocatedReplicas.getOrDefault(node.getInstanceName(), Collections.emptySet()))); // Construct and initialize cluster context. diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java index 511f881..4365a42 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java @@ -39,7 +39,7 @@ public class TestNodeCapacityConstraint { @Test public void testConstraintValidWhenNodeHasEnoughSpace() { String key = "testKey"; - when(_testNode.getCurrentCapacity()).thenReturn(ImmutableMap.of(key, 10)); + when(_testNode.getRemainingCapacity()).thenReturn(ImmutableMap.of(key, 10)); when(_testReplica.getCapacity()).thenReturn(ImmutableMap.of(key, 5)); Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); } @@ -47,7 +47,7 @@ public class TestNodeCapacityConstraint { @Test public void testConstraintInValidWhenNodeHasInsufficientSpace() { String key = "testKey"; - when(_testNode.getCurrentCapacity()).thenReturn(ImmutableMap.of(key, 1)); + when(_testNode.getRemainingCapacity()).thenReturn(ImmutableMap.of(key, 1)); when(_testReplica.getCapacity()).thenReturn(ImmutableMap.of(key, 5)); Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java index 6975901..b48587f 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java @@ -19,9 +19,10 @@ package org.apache.helix.controller.rebalancer.waged.model; * under the License. */ +import static org.mockito.Mockito.when; + import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -37,8 +38,6 @@ import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.mockito.Mockito.when; - public class TestAssignableNode extends AbstractTestClusterModel { @BeforeClass public void initialize() { @@ -65,7 +64,7 @@ public class TestAssignableNode extends AbstractTestClusterModel { AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId); - assignableNode.assignNewBatch(assignmentSet); + assignableNode.assignInitBatch(assignmentSet); Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment); Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4); Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005); @@ -74,7 +73,7 @@ public class TestAssignableNode extends AbstractTestClusterModel { Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags); Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId); Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), _disabledPartitionsMap); - Assert.assertEquals(assignableNode.getCurrentCapacity(), expectedCapacityMap); + Assert.assertEquals(assignableNode.getRemainingCapacity(), expectedCapacityMap); Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet); Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)), expectedAssignmentSet1); @@ -114,7 +113,7 @@ public class TestAssignableNode extends AbstractTestClusterModel { Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags); Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId); Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), _disabledPartitionsMap); - Assert.assertEquals(assignableNode.getCurrentCapacity(), expectedCapacityMap); + Assert.assertEquals(assignableNode.getRemainingCapacity(), expectedCapacityMap); Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet); Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)), expectedAssignmentSet1); @@ -147,7 +146,7 @@ public class TestAssignableNode extends AbstractTestClusterModel { Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags); Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId); Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), _disabledPartitionsMap); - Assert.assertEquals(assignableNode.getCurrentCapacity(), expectedCapacityMap); + Assert.assertEquals(assignableNode.getRemainingCapacity(), expectedCapacityMap); Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet); Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)), expectedAssignmentSet1); @@ -184,7 +183,7 @@ public class TestAssignableNode extends AbstractTestClusterModel { AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId); - assignableNode.assignNewBatch(assignmentSet); + assignableNode.assignInitBatch(assignmentSet); AssignableReplica duplicateReplica = new AssignableReplica(testCache.getClusterConfig(), testCache.getResourceConfig(_resourceNames.get(0)), _partitionNames.get(0), "SLAVE", 2); assignableNode.assign(duplicateReplica);
