pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation URL: https://github.com/apache/helix/pull/362#discussion_r307987847
########## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java ########## @@ -19,10 +19,290 @@ * under the License. */ +import org.apache.helix.HelixException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.max; + /** - * A placeholder before we have the implementation. - * - * This class represents a potential allocation of the replication. - * Note that AssignableNode is not thread safe. + * This class represents a possible allocation of the replication. + * Note that any usage updates to the AssignableNode are not thread safe. */ -public class AssignableNode { } +public class AssignableNode { + private static final Logger _logger = LoggerFactory.getLogger(AssignableNode.class.getName()); + + // basic node information + private final String _instanceName; + private Set<String> _instanceTags; + private String _faultZone; + private Map<String, List<String>> _disabledPartitionsMap; + private Map<String, Integer> _maxCapacity; + private int _maxPartition; + + // proposed assignment tracking + // <resource name, partition name> + private Map<String, Set<String>> _currentAssignments; + // <resource name, top state partition name> + private Map<String, Set<String>> _currentTopStateAssignments; + // <capacity key, capacity value> + private Map<String, Integer> _currentCapacity; + // runtime usage tracking + private int _totalReplicaAssignmentCount; + private float _highestCapacityUtilization; + + AssignableNode(ResourceControllerDataProvider clusterCache, String instanceName, + Collection<AssignableReplica> 
existingAssignment) { + _instanceName = instanceName; + refresh(clusterCache, existingAssignment); + } + + private void reset() { + _currentAssignments = new HashMap<>(); + _currentTopStateAssignments = new HashMap<>(); + _currentCapacity = new HashMap<>(); + _totalReplicaAssignmentCount = 0; + _highestCapacityUtilization = 0; + } + + /** + * Update the node with a ClusterDataCache. This resets the current assignment and recalculate currentCapacity. + * NOTE: While this is required to be used in the constructor, this can also be used when the clusterCache needs to be + * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and ResourceConfig could + * subject to changes. If the assumption is no longer true, this function should become private. + * + * @param clusterCache - the current cluster cache to initial the AssignableNode. + */ + private void refresh(ResourceControllerDataProvider clusterCache, + Collection<AssignableReplica> existingAssignment) { + reset(); + + InstanceConfig instanceConfig = clusterCache.getInstanceConfigMap().get(_instanceName); + ClusterConfig clusterConfig = clusterCache.getClusterConfig(); + + _currentCapacity.putAll(instanceConfig.getInstanceCapacityMap()); + _faultZone = computeFaultZone(clusterConfig, instanceConfig); + _instanceTags = new HashSet<>(instanceConfig.getTags()); + _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap(); + _maxCapacity = instanceConfig.getInstanceCapacityMap(); + _maxPartition = clusterConfig.getMaxPartitionsPerInstance(); + + assignNewBatch(existingAssignment); + } + + /** + * Assign a replica to the node. 
+ * + * @param assignableReplica - the replica to be assigned + */ + void assign(AssignableReplica assignableReplica) { + if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) { + throw new HelixException(String + .format("Resource {} already has a replica from partition {} on this node", + assignableReplica.getResourceName(), assignableReplica.getPartitionName())); + } else { + if (assignableReplica.isReplicaTopState()) { + addToAssignmentRecord(assignableReplica, _currentTopStateAssignments); + } + _totalReplicaAssignmentCount += 1; + assignableReplica.getCapacity().entrySet().stream() + .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), entry.getValue())); + } + } + + /** + * Release a replica from the node. + * If the replication is not on this node, the assignable node is not updated. + * + * @param assignableReplica - the replica to be released + */ + void release(AssignableReplica assignableReplica) throws IllegalArgumentException { + String resourceName = assignableReplica.getResourceName(); + String partitionName = assignableReplica.getPartitionName(); + if (!_currentAssignments.containsKey(resourceName)) { + _logger.warn("Resource " + resourceName + " is not on this node. 
Ignore the release call."); + return; + } + + Set<String> partitions = _currentAssignments.get(resourceName); + if (!partitions.contains(partitionName)) { + _logger.warn(String + .format("Resource {} does not have a replica from partition {} on this node", + resourceName, partitionName)); + return; + } + + partitions.remove(assignableReplica.getPartitionName()); + if (assignableReplica.isReplicaTopState()) { + _currentTopStateAssignments.get(resourceName).remove(partitionName); + } + _totalReplicaAssignmentCount -= 1; + // Recalculate utilization because of release + _highestCapacityUtilization = 0; + assignableReplica.getCapacity().entrySet().stream() + .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), -1 * entry.getValue())); + } + + public Map<String, Set<String>> getCurrentAssignmentsMap() { + return _currentAssignments; + } + + public Set<String> getCurrentAssignmentsByResource(String resource) { + if (!_currentAssignments.containsKey(resource)) { + return Collections.emptySet(); + } + return _currentAssignments.get(resource); + } + + public Set<String> getCurrentTopStateAssignmentsByResource(String resource) { + if (!_currentTopStateAssignments.containsKey(resource)) { + return Collections.emptySet(); + } + return _currentTopStateAssignments.get(resource); + } + + public Integer getTopStateAssignmentTotalSize() { + int totalSize = 0; + for (Set<String> topStates : _currentTopStateAssignments.values()) { + totalSize += topStates.size(); + } + return totalSize; + } + + public int getCurrentAssignmentCount() { + return _totalReplicaAssignmentCount; + } + + public Map<String, Integer> getCurrentCapacity() { + return _currentCapacity; + } + + public float getHighestCapacityUtilization() { + return _highestCapacityUtilization; + } + + public String getInstanceName() { + return _instanceName; + } + + public Set<String> getInstanceTags() { + return _instanceTags; + } + + public String getFaultZone() { + return _faultZone; + } + + public Map<String, 
List<String>> getDisabledPartitionsMap() { + return _disabledPartitionsMap; + } + + public Map<String, Integer> getMaxCapacity() { + return _maxCapacity; + } + + public Integer getMaxPartition() { + return _maxPartition; + } + + /** + * Computes the fault zone id based on the domain and fault zone type when topology is enabled. For example, when + * the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function returns "2". + * If cannot find the fault zone id, this function leaves the fault zone id as the instance name. + */ + private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) { + if (clusterConfig.isTopologyAwareEnabled()) { + String[] topologyDef = clusterConfig.getTopology().trim().split("/"); + String faultZoneType = clusterConfig.getFaultZoneType(); + Map<String, String> domainAsMap = instanceConfig.getDomainAsMap(); + if (faultZoneType == null || topologyDef.length == 0 || domainAsMap == null || domainAsMap + .isEmpty()) { + _logger.error( + "For instance {}, either faultZoneType or domain is not defined. Proceed to leave the fault zone as the instance name", + _instanceName); + return instanceConfig.getInstanceName(); + } else { + StringBuilder faultZoneStringBuilder = new StringBuilder(); + for (String key : topologyDef) { + if (domainAsMap.containsKey(key)) { + faultZoneStringBuilder.append(domainAsMap.get(key)); + faultZoneStringBuilder.append('/'); + } else { + _logger.error("The domain configuration of instance {} is not complete", _instanceName); + return instanceConfig.getInstanceName(); + } + if (key.equals(faultZoneType)) { + break; + } + } + return faultZoneStringBuilder.toString(); + } + } else { + // For backward compatibility + String zoneId = instanceConfig.getZoneId(); + return zoneId == null ? instanceConfig.getInstanceName() : zoneId; + } + } + + /** + * This function should only be used to assign a set of partitions that doesn't exist before. 
+ * Using this function avoids the overhead of updating capacity repeatedly. + */ + private void assignNewBatch(Collection<AssignableReplica> replicas) { + Map<String, Integer> totalPartitionCapacity = new HashMap<>(); + for (AssignableReplica replica : replicas) { + addToAssignmentRecord(replica, _currentAssignments); + if (replica.isReplicaTopState()) { + addToAssignmentRecord(replica, _currentTopStateAssignments); + } + for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) { + totalPartitionCapacity.put(capacity.getKey(), + totalPartitionCapacity.getOrDefault(capacity.getKey(), 0) + capacity.getValue()); + } + } + _totalReplicaAssignmentCount += replicas.size(); + for (String key : totalPartitionCapacity.keySet()) { + updateCapacityAndUtilization(key, totalPartitionCapacity.get(key)); + } + } + + private boolean addToAssignmentRecord(AssignableReplica replica, + Map<String, Set<String>> currentAssignments) { + String resourceName = replica.getResourceName(); + if (currentAssignments.containsKey(resourceName)) { + return currentAssignments.get(resourceName).add(replica.getPartitionName()); + } else { + Set<String> replicaSet = new HashSet<>(); + replicaSet.add(replica.getPartitionName()); + currentAssignments.put(resourceName, replicaSet); + return true; + } Review comment: It seems this function can actually be simplified by using `computeIfAbsent`? ```java return currentAssignments.computeIfAbsent(replica.getResourceName(), k -> new HashSet<>()) .add(replica.getPartitionName()); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services