This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit f8a4a89313c645d741b0cd0815640a68be94072b Author: jiajunwang <[email protected]> AuthorDate: Fri Aug 2 21:21:49 2019 -0700 Implement the WAGED rebalancer cluster model (#362) * Introduce the cluster model classes to support the WAGED rebalancer. Implement the cluster model classes with the minimum necessary information to support rebalance. Additional field/logics might be added later once the detailed rebalance logic is implemented. Also add related tests. --- .../rebalancer/waged/ClusterDataProvider.java | 2 +- .../rebalancer/waged/model/AssignableNode.java | 291 ++++++++++++++++++++- .../rebalancer/waged/model/AssignableReplica.java | 118 ++++++++- .../rebalancer/waged/model/ClusterContext.java | 99 ++++++- .../rebalancer/waged/model/ClusterModel.java | 132 +++++++++- .../apache/helix/model/StateModelDefinition.java | 4 +- .../waged/model/AbstractTestClusterModel.java | 176 +++++++++++++ .../rebalancer/waged/model/TestAssignableNode.java | 203 ++++++++++++++ .../waged/model/TestAssignableReplica.java | 99 +++++++ .../rebalancer/waged/model/TestClusterContext.java | 90 +++++++ .../rebalancer/waged/model/TestClusterModel.java | 114 ++++++++ 11 files changed, 1311 insertions(+), 17 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java index 419be42..feae1dc 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java @@ -48,6 +48,6 @@ public class ClusterDataProvider { Set<String> activeInstances, Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges, Map<String, IdealState> baselineAssignment, Map<String, IdealState> bestPossibleAssignment) { // TODO finish the implementation. 
- return new ClusterModel(); + return null; } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java index ae037f4..989323e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java @@ -19,10 +19,291 @@ package org.apache.helix.controller.rebalancer.waged.model; * under the License. */ +import org.apache.helix.HelixException; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.max; + /** - * A placeholder before we have the implementation. - * - * This class represents a potential allocation of the replication. - * Note that AssignableNode is not thread safe. + * This class represents a possible allocation of the replication. + * Note that any usage updates to the AssignableNode are not thread safe. */ -public class AssignableNode { } +public class AssignableNode { + private static final Logger LOG = LoggerFactory.getLogger(AssignableNode.class.getName()); + + // basic node information + private final String _instanceName; + private Set<String> _instanceTags; + private String _faultZone; + private Map<String, List<String>> _disabledPartitionsMap; + private Map<String, Integer> _maxCapacity; + private int _maxPartition; // maximum number of the partitions that can be assigned to the node. 
+ + // proposed assignment tracking + // <resource name, partition name> + private Map<String, Set<String>> _currentAssignments; + // <resource name, top state partition name> + private Map<String, Set<String>> _currentTopStateAssignments; + // <capacity key, capacity value> + private Map<String, Integer> _currentCapacity; + // The maximum capacity utilization (0.0 - 1.0) across all the capacity categories. + private float _highestCapacityUtilization; + + AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName, + Collection<AssignableReplica> existingAssignment) { + _instanceName = instanceName; + refresh(clusterConfig, instanceConfig, existingAssignment); + } + + private void reset() { + _currentAssignments = new HashMap<>(); + _currentTopStateAssignments = new HashMap<>(); + _currentCapacity = new HashMap<>(); + _highestCapacityUtilization = 0; + } + + /** + * Update the node with a ClusterDataCache. This resets the current assignment and recalculates currentCapacity. + * NOTE: While this is required to be used in the constructor, this can also be used when the clusterCache needs to be + * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and ResourceConfig could + * be subject to change. If the assumption is no longer true, this function should become private. 
+ * + * @param clusterConfig - the Cluster Config of the cluster where the node is located + * @param instanceConfig - the Instance Config of the node + * @param existingAssignment - all the existing replicas that are currently assigned to the node + */ + private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig, + Collection<AssignableReplica> existingAssignment) { + reset(); + + _currentCapacity.putAll(instanceConfig.getInstanceCapacityMap()); + _faultZone = computeFaultZone(clusterConfig, instanceConfig); + _instanceTags = new HashSet<>(instanceConfig.getTags()); + _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap(); + _maxCapacity = instanceConfig.getInstanceCapacityMap(); + _maxPartition = clusterConfig.getMaxPartitionsPerInstance(); + + assignNewBatch(existingAssignment); + } + + /** + * Assign a replica to the node. + * + * @param assignableReplica - the replica to be assigned + */ + void assign(AssignableReplica assignableReplica) { + if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) { + throw new HelixException(String + .format("Resource %s already has a replica from partition %s on node %s", + assignableReplica.getResourceName(), assignableReplica.getPartitionName(), + getInstanceName())); + } else { + if (assignableReplica.isReplicaTopState()) { + addToAssignmentRecord(assignableReplica, _currentTopStateAssignments); + } + assignableReplica.getCapacity().entrySet().stream().forEach( + capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue())); + } + } + + /** + * Release a replica from the node. + * If the replication is not on this node, the assignable node is not updated. 
+ * + * @param assignableReplica - the replica to be released + */ + void release(AssignableReplica assignableReplica) throws IllegalArgumentException { + String resourceName = assignableReplica.getResourceName(); + String partitionName = assignableReplica.getPartitionName(); + + // Check if the release is necessary + if (!_currentAssignments.containsKey(resourceName)) { + LOG.warn("Resource {} is not on node {}. Ignore the release call.", resourceName, + getInstanceName()); + return; + } + Set<String> partitions = _currentAssignments.get(resourceName); + if (!partitions.contains(partitionName)) { + LOG.warn(String + .format("Resource %s does not have a replica from partition %s on node %s", resourceName, + partitionName, getInstanceName())); + return; + } + + partitions.remove(assignableReplica.getPartitionName()); + if (assignableReplica.isReplicaTopState()) { + _currentTopStateAssignments.get(resourceName).remove(partitionName); + } + // Recalculate utilization because of release + _highestCapacityUtilization = 0; + assignableReplica.getCapacity().entrySet().stream() + .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), -1 * entry.getValue())); + } + + public Map<String, Set<String>> getCurrentAssignmentsMap() { + return _currentAssignments; + } + + public Set<String> getCurrentAssignmentsByResource(String resource) { + return _currentAssignments.getOrDefault(resource, Collections.emptySet()); + } + + public Set<String> getCurrentTopStateAssignmentsByResource(String resource) { + return _currentTopStateAssignments.getOrDefault(resource, Collections.emptySet()); + } + + public int getTopStateAssignmentTotalSize() { + return _currentTopStateAssignments.values().stream().mapToInt(Set::size).sum(); + } + + public int getCurrentAssignmentCount() { + return _currentAssignments.values().stream().mapToInt(Set::size).sum(); + } + + public Map<String, Integer> getCurrentCapacity() { + return _currentCapacity; + } + + public float getHighestCapacityUtilization() 
{ + return _highestCapacityUtilization; + } + + public String getInstanceName() { + return _instanceName; + } + + public Set<String> getInstanceTags() { + return _instanceTags; + } + + public String getFaultZone() { + return _faultZone; + } + + public Map<String, List<String>> getDisabledPartitionsMap() { + return _disabledPartitionsMap; + } + + public Map<String, Integer> getMaxCapacity() { + return _maxCapacity; + } + + public int getMaxPartition() { + return _maxPartition; + } + + /** + * Computes the fault zone id based on the domain and fault zone type when topology is enabled. For example, when + * the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function returns "2/". + * If the fault zone id cannot be found, this function leaves the fault zone id as the instance name. + * TODO merge this logic with Topology.java tree building logic. + * For now, the WAGED rebalancer has a more strict topology def requirement. + * Any missing field will cause an invalid topology config exception. 
+ */ + private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) { + if (clusterConfig.isTopologyAwareEnabled()) { + String topologyStr = clusterConfig.getTopology(); + String faultZoneType = clusterConfig.getFaultZoneType(); + if (topologyStr == null || faultZoneType == null) { + throw new HelixException("Fault zone or cluster topology information is not configured."); + } + + String[] topologyDef = topologyStr.trim().split("/"); + if (topologyDef.length == 0 || Arrays.stream(topologyDef) + .noneMatch(type -> type.equals(faultZoneType))) { + throw new HelixException( + "The configured topology definition is empty or does not contain the fault zone type."); + } + + Map<String, String> domainAsMap = instanceConfig.getDomainAsMap(); + if (domainAsMap == null) { + throw new HelixException( + String.format("The domain configuration of node %s is not configured", _instanceName)); + } else { + StringBuilder faultZoneStringBuilder = new StringBuilder(); + for (String key : topologyDef) { + if (!key.isEmpty()) { + if (domainAsMap.containsKey(key)) { + faultZoneStringBuilder.append(domainAsMap.get(key)); + faultZoneStringBuilder.append('/'); + } else { + throw new HelixException(String.format( + "The domain configuration of node %s is not complete. Type %s is not found.", + _instanceName, key)); + } + if (key.equals(faultZoneType)) { + break; + } + } + } + return faultZoneStringBuilder.toString(); + } + } else { + // For backward compatibility + String zoneId = instanceConfig.getZoneId(); + return zoneId == null ? instanceConfig.getInstanceName() : zoneId; + } + } + + /** + * This function should only be used to assign a set of new partitions that are not allocated on this node. + * Using this function avoids the overhead of updating capacity repeatedly. 
+ */ + private void assignNewBatch(Collection<AssignableReplica> replicas) { + Map<String, Integer> totalPartitionCapacity = new HashMap<>(); + for (AssignableReplica replica : replicas) { + addToAssignmentRecord(replica, _currentAssignments); + if (replica.isReplicaTopState()) { + addToAssignmentRecord(replica, _currentTopStateAssignments); + } + // increment the capacity requirement according to partition's capacity configuration. + for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) { + totalPartitionCapacity.compute(capacity.getKey(), + (key, totalValue) -> (totalValue == null) ? + capacity.getValue() : + totalValue + capacity.getValue()); + } + } + + // Update the global state after all single replications' calculation is done. + for (String key : totalPartitionCapacity.keySet()) { + updateCapacityAndUtilization(key, totalPartitionCapacity.get(key)); + } + } + + private boolean addToAssignmentRecord(AssignableReplica replica, + Map<String, Set<String>> currentAssignments) { + return currentAssignments.computeIfAbsent(replica.getResourceName(), k -> new HashSet<>()) + .add(replica.getPartitionName()); + } + + private void updateCapacityAndUtilization(String capacityKey, int valueToSubtract) { + if (_currentCapacity.containsKey(capacityKey)) { + int newCapacity = _currentCapacity.get(capacityKey) - valueToSubtract; + _currentCapacity.put(capacityKey, newCapacity); + // For the purpose of constraint calculation, the max utilization cannot be larger than 100%. + float utilization = Math.min( + (float) (_maxCapacity.get(capacityKey) - newCapacity) / _maxCapacity.get(capacityKey), 1); + _highestCapacityUtilization = max(_highestCapacityUtilization, utilization); + } + // else if the capacityKey does not exist in the capacity map, this method essentially becomes + // a NOP; in other words, this node will be treated as if it has unlimited capacity. 
+ } + + @Override + public int hashCode() { + return _instanceName.hashCode(); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java index a6a7e4a..0082a2d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java @@ -19,9 +19,121 @@ package org.apache.helix.controller.rebalancer.waged.model; * under the License. */ +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; + +import java.io.IOException; +import java.util.Map; + /** - * A placeholder before we have the implementation. - * * This class represents a partition replication that needs to be allocated. */ -public class AssignableReplica { } +public class AssignableReplica implements Comparable<AssignableReplica> { + private final String _partitionName; + private final String _resourceName; + private final String _resourceInstanceGroupTag; + private final int _resourceMaxPartitionsPerInstance; + private final Map<String, Integer> _capacityUsage; + // The priority of the replica's state + private final int _statePriority; + // The state of the replica + private final String _replicaState; + + /** + * @param resourceConfig The resource config for the resource which contains the replication. + * @param partitionName The replication's partition name. + * @param replicaState The state of the replication. + * @param statePriority The priority of the replication's state. 
+ */ + AssignableReplica(ResourceConfig resourceConfig, String partitionName, String replicaState, + int statePriority) { + _partitionName = partitionName; + _replicaState = replicaState; + _statePriority = statePriority; + _resourceName = resourceConfig.getResourceName(); + _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig); + _resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag(); + _resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance(); + } + + public Map<String, Integer> getCapacity() { + return _capacityUsage; + } + + public String getPartitionName() { + return _partitionName; + } + + public String getReplicaState() { + return _replicaState; + } + + public boolean isReplicaTopState() { + return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY; + } + + public int getStatePriority() { + return _statePriority; + } + + public String getResourceName() { + return _resourceName; + } + + public String getResourceInstanceGroupTag() { + return _resourceInstanceGroupTag; + } + + public int getResourceMaxPartitionsPerInstance() { + return _resourceMaxPartitionsPerInstance; + } + + @Override + public String toString() { + return generateReplicaKey(_resourceName, _partitionName, _replicaState); + } + + @Override + public int compareTo(AssignableReplica replica) { + if (!_resourceName.equals(replica._resourceName)) { + return _resourceName.compareTo(replica._resourceName); + } + if (!_partitionName.equals(replica._partitionName)) { + return _partitionName.compareTo(replica._partitionName); + } + if (!_replicaState.equals(replica._replicaState)) { + return _replicaState.compareTo(replica._replicaState); + } + return 0; + } + + public static String generateReplicaKey(String resourceName, String partitionName, String state) { + return String.format("%s-%s-%s", resourceName, partitionName, state); + } + + /** + * Parse the resource config for the partition weight. 
+ */ + private Map<String, Integer> fetchCapacityUsage(String partitionName, + ResourceConfig resourceConfig) { + Map<String, Map<String, Integer>> capacityMap; + try { + capacityMap = resourceConfig.getPartitionCapacityMap(); + } catch (IOException ex) { + throw new IllegalArgumentException( + "Invalid partition capacity configuration of resource: " + resourceConfig + .getResourceName(), ex); + } + + Map<String, Integer> partitionCapacity = capacityMap.get(partitionName); + if (partitionCapacity == null) { + partitionCapacity = capacityMap.get(ResourceConfig.DEFAULT_PARTITION_KEY); + } + if (partitionCapacity == null) { + throw new IllegalArgumentException(String.format( + "The capacity usage of the specified partition %s is not configured in the Resource Config %s. No default partition capacity is configured neither.", + partitionName, resourceConfig.getResourceName())); + } + return partitionCapacity; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java index adca7d1..c163e4c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java @@ -19,9 +19,100 @@ package org.apache.helix.controller.rebalancer.waged.model; * under the License. */ +import org.apache.helix.HelixException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + /** - * A placeholder before we have the implementation. - * - * This class tracks the global rebalance-related status of a Helix managed cluster. + * This class tracks the rebalance-related global cluster status. 
*/ -public class ClusterContext { } +public class ClusterContext { + private final static float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f; + + // This estimation helps to ensure global partition count evenness + private final int _estimatedMaxPartitionCount; + // This estimation helps to ensure global top state replica count evenness + private final int _estimatedMaxTopStateCount; + // This estimation helps to ensure per-resource partition count evenness + private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>(); + + // map{zoneName : map{resourceName : set(partitionNames)}} + private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>(); + + /** + * Construct the cluster context based on the current instance status. + * + * @param replicaSet All the partition replicas that are managed by the rebalancer + * @param instanceCount The count of all the active instances that can be used to host partitions. + */ + ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) { + int totalReplicas = 0; + int totalTopStateReplicas = 0; + + for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream() + .collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) { + int replicas = entry.getValue().size(); + totalReplicas += replicas; + + int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, instanceCount)); + _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt); + + totalTopStateReplicas += + entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count(); + } + + _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount); + _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount); + } + + public Map<String, Map<String, Set<String>>> getAssignmentForFaultZoneMap() { + return _assignmentForFaultZoneMap; + } + + public int getEstimatedMaxPartitionCount() { + return 
_estimatedMaxPartitionCount; + } + + public int getEstimatedMaxPartitionByResource(String resourceName) { + return _estimatedMaxPartitionByResource.get(resourceName); + } + + public int getEstimatedMaxTopStateCount() { + return _estimatedMaxTopStateCount; + } + + public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) { + return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap()) + .getOrDefault(resourceName, Collections.emptySet()); + } + + void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) { + if (!_assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>()) + .computeIfAbsent(resourceName, k -> new HashSet<>()).add(partition)) { + throw new HelixException(String + .format("Resource %s already has a replica from partition %s in fault zone %s", + resourceName, partition, faultZoneId)); + } + } + + boolean removePartitionFromFaultZone(String faultZoneId, String resourceName, String partition) { + return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap()) + .getOrDefault(resourceName, Collections.emptySet()).remove(partition); + } + + void setAssignmentForFaultZoneMap( + Map<String, Map<String, Set<String>>> assignmentForFaultZoneMap) { + _assignmentForFaultZoneMap = assignmentForFaultZoneMap; + } + + private int estimateAvgReplicaCount(int replicaCount, int instanceCount) { + return (int) Math + .ceil((float) replicaCount / instanceCount * ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java index 06eebf7..2908939 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java @@ -19,9 
+19,135 @@ package org.apache.helix.controller.rebalancer.waged.model; * under the License. */ +import org.apache.helix.HelixException; +import org.apache.helix.model.IdealState; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + /** - * A placeholder before we have the implementation. - * * This class wraps the required input for the rebalance algorithm. */ -public class ClusterModel { } +public class ClusterModel { + private final ClusterContext _clusterContext; + // Map to track all the assignable replications. <Resource Name, Set<Replicas>> + private final Map<String, Set<AssignableReplica>> _assignableReplicaMap; + // The index to find the replication information with a certain state. <Resource, <Key(resource_partition_state), Replica>> + // Note that the identical replicas are deduped in the index. + private final Map<String, Map<String, AssignableReplica>> _assignableReplicaIndex; + private final Map<String, AssignableNode> _assignableNodeMap; + + // Records about the previous assignment + // <ResourceName, IdealState contains the baseline assignment> + private final Map<String, IdealState> _baselineAssignment; + // <ResourceName, IdealState contains the best possible assignment> + private final Map<String, IdealState> _bestPossibleAssignment; + + /** + * @param clusterContext The initialized cluster context. + * @param assignableReplicas The replicas to be assigned. + * Note that the replicas in this list shall not be included while initializing the context and assignable nodes. + * @param assignableNodes The active instances. + * @param baselineAssignment The recorded baseline assignment. + * @param bestPossibleAssignment The current best possible assignment. 
+ */ + ClusterModel(ClusterContext clusterContext, Set<AssignableReplica> assignableReplicas, + Set<AssignableNode> assignableNodes, Map<String, IdealState> baselineAssignment, + Map<String, IdealState> bestPossibleAssignment) { + _clusterContext = clusterContext; + + // Save all the to be assigned replication + _assignableReplicaMap = assignableReplicas.stream() + .collect(Collectors.groupingBy(AssignableReplica::getResourceName, Collectors.toSet())); + + // Index all the replicas to be assigned. Dedup the replica if two instances have the same resource/partition/state + _assignableReplicaIndex = assignableReplicas.stream().collect(Collectors + .groupingBy(AssignableReplica::getResourceName, Collectors + .toMap(AssignableReplica::toString, replica -> replica, + (oldValue, newValue) -> oldValue))); + + _assignableNodeMap = assignableNodes.stream() + .collect(Collectors.toMap(AssignableNode::getInstanceName, node -> node)); + + _baselineAssignment = baselineAssignment; + _bestPossibleAssignment = bestPossibleAssignment; + } + + public ClusterContext getContext() { + return _clusterContext; + } + + public Map<String, AssignableNode> getAssignableNodes() { + return _assignableNodeMap; + } + + public Map<String, Set<AssignableReplica>> getAssignableReplicaMap() { + return _assignableReplicaMap; + } + + public Map<String, IdealState> getBaseline() { + return _baselineAssignment; + } + + public Map<String, IdealState> getBestPossibleAssignment() { + return _bestPossibleAssignment; + } + + /** + * Assign the given replica to the specified instance and record the assignment in the cluster model. + * The cluster usage information will be updated accordingly. 
+ * + * @param resourceName + * @param partitionName + * @param state + * @param instanceName + */ + public void assign(String resourceName, String partitionName, String state, String instanceName) { + AssignableNode node = locateAssignableNode(instanceName); + AssignableReplica replica = locateAssignableReplica(resourceName, partitionName, state); + + node.assign(replica); + _clusterContext.addPartitionToFaultZone(node.getFaultZone(), resourceName, partitionName); + } + + /** + * Revert the proposed assignment from the cluster model. + * The cluster usage information will be updated accordingly. + * + * @param resourceName + * @param partitionName + * @param state + * @param instanceName + */ + public void release(String resourceName, String partitionName, String state, + String instanceName) { + AssignableNode node = locateAssignableNode(instanceName); + AssignableReplica replica = locateAssignableReplica(resourceName, partitionName, state); + + node.release(replica); + _clusterContext.removePartitionFromFaultZone(node.getFaultZone(), resourceName, partitionName); + } + + private AssignableNode locateAssignableNode(String instanceName) { + AssignableNode node = _assignableNodeMap.get(instanceName); + if (node == null) { + throw new HelixException("Cannot find the instance: " + instanceName); + } + return node; + } + + private AssignableReplica locateAssignableReplica(String resourceName, String partitionName, + String state) { + AssignableReplica sampleReplica = + _assignableReplicaIndex.getOrDefault(resourceName, Collections.emptyMap()) + .get(AssignableReplica.generateReplicaKey(resourceName, partitionName, state)); + if (sampleReplica == null) { + throw new HelixException(String + .format("Cannot find the replication with resource name %s, partition name %s, state %s.", + resourceName, partitionName, state)); + } + return sampleReplica; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java 
b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java index ae59522..0a40331 100644 --- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java +++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java @@ -46,6 +46,8 @@ public class StateModelDefinition extends HelixProperty { STATE_PRIORITY_LIST } + public static final int TOP_STATE_PRIORITY = 1; + /** * state model's initial state */ @@ -98,7 +100,7 @@ public class StateModelDefinition extends HelixProperty { _stateTransitionTable = new HashMap<>(); _statesCountMap = new HashMap<>(); if (_statesPriorityList != null) { - int priority = 1; + int priority = TOP_STATE_PRIORITY; for (String state : _statesPriorityList) { Map<String, String> metaData = record.getMapField(state + ".meta"); if (metaData != null) { diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java new file mode 100644 index 0000000..0e2b43a --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java @@ -0,0 +1,176 @@ +package org.apache.helix.controller.rebalancer.waged.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.ResourceConfig; +import org.mockito.Mockito; +import org.testng.annotations.BeforeClass; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.mockito.Mockito.when; + +public abstract class AbstractTestClusterModel { + protected String _testInstanceId; + protected List<String> _resourceNames; + protected List<String> _partitionNames; + protected Map<String, Integer> _capacityDataMap; + protected Map<String, List<String>> _disabledPartitionsMap; + protected List<String> _testInstanceTags; + protected String _testFaultZoneId; + + @BeforeClass + public void initialize() { + _testInstanceId = "testInstanceId"; + _resourceNames = new ArrayList<>(); + _resourceNames.add("Resource1"); + _resourceNames.add("Resource2"); + _partitionNames = new ArrayList<>(); + _partitionNames.add("Partition1"); + _partitionNames.add("Partition2"); + _partitionNames.add("Partition3"); + _partitionNames.add("Partition4"); + _capacityDataMap = new HashMap<>(); + _capacityDataMap.put("item1", 20); + _capacityDataMap.put("item2", 40); + 
_capacityDataMap.put("item3", 30); + List<String> disabledPartitions = new ArrayList<>(); + disabledPartitions.add("TestPartition"); + _disabledPartitionsMap = new HashMap<>(); + _disabledPartitionsMap.put("TestResource", disabledPartitions); + _testInstanceTags = new ArrayList<>(); + _testInstanceTags.add("TestTag"); + _testFaultZoneId = "testZone"; + } + + protected ResourceControllerDataProvider setupClusterDataCache() throws IOException { + ResourceControllerDataProvider testCache = Mockito.mock(ResourceControllerDataProvider.class); + + // 1. Set up the default instance information with capacity configuration. + InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceId"); + testInstanceConfig.setInstanceCapacityMap(_capacityDataMap); + testInstanceConfig.addTag(_testInstanceTags.get(0)); + testInstanceConfig.setInstanceEnabledForPartition("TestResource", "TestPartition", false); + testInstanceConfig.setInstanceEnabled(true); + testInstanceConfig.setZoneId(_testFaultZoneId); + Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); + instanceConfigMap.put(_testInstanceId, testInstanceConfig); + when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap); + + // 2. Set up the basic cluster configuration. + ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId"); + testClusterConfig.setMaxPartitionsPerInstance(5); + testClusterConfig.setDisabledInstances(Collections.emptyMap()); + testClusterConfig.setTopologyAwareEnabled(false); + when(testCache.getClusterConfig()).thenReturn(testClusterConfig); + + // 3. Mock the live instance node for the default instance. + LiveInstance testLiveInstance = new LiveInstance(_testInstanceId); + testLiveInstance.setSessionId("testSessionId"); + Map<String, LiveInstance> liveInstanceMap = new HashMap<>(); + liveInstanceMap.put(_testInstanceId, testLiveInstance); + when(testCache.getLiveInstances()).thenReturn(liveInstanceMap); + + // 4. 
Mock two resources, each with 2 partitions on the default instance. + // The instance will have the following partitions assigned + // Resource 1: + // partition 1 - MASTER + // partition 2 - SLAVE + // Resource 2: + // partition 3 - MASTER + // partition 4 - SLAVE + CurrentState testCurrentStateResource1 = Mockito.mock(CurrentState.class); + Map<String, String> partitionStateMap1 = new HashMap<>(); + partitionStateMap1.put(_partitionNames.get(0), "MASTER"); + partitionStateMap1.put(_partitionNames.get(1), "SLAVE"); + when(testCurrentStateResource1.getResourceName()).thenReturn(_resourceNames.get(0)); + when(testCurrentStateResource1.getPartitionStateMap()).thenReturn(partitionStateMap1); + when(testCurrentStateResource1.getStateModelDefRef()).thenReturn("MasterSlave"); + when(testCurrentStateResource1.getState(_partitionNames.get(0))).thenReturn("MASTER"); + when(testCurrentStateResource1.getState(_partitionNames.get(1))).thenReturn("SLAVE"); + CurrentState testCurrentStateResource2 = Mockito.mock(CurrentState.class); + Map<String, String> partitionStateMap2 = new HashMap<>(); + partitionStateMap2.put(_partitionNames.get(2), "MASTER"); + partitionStateMap2.put(_partitionNames.get(3), "SLAVE"); + when(testCurrentStateResource2.getResourceName()).thenReturn(_resourceNames.get(1)); + when(testCurrentStateResource2.getPartitionStateMap()).thenReturn(partitionStateMap2); + when(testCurrentStateResource2.getStateModelDefRef()).thenReturn("MasterSlave"); + when(testCurrentStateResource2.getState(_partitionNames.get(2))).thenReturn("MASTER"); + when(testCurrentStateResource2.getState(_partitionNames.get(3))).thenReturn("SLAVE"); + Map<String, CurrentState> currentStatemap = new HashMap<>(); + currentStatemap.put(_resourceNames.get(0), testCurrentStateResource1); + currentStatemap.put(_resourceNames.get(1), testCurrentStateResource2); + when(testCache.getCurrentState(_testInstanceId, "testSessionId")).thenReturn(currentStatemap); + + // 5. 
Set up the resource config for the two resources with the partition weight. + Map<String, Integer> capacityDataMapResource1 = new HashMap<>(); + capacityDataMapResource1.put("item1", 3); + capacityDataMapResource1.put("item2", 6); + ResourceConfig testResourceConfigResource1 = new ResourceConfig("Resource1"); + testResourceConfigResource1.setPartitionCapacityMap( + Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource1)); + when(testCache.getResourceConfig("Resource1")).thenReturn(testResourceConfigResource1); + Map<String, Integer> capacityDataMapResource2 = new HashMap<>(); + capacityDataMapResource2.put("item1", 5); + capacityDataMapResource2.put("item2", 10); + ResourceConfig testResourceConfigResource2 = new ResourceConfig("Resource2"); + testResourceConfigResource2.setPartitionCapacityMap( + Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource2)); + when(testCache.getResourceConfig("Resource2")).thenReturn(testResourceConfigResource2); + + // 6. Define mock state model + for (BuiltInStateModelDefinitions bsmd : BuiltInStateModelDefinitions.values()) { + when(testCache.getStateModelDef(bsmd.name())).thenReturn(bsmd.getStateModelDefinition()); + } + + return testCache; + } + + /** + * Generate the replica objects according to the provider information. + */ + protected Set<AssignableReplica> generateReplicas(ResourceControllerDataProvider dataProvider) { + // Create assignable replica based on the current state. + Map<String, CurrentState> currentStatemap = + dataProvider.getCurrentState(_testInstanceId, "testSessionId"); + Set<AssignableReplica> assignmentSet = new HashSet<>(); + for (CurrentState cs : currentStatemap.values()) { + ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName()); + // Construct one AssignableReplica for each partition in the current state. 
+ cs.getPartitionStateMap().entrySet().stream().forEach(entry -> assignmentSet.add( + new AssignableReplica(resourceConfig, entry.getKey(), entry.getValue(), + entry.getValue().equals("MASTER") ? 1 : 2))); + } + return assignmentSet; + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java new file mode 100644 index 0000000..d7fcce9 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java @@ -0,0 +1,203 @@ +package org.apache.helix.controller.rebalancer.waged.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.HelixException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.mockito.Mockito.when; + +public class TestAssignableNode extends AbstractTestClusterModel { + @BeforeClass + public void initialize() { + super.initialize(); + } + + @Test + public void testNormalUsage() throws IOException { + // Test 1 - initialize based on the data cache and check with the expected result + ResourceControllerDataProvider testCache = setupClusterDataCache(); + Set<AssignableReplica> assignmentSet = generateReplicas(testCache); + + Set<String> expectedAssignmentSet1 = new HashSet<>(_partitionNames.subList(0, 2)); + Set<String> expectedAssignmentSet2 = new HashSet<>(_partitionNames.subList(2, 4)); + Map<String, Set<String>> expectedAssignment = new HashMap<>(); + expectedAssignment.put("Resource1", expectedAssignmentSet1); + expectedAssignment.put("Resource2", expectedAssignmentSet2); + Map<String, Integer> expectedCapacityMap = new HashMap<>(); + expectedCapacityMap.put("item1", 4); + expectedCapacityMap.put("item2", 8); + expectedCapacityMap.put("item3", 30); + + AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), + testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet); + Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment)); + Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 4); + Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005); + 
Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap)); + Assert.assertEquals(assignableNode.getMaxPartition(), 5); + Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags); + Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId); + Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap)); + Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap)); + + // Test 2 - release assignment from the AssignableNode + AssignableReplica removingReplica = + new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)), + _partitionNames.get(2), "MASTER", 1); + expectedAssignment.get(_resourceNames.get(1)).remove(_partitionNames.get(2)); + expectedCapacityMap.put("item1", 9); + expectedCapacityMap.put("item2", 18); + + assignableNode.release(removingReplica); + + Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment)); + Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 3); + Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 11.0 / 20.0, 0.005); + Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap)); + Assert.assertEquals(assignableNode.getMaxPartition(), 5); + Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags); + Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId); + Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap)); + Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap)); + + // Test 3 - add assignment to the AssignableNode + AssignableReplica addingReplica = + new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)), + _partitionNames.get(2), "SLAVE", 2); + expectedAssignment.get(_resourceNames.get(1)).add(_partitionNames.get(2)); + expectedCapacityMap.put("item1", 4); + expectedCapacityMap.put("item2", 8); + + 
assignableNode.assign(addingReplica); + + Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment)); + Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 4); + Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005); + Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap)); + Assert.assertEquals(assignableNode.getMaxPartition(), 5); + Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags); + Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId); + Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap)); + Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap)); + } + + @Test + public void testReleaseNoPartition() throws IOException { + ResourceControllerDataProvider testCache = setupClusterDataCache(); + + AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), + testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, + Collections.emptyList()); + AssignableReplica removingReplica = + new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)), + _partitionNames.get(2) + "non-exist", "MASTER", 1); + + // Release shall pass. 
+ assignableNode.release(removingReplica); + } + + @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Resource Resource1 already has a replica from partition Partition1 on node testInstanceId") + public void testAssignDuplicateReplica() throws IOException { + ResourceControllerDataProvider testCache = setupClusterDataCache(); + Set<AssignableReplica> assignmentSet = generateReplicas(testCache); + + AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), + testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet); + AssignableReplica duplicateReplica = + new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(0)), + _partitionNames.get(0), "SLAVE", 2); + assignableNode.assign(duplicateReplica); + } + + @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The domain configuration of node testInstanceId is not complete. Type DOES_NOT_EXIST is not found.") + public void testParseFaultZoneNotFound() throws IOException { + ResourceControllerDataProvider testCache = setupClusterDataCache(); + + ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId"); + testClusterConfig.setFaultZoneType("DOES_NOT_EXIST"); + testClusterConfig.setTopologyAwareEnabled(true); + testClusterConfig.setTopology("/DOES_NOT_EXIST/"); + when(testCache.getClusterConfig()).thenReturn(testClusterConfig); + + InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId"); + testInstanceConfig.setDomain("zone=2, instance=testInstance"); + Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); + instanceConfigMap.put(_testInstanceId, testInstanceConfig); + when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap); + + new AssignableNode(testCache.getClusterConfig(), + testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, + Collections.emptyList()); + } + + @Test + public void testParseFaultZone() throws 
IOException { + ResourceControllerDataProvider testCache = setupClusterDataCache(); + + ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId"); + testClusterConfig.setFaultZoneType("zone"); + testClusterConfig.setTopologyAwareEnabled(true); + testClusterConfig.setTopology("/zone/instance"); + when(testCache.getClusterConfig()).thenReturn(testClusterConfig); + + InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId"); + testInstanceConfig.setDomain("zone=2, instance=testInstance"); + Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); + instanceConfigMap.put(_testInstanceId, testInstanceConfig); + when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap); + + AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(), + testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, + Collections.emptyList()); + + Assert.assertEquals(assignableNode.getFaultZone(), "2/"); + + testClusterConfig = new ClusterConfig("testClusterConfigId"); + testClusterConfig.setFaultZoneType("instance"); + testClusterConfig.setTopologyAwareEnabled(true); + testClusterConfig.setTopology("/zone/instance"); + when(testCache.getClusterConfig()).thenReturn(testClusterConfig); + + testInstanceConfig = new InstanceConfig("testInstanceConfigId"); + testInstanceConfig.setDomain("zone=2, instance=testInstance"); + instanceConfigMap = new HashMap<>(); + instanceConfigMap.put(_testInstanceId, testInstanceConfig); + when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap); + + assignableNode = new AssignableNode(testCache.getClusterConfig(), + testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, + Collections.emptyList()); + + Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/"); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java new file mode 100644 index 0000000..d069ced --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java @@ -0,0 +1,99 @@ +package org.apache.helix.controller.rebalancer.waged.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class TestAssignableReplica { + String resourceName = "Resource"; + String partitionNamePrefix = "partition"; + String masterState = "Master"; + int masterPriority = StateModelDefinition.TOP_STATE_PRIORITY; + String slaveState = "Slave"; + int slavePriority = 2; + + @Test + public void testConstructRepliaWithResourceConfig() throws IOException { + // Init assignable replica with a basic config object + Map<String, Integer> capacityDataMapResource1 = new HashMap<>(); + capacityDataMapResource1.put("item1", 3); + capacityDataMapResource1.put("item2", 6); + ResourceConfig testResourceConfigResource = new ResourceConfig(resourceName); + testResourceConfigResource.setPartitionCapacityMap( + Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource1)); + + String partitionName = partitionNamePrefix + 1; + AssignableReplica replica = + new AssignableReplica(testResourceConfigResource, partitionName, masterState, + masterPriority); + Assert.assertEquals(replica.getResourceName(), resourceName); + Assert.assertEquals(replica.getPartitionName(), partitionName); + Assert.assertEquals(replica.getReplicaState(), masterState); + Assert.assertEquals(replica.getStatePriority(), masterPriority); + Assert.assertTrue(replica.isReplicaTopState()); + Assert.assertEquals(replica.getCapacity(), capacityDataMapResource1); + Assert.assertEquals(replica.getResourceInstanceGroupTag(), null); + Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), Integer.MAX_VALUE); + + // Modify the config and initialize more replicas. + // 1. 
update capacity + Map<String, Integer> capacityDataMapResource2 = new HashMap<>(); + capacityDataMapResource2.put("item1", 5); + capacityDataMapResource2.put("item2", 10); + Map<String, Map<String, Integer>> capacityMap = + testResourceConfigResource.getPartitionCapacityMap(); + String partitionName2 = partitionNamePrefix + 2; + capacityMap.put(partitionName2, capacityDataMapResource2); + testResourceConfigResource.setPartitionCapacityMap(capacityMap); + // 2. update instance group tag and max partitions per instance + String group = "DEFAULT"; + int maxPartition = 10; + testResourceConfigResource.getRecord() + .setSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.toString(), group); + testResourceConfigResource.getRecord() + .setIntField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(), + maxPartition); + + replica = new AssignableReplica(testResourceConfigResource, partitionName, masterState, + masterPriority); + Assert.assertEquals(replica.getCapacity(), capacityDataMapResource1); + Assert.assertEquals(replica.getResourceInstanceGroupTag(), group); + Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition); + + replica = new AssignableReplica(testResourceConfigResource, partitionName2, slaveState, + slavePriority); + Assert.assertEquals(replica.getResourceName(), resourceName); + Assert.assertEquals(replica.getPartitionName(), partitionName2); + Assert.assertEquals(replica.getReplicaState(), slaveState); + Assert.assertEquals(replica.getStatePriority(), slavePriority); + Assert.assertFalse(replica.isReplicaTopState()); + Assert.assertEquals(replica.getCapacity(), capacityDataMapResource2); + Assert.assertEquals(replica.getResourceInstanceGroupTag(), group); + Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java new file mode 100644 index 0000000..8206f29 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java @@ -0,0 +1,90 @@ +package org.apache.helix.controller.rebalancer.waged.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class TestClusterContext extends AbstractTestClusterModel { + @BeforeClass + public void initialize() { + super.initialize(); + } + + @Test + public void testNormalUsage() throws IOException { + // Test 1 - initialize the cluster context based on the data cache. 
+ ResourceControllerDataProvider testCache = setupClusterDataCache(); + Set<AssignableReplica> assignmentSet = generateReplicas(testCache); + + ClusterContext context = new ClusterContext(assignmentSet, 2); + + // Note that we left some margin for the max estimation. + Assert.assertEquals(context.getEstimatedMaxPartitionCount(), 3); + Assert.assertEquals(context.getEstimatedMaxTopStateCount(), 2); + Assert.assertEquals(context.getAssignmentForFaultZoneMap(), Collections.emptyMap()); + for (String resourceName : _resourceNames) { + Assert.assertEquals(context.getEstimatedMaxPartitionByResource(resourceName), 2); + Assert.assertEquals( + context.getPartitionsForResourceAndFaultZone(_testFaultZoneId, resourceName), + Collections.emptySet()); + } + + // Assign + Map<String, Map<String, Set<String>>> expectedFaultZoneMap = Collections + .singletonMap(_testFaultZoneId, assignmentSet.stream().collect(Collectors + .groupingBy(AssignableReplica::getResourceName, + Collectors.mapping(AssignableReplica::getPartitionName, Collectors.toSet())))); + + assignmentSet.stream().forEach(replica -> context + .addPartitionToFaultZone(_testFaultZoneId, replica.getResourceName(), + replica.getPartitionName())); + Assert.assertEquals(context.getAssignmentForFaultZoneMap(), expectedFaultZoneMap); + + // release + expectedFaultZoneMap.get(_testFaultZoneId).get(_resourceNames.get(0)) + .remove(_partitionNames.get(0)); + Assert.assertTrue(context.removePartitionFromFaultZone(_testFaultZoneId, _resourceNames.get(0), + _partitionNames.get(0))); + + Assert.assertEquals(context.getAssignmentForFaultZoneMap(), expectedFaultZoneMap); + } + + @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Resource Resource1 already has a replica from partition Partition1 in fault zone testZone") + public void testDuplicateAssign() throws IOException { + ResourceControllerDataProvider testCache = setupClusterDataCache(); + Set<AssignableReplica> assignmentSet = 
generateReplicas(testCache); + ClusterContext context = new ClusterContext(assignmentSet, 2); + context + .addPartitionToFaultZone(_testFaultZoneId, _resourceNames.get(0), _partitionNames.get(0)); + // Insert again and trigger the error. + context + .addPartitionToFaultZone(_testFaultZoneId, _resourceNames.get(0), _partitionNames.get(0)); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java new file mode 100644 index 0000000..c07bd98 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java @@ -0,0 +1,114 @@ +package org.apache.helix.controller.rebalancer.waged.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.HelixException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class TestClusterModel extends AbstractTestClusterModel { + @BeforeClass + public void initialize() { + super.initialize(); + } + + /** + * Generate AssignableNodes according to the instances included in the cluster data cache. + */ + Set<AssignableNode> generateNodes(ResourceControllerDataProvider testCache) { + Set<AssignableNode> nodeSet = new HashSet<>(); + testCache.getInstanceConfigMap().values().stream().forEach(config -> nodeSet.add( + new AssignableNode(testCache.getClusterConfig(), + testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName(), + Collections.emptyList()))); + return nodeSet; + } + + @Test + public void testNormalUsage() throws IOException { + // Test 1 - initialize the cluster model based on the data cache. + ResourceControllerDataProvider testCache = setupClusterDataCache(); + Set<AssignableReplica> assignableReplicas = generateReplicas(testCache); + Set<AssignableNode> assignableNodes = generateNodes(testCache); + + ClusterContext context = new ClusterContext(assignableReplicas, 2); + ClusterModel clusterModel = + new ClusterModel(context, assignableReplicas, assignableNodes, Collections.emptyMap(), + Collections.emptyMap()); + + Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream() + .allMatch(resourceMap -> resourceMap.values().isEmpty())); + Assert.assertFalse(clusterModel.getAssignableNodes().values().stream() + .anyMatch(node -> node.getCurrentAssignmentCount() != 0)); + + // The initialization of the context, node and replication has been tested separately. 
So for + // cluster model, focus on testing the assignment and release. + + // Assign + AssignableReplica replica = assignableReplicas.iterator().next(); + AssignableNode assignableNode = assignableNodes.iterator().next(); + clusterModel + .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(), + assignableNode.getInstanceName()); + + Assert.assertTrue( + clusterModel.getContext().getAssignmentForFaultZoneMap().get(assignableNode.getFaultZone()) + .get(replica.getResourceName()).contains(replica.getPartitionName())); + Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().get(replica.getResourceName()) + .contains(replica.getPartitionName())); + + // Assign a nonexist replication + try { + clusterModel.assign("NOT-EXIST", replica.getPartitionName(), replica.getReplicaState(), + assignableNode.getInstanceName()); + Assert.fail("Assigning a non existing resource partition shall fail."); + } catch (HelixException ex) { + // expected + } + + // Assign a non-exist replication + try { + clusterModel + .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(), + "NON-EXIST"); + Assert.fail("Assigning a resource partition to a non existing instance shall fail."); + } catch (HelixException ex) { + // expected + } + + // Release + clusterModel + .release(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(), + assignableNode.getInstanceName()); + + Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream() + .allMatch(resourceMap -> resourceMap.values().stream() + .allMatch(partitions -> partitions.isEmpty()))); + Assert.assertFalse(clusterModel.getAssignableNodes().values().stream() + .anyMatch(node -> node.getCurrentAssignmentCount() != 0)); + } +}
