This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit ba155521d2539d70ec572834207ec852a08b2812 Author: Jiajun Wang <[email protected]> AuthorDate: Mon Sep 9 10:35:59 2019 -0700 Implement the WAGED rebalancer with the limited functionality. (#443) The implemented rebalancer supports basic rebalance logic. It does not contain the logic to support delayed rebalance and user-defined preference list. Added unit test to cover the main workflow of the WAGED rebalancer. --- .../org/apache/helix/HelixRebalanceException.java | 4 +- .../rebalancer/waged/WagedRebalancer.java | 313 ++++++++++++++-- .../rebalancer/waged/model/AssignableNode.java | 7 +- .../waged/MockAssignmentMetadataStore.java | 55 +++ .../rebalancer/waged/TestWagedRebalancer.java | 415 +++++++++++++++++++++ .../waged/constraints/MockRebalanceAlgorithm.java | 84 +++++ .../waged/model/AbstractTestClusterModel.java | 4 +- 7 files changed, 850 insertions(+), 32 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java index d01fc60..a8b5055 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java +++ b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java @@ -33,12 +33,12 @@ public class HelixRebalanceException extends Exception { private final Type _type; public HelixRebalanceException(String message, Type type, Throwable cause) { - super(String.format("%s. Failure Type: %s", message, type.name()), cause); + super(String.format("%s Failure Type: %s", message, type.name()), cause); _type = type; } public HelixRebalanceException(String message, Type type) { - super(String.format("%s. Failure Type: %s", message, type.name())); + super(String.format("%s Failure Type: %s", message, type.name())); _type = type; } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 5b9634e..866c7c9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -19,9 +19,18 @@ package org.apache.helix.controller.rebalancer.waged; * under the License. */ +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; +import org.apache.helix.HelixConstants; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.changedetector.ResourceChangeDetector; @@ -29,14 +38,18 @@ import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory; +import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; +import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; +import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; +import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceAssignment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A placeholder before we have the implementation. * Weight-Aware Globally-Even Distribute Rebalancer. * * @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer"> @@ -46,50 +59,296 @@ import org.slf4j.LoggerFactory; public class WagedRebalancer { private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class); + // When any of the following change happens, the rebalancer needs to do a global rebalance which + // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline. + private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES = + Collections.unmodifiableSet(new HashSet<>(Arrays + .asList(HelixConstants.ChangeType.RESOURCE_CONFIG, + HelixConstants.ChangeType.CLUSTER_CONFIG, + HelixConstants.ChangeType.INSTANCE_CONFIG))); + // The cluster change detector is a stateful object. + // Make it static to avoid unnecessary reinitialization. + private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL = + new ThreadLocal<>(); + private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator; + // --------- The following fields are placeholders and need replacement. -----------// // TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization? private final AssignmentMetadataStore _assignmentMetadataStore; private final RebalanceAlgorithm _rebalanceAlgorithm; // ------------------------------------------------------------------------------------// - // The cluster change detector is a stateful object. Make it static to avoid unnecessary - // reinitialization. - private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL = - new ThreadLocal<>(); - private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator; + public WagedRebalancer(HelixManager helixManager) { + this( + // TODO init the metadata store according to their requirement when integrate, + // or change to final static method if possible. + new AssignmentMetadataStore(), + // TODO parse the cluster setting + ConstraintBasedAlgorithmFactory.getInstance(), + // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output. + // Mapping calculator will translate the best possible assignment into the applicable state + // mapping based on the current states. + // TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer + new DelayedAutoRebalancer()); + } - private ResourceChangeDetector getChangeDetector() { - if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) { - CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector()); - } - return CHANGE_DETECTOR_THREAD_LOCAL.get(); + private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, + RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) { + _assignmentMetadataStore = assignmentMetadataStore; + _rebalanceAlgorithm = algorithm; + _mappingCalculator = mappingCalculator; } - public WagedRebalancer(HelixManager helixManager) { - // TODO init the metadata store according to their requirement when integrate, or change to final static method if possible. - _assignmentMetadataStore = new AssignmentMetadataStore(); - // TODO parse the cluster setting - _rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(); - - // Use the mapping calculator in DelayedAutoRebalancer for calculating the final assignment - // output. - // This calculator will translate the best possible assignment into an applicable state mapping - // based on the current states. - // TODO abstract and separate the mapping calculator logic from the DelayedAutoRebalancer - _mappingCalculator = new DelayedAutoRebalancer(); + @VisibleForTesting + protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, + RebalanceAlgorithm algorithm) { + this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer()); } /** - * Compute the new IdealStates for all the resources input. The IdealStates include both the new + * Compute the new IdealStates for all the input resources. The IdealStates include both new * partition assignment (in the listFiles) and the new replica state mapping (in the mapFields). + * * @param clusterData The Cluster status data provider. * @param resourceMap A map containing all the rebalancing resources. - * @param currentStateOutput The present Current State of the cluster. - * @return A map containing the computed new IdealStates. + * @param currentStateOutput The present Current States of the resources. + * @return A map of the new IdealStates with the resource name as key. */ public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { - return new HashMap<>(); + LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString()); + + // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the WAGED rebalancer + resourceMap = resourceMap.entrySet().stream().filter(resourceEntry -> { + IdealState is = clusterData.getIdealState(resourceEntry.getKey()); + return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO) + && getClass().getName().equals(is.getRebalancerClassName()); + }).collect(Collectors + .toMap(resourceEntry -> resourceEntry.getKey(), resourceEntry -> resourceEntry.getValue())); + + if (resourceMap.isEmpty()) { + LOG.warn("There is no valid resource to be rebalanced by {}", + this.getClass().getSimpleName()); + return Collections.emptyMap(); + } else { + LOG.info("Valid resources that will be rebalanced by {}: {}", this.getClass().getSimpleName(), + resourceMap.keySet().toString()); + } + + // Calculate the target assignment based on the current cluster status. + Map<String, IdealState> newIdealStates = computeBestPossibleStates(clusterData, resourceMap); + + // Construct the new best possible states according to the current state and target assignment. + // Note that the new ideal state might be an intermediate state between the current state and the target assignment. + for (IdealState is : newIdealStates.values()) { + String resourceName = is.getResourceName(); + // Adjust the states according to the current state. + ResourceAssignment finalAssignment = _mappingCalculator + .computeBestPossiblePartitionState(clusterData, is, resourceMap.get(resourceName), + currentStateOutput); + + // Clean up the state mapping fields. Use the final assignment that is calculated by the + // mapping calculator to replace them. + is.getRecord().getMapFields().clear(); + for (Partition partition : finalAssignment.getMappedPartitions()) { + Map<String, String> newStateMap = finalAssignment.getReplicaMap(partition); + // if the final states cannot be generated, override the best possible state with empty map. + is.setInstanceStateMap(partition.getPartitionName(), + newStateMap == null ? Collections.emptyMap() : newStateMap); + } + } + + LOG.info("Finish computing new ideal states for resources: {}", + resourceMap.keySet().toString()); + return newIdealStates; + } + + // Coordinate baseline recalculation and partial rebalance according to the cluster changes. + private Map<String, IdealState> computeBestPossibleStates( + ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap) + throws HelixRebalanceException { + getChangeDetector().updateSnapshots(clusterData); + // Get all the modified and new items' information + Map<HelixConstants.ChangeType, Set<String>> clusterChanges = + getChangeDetector().getChangeTypes().stream() + .collect(Collectors.toMap(changeType -> changeType, changeType -> { + Set<String> itemKeys = new HashSet<>(); + itemKeys.addAll(getChangeDetector().getAdditionsByType(changeType)); + itemKeys.addAll(getChangeDetector().getChangesByType(changeType)); + return itemKeys; + })); + + if (clusterChanges.keySet().stream() + .anyMatch(changeType -> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES.contains(changeType))) { + refreshBaseline(clusterData, clusterChanges, resourceMap); + // Inject a cluster config change for large scale partial rebalance once the baseline changed. + clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet()); + } + + Map<String, ResourceAssignment> newAssignment = + partialRebalance(clusterData, clusterChanges, resourceMap); + + // Convert the assignments into IdealState for the following state mapping calculation. + Map<String, IdealState> finalIdealState = new HashMap<>(); + for (String resourceName : newAssignment.keySet()) { + IdealState newIdeaState; + try { + IdealState currentIdealState = clusterData.getIdealState(resourceName); + Map<String, Integer> statePriorityMap = + clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()) + .getStatePriorityMap(); + // Create a new IdealState instance contains the new calculated assignment in the preference list. + newIdeaState = generateIdealStateWithAssignment(resourceName, currentIdealState, + newAssignment.get(resourceName), statePriorityMap); + } catch (Exception ex) { + throw new HelixRebalanceException( + "Fail to calculate the new IdealState for resource: " + resourceName, + HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); + } + finalIdealState.put(resourceName, newIdeaState); + } + return finalIdealState; + } + + // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline + private void refreshBaseline(ResourceControllerDataProvider clusterData, + Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap) + throws HelixRebalanceException { + // For baseline calculation + // 1. Ignore node status (disable/offline). + // 2. Use the baseline as the previous best possible assignment since there is no "baseline" for + // the baseline. + LOG.info("Start calculating the new baseline."); + Map<String, ResourceAssignment> currentBaseline; + try { + currentBaseline = _assignmentMetadataStore.getBaseline(); + } catch (Exception ex) { + throw new HelixRebalanceException("Failed to get the current baseline assignment.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + Map<String, ResourceAssignment> baseline = + calculateAssignment(clusterData, clusterChanges, resourceMap, clusterData.getAllInstances(), + Collections.emptyMap(), currentBaseline); + try { + _assignmentMetadataStore.persistBaseline(baseline); + } catch (Exception ex) { + throw new HelixRebalanceException("Failed to persist the new baseline assignment.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + LOG.info("Finish calculating the new baseline."); + } + + private Map<String, ResourceAssignment> partialRebalance( + ResourceControllerDataProvider clusterData, + Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap) + throws HelixRebalanceException { + LOG.info("Start calculating the new best possible assignment."); + Set<String> activeInstances = clusterData.getEnabledLiveInstances(); + Map<String, ResourceAssignment> baseline; + Map<String, ResourceAssignment> prevBestPossibleAssignment; + try { + baseline = _assignmentMetadataStore.getBaseline(); + prevBestPossibleAssignment = _assignmentMetadataStore.getBestPossibleAssignment(); + } catch (Exception ex) { + throw new HelixRebalanceException("Failed to get the persisted assignment records.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + Map<String, ResourceAssignment> newAssignment = + calculateAssignment(clusterData, clusterChanges, resourceMap, activeInstances, baseline, + prevBestPossibleAssignment); + try { + // TODO Test to confirm if persisting the final assignment (with final partition states) + // would be a better option. + _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment); + } catch (Exception ex) { + throw new HelixRebalanceException("Failed to persist the new best possible assignment.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + LOG.info("Finish calculating the new best possible assignment."); + return newAssignment; + } + + /** + * Generate the cluster model based on the input and calculate the optimal assignment. + * + * @param clusterData the cluster data cache. + * @param clusterChanges the detected cluster changes. + * @param resourceMap the rebalancing resources. + * @param activeNodes the alive and enabled nodes. + * @param baseline the baseline assignment for the algorithm as a reference. + * @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a reference. + * @return the new optimal assignment for the resources. + */ + private Map<String, ResourceAssignment> calculateAssignment( + ResourceControllerDataProvider clusterData, + Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap, + Set<String> activeNodes, Map<String, ResourceAssignment> baseline, + Map<String, ResourceAssignment> prevBestPossibleAssignment) throws HelixRebalanceException { + long startTime = System.currentTimeMillis(); + LOG.info("Start calculating for an assignment"); + ClusterModel clusterModel; + try { + clusterModel = ClusterModelProvider + .generateClusterModel(clusterData, resourceMap, activeNodes, clusterChanges, baseline, + prevBestPossibleAssignment); + } catch (Exception ex) { + throw new HelixRebalanceException("Failed to generate cluster model.", + HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); + } + + OptimalAssignment optimalAssignment = _rebalanceAlgorithm.calculate(clusterModel); + Map<String, ResourceAssignment> newAssignment = + optimalAssignment.getOptimalResourceAssignment(); + + LOG.info("Finish calculating. Time spent: {}ms.", System.currentTimeMillis() - startTime); + return newAssignment; + } + + private ResourceChangeDetector getChangeDetector() { + if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) { + CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector()); + } + return CHANGE_DETECTOR_THREAD_LOCAL.get(); + } + + // Generate a new IdealState based on the input newAssignment. + // The assignment will be propagate to the preference lists. + // Note that we will recalculate the states based on the current state, so there is no need to + // update the mapping fields in the IdealState output. + private IdealState generateIdealStateWithAssignment(String resourceName, + IdealState currentIdealState, ResourceAssignment newAssignment, + Map<String, Integer> statePriorityMap) { + IdealState newIdealState = new IdealState(resourceName); + // Copy the simple fields + newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); + // Sort the preference list according to state priority. + newIdealState.setPreferenceLists(getPreferenceLists(newAssignment, statePriorityMap)); + // Note the state mapping in the new assignment won't be directly propagate to the map fields. + // The rebalancer will calculate for the final state mapping considering the current states. + return newIdealState; + } + + // Generate the preference lists from the state mapping based on state priority. + private Map<String, List<String>> getPreferenceLists(ResourceAssignment newAssignment, + Map<String, Integer> statePriorityMap) { + Map<String, List<String>> preferenceList = new HashMap<>(); + for (Partition partition : newAssignment.getMappedPartitions()) { + List<String> nodes = new ArrayList<>(newAssignment.getReplicaMap(partition).keySet()); + // To ensure backward compatibility, sort the preference list according to state priority. + nodes.sort((node1, node2) -> { + int statePriority1 = + statePriorityMap.get(newAssignment.getReplicaMap(partition).get(node1)); + int statePriority2 = + statePriorityMap.get(newAssignment.getReplicaMap(partition).get(node2)); + if (statePriority1 == statePriority2) { + return node1.compareTo(node2); + } else { + return statePriority1 - statePriority2; + } + }); + preferenceList.put(partition.getPartitionName(), nodes); + } + return preferenceList; } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java index 33677e5..4141d20 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java @@ -41,7 +41,7 @@ import static java.lang.Math.max; * This class represents a possible allocation of the replication. * Note that any usage updates to the AssignableNode are not thread safe. */ -public class AssignableNode { +public class AssignableNode implements Comparable<AssignableNode> { private static final Logger LOG = LoggerFactory.getLogger(AssignableNode.class.getName()); // basic node information @@ -384,4 +384,9 @@ public class AssignableNode { public int hashCode() { return _instanceName.hashCode(); } + + @Override + public int compareTo(AssignableNode o) { + return _instanceName.compareTo(o.getInstanceName()); + } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java new file mode 100644 index 0000000..ea8c164 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java @@ -0,0 +1,55 @@ +package org.apache.helix.controller.rebalancer.waged; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.model.ResourceAssignment; + +import java.util.HashMap; +import java.util.Map; + +/** + * A mock up metadata store for unit test. + * This mock datastore persist assignments in memory only. + */ +public class MockAssignmentMetadataStore extends AssignmentMetadataStore { + private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>(); + private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>(); + + public Map<String, ResourceAssignment> getBaseline() { + return _persistGlobalBaseline; + } + + public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) { + _persistGlobalBaseline = globalBaseline; + } + + public Map<String, ResourceAssignment> getBestPossibleAssignment() { + return _persistBestPossibleAssignment; + } + + public void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) { + _persistBestPossibleAssignment = bestPossibleAssignment; + } + + public void clearMetadataStore() { + _persistBestPossibleAssignment.clear(); + _persistGlobalBaseline.clear(); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java new file mode 100644 index 0000000..6759a10 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ -0,0 +1,415 @@ +package org.apache.helix.controller.rebalancer.waged; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.helix.HelixConstants; +import org.apache.helix.HelixRebalanceException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm; +import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Partition; +import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceAssignment; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.when; + +public class TestWagedRebalancer extends AbstractTestClusterModel { + private Set<String> _instances; + private MockRebalanceAlgorithm _algorithm; + + @BeforeClass + public void initialize() { + super.initialize(); + _instances = new HashSet<>(); + _instances.add(_testInstanceId); + _algorithm = new MockRebalanceAlgorithm(); + } + + @Override + protected ResourceControllerDataProvider setupClusterDataCache() throws IOException { + ResourceControllerDataProvider testCache = super.setupClusterDataCache(); + + // Set up mock idealstate + Map<String, IdealState> isMap = new HashMap<>(); + for (String resource : _resourceNames) { + IdealState is = new IdealState(resource); + is.setNumPartitions(_partitionNames.size()); + is.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO); + is.setStateModelDefRef("MasterSlave"); + is.setReplicas("3"); + is.setRebalancerClassName(WagedRebalancer.class.getName()); + _partitionNames.stream() + .forEach(partition -> is.setPreferenceList(partition, Collections.emptyList())); + isMap.put(resource, is); + } + when(testCache.getIdealState(anyString())).thenAnswer( + (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0])); + when(testCache.getIdealStates()).thenReturn(isMap); + + // Set up 2 more instances + for (int i = 1; i < 3; i++) { + String instanceName = _testInstanceId + i; + _instances.add(instanceName); + // 1. Set up the default instance information with capacity configuration. + InstanceConfig testInstanceConfig = createMockInstanceConfig(instanceName); + Map<String, InstanceConfig> instanceConfigMap = testCache.getInstanceConfigMap(); + instanceConfigMap.put(instanceName, testInstanceConfig); + when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap); + // 2. Mock the live instance node for the default instance. + LiveInstance testLiveInstance = createMockLiveInstance(instanceName); + Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances(); + liveInstanceMap.put(instanceName, testLiveInstance); + when(testCache.getLiveInstances()).thenReturn(liveInstanceMap); + when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet()); + when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet()); + } + + return testCache; + } + + @Test + public void testRebalance() throws IOException, HelixRebalanceException { + // Init mock metadatastore for the unit test + MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore(); + WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm); + + // Generate the input for the rebalancer. + ResourceControllerDataProvider clusterData = setupClusterDataCache(); + Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> { + Resource resource = new Resource(entry.getKey()); + entry.getValue().getPartitionSet().stream() + .forEach(partition -> resource.addPartition(partition)); + return resource; + })); + Map<String, IdealState> newIdealStates = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult(); + // Since there is no special condition, the calculated IdealStates should be exactly the same + // as the mock algorithm result. + validateRebalanceResult(resourceMap, newIdealStates, algorithmResult); + } + + @Test(dependsOnMethods = "testRebalance") + public void testPartialRebalance() throws IOException, HelixRebalanceException { + // Init mock metadatastore for the unit test + MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore(); + WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm); + + // Generate the input for the rebalancer. + ResourceControllerDataProvider clusterData = setupClusterDataCache(); + Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> { + Resource resource = new Resource(entry.getKey()); + entry.getValue().getPartitionSet().stream() + .forEach(partition -> resource.addPartition(partition)); + return resource; + })); + + // Test with partial resources listed in the resourceMap input. + // Remove the first resource from the input. Note it still exists in the cluster data cache. + metadataStore.clearMetadataStore(); + resourceMap.remove(_resourceNames.get(0)); + Map<String, IdealState> newIdealStates = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult(); + validateRebalanceResult(resourceMap, newIdealStates, algorithmResult); + } + + @Test(dependsOnMethods = "testRebalance") + public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceException { + // Init mock metadatastore for the unit test + MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore(); + WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm); + + // Generate the input for the rebalancer. + ResourceControllerDataProvider clusterData = setupClusterDataCache(); + Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> { + Resource resource = new Resource(entry.getKey()); + entry.getValue().getPartitionSet().stream() + .forEach(partition -> resource.addPartition(partition)); + return resource; + })); + + // Test with current state exists, so the rebalancer should calculate for the intermediate state + // Create current state based on the cluster data cache. + CurrentStateOutput currentStateOutput = new CurrentStateOutput(); + for (String instanceName : _instances) { + for (Map.Entry<String, CurrentState> csEntry : clusterData + .getCurrentState(instanceName, _sessionId).entrySet()) { + String resourceName = csEntry.getKey(); + CurrentState cs = csEntry.getValue(); + for (Map.Entry<String, String> partitionStateEntry : cs.getPartitionStateMap().entrySet()) { + currentStateOutput + .setCurrentState(resourceName, new Partition(partitionStateEntry.getKey()), + instanceName, partitionStateEntry.getValue()); + } + } + } + + // The state calculation will be adjusted based on the current state. + // So test the following cases: + // 1.1. Disable a resource, and the partitions in CS will be offline. + String disabledResourceName = _resourceNames.get(0); + clusterData.getIdealState(disabledResourceName).enable(false); + // 1.2. Adding more unknown partitions to the CS, so they will be dropped. + String droppingResourceName = _resourceNames.get(1); + String droppingPartitionName = "UnknownPartition"; + String droppingFromInstance = _testInstanceId; + currentStateOutput.setCurrentState(droppingResourceName, new Partition(droppingPartitionName), + droppingFromInstance, "SLAVE"); + resourceMap.get(droppingResourceName).addPartition(droppingPartitionName); + + Map<String, IdealState> newIdealStates = + rebalancer.computeNewIdealStates(clusterData, resourceMap, currentStateOutput); + // All the replica state should be OFFLINE + IdealState disabledIdealState = newIdealStates.get(disabledResourceName); + for (String partition : disabledIdealState.getPartitionSet()) { + Assert.assertTrue(disabledIdealState.getInstanceStateMap(partition).values().stream() + .allMatch(state -> state.equals("OFFLINE"))); + } + // the dropped partition should be dropped. + IdealState droppedIdealState = newIdealStates.get(droppingResourceName); + Assert.assertEquals( + droppedIdealState.getInstanceStateMap(droppingPartitionName).get(droppingFromInstance), + "DROPPED"); + } + + @Test(dependsOnMethods = "testRebalance") + public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException { + WagedRebalancer rebalancer = new WagedRebalancer(new MockAssignmentMetadataStore(), _algorithm); + + ResourceControllerDataProvider clusterData = setupClusterDataCache(); + String nonCompatibleResourceName = _resourceNames.get(0); + clusterData.getIdealState(nonCompatibleResourceName) + .setRebalancerClassName(CrushRebalanceStrategy.class.getName()); + // The input resource Map shall contain all the valid resources. + Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> { + Resource resource = new Resource(entry.getKey()); + entry.getValue().getPartitionSet().stream() + .forEach(partition -> resource.addPartition(partition)); + return resource; + })); + Map<String, IdealState> newIdealStates = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult(); + // The output shall not contains the nonCompatibleResource. + resourceMap.remove(nonCompatibleResourceName); + validateRebalanceResult(resourceMap, newIdealStates, algorithmResult); + } + + // TODO test with invalid capacity configuration which will fail the cluster model constructing. + @Test(dependsOnMethods = "testRebalance") + public void testInvalidClusterStatus() throws IOException { + WagedRebalancer rebalancer = new WagedRebalancer(new MockAssignmentMetadataStore(), _algorithm); + + ResourceControllerDataProvider clusterData = setupClusterDataCache(); + String invalidResource = _resourceNames.get(0); + // The state model does not exist + clusterData.getIdealState(invalidResource).setStateModelDefRef("foobar"); + // The input resource Map shall contain all the valid resources. + Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect( + Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName))); + try { + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + Assert.fail("Rebalance shall fail."); + } catch (HelixRebalanceException ex) { + Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS); + Assert.assertEquals(ex.getMessage(), + "Failed to generate cluster model. Failure Type: INVALID_CLUSTER_STATUS"); + } + } + + @Test(dependsOnMethods = "testRebalance") + public void testInvalidRebalancerStatus() throws IOException { + // Mock a metadata store that will fail on all the calls. + AssignmentMetadataStore metadataStore = Mockito.mock(AssignmentMetadataStore.class); + when(metadataStore.getBaseline()) + .thenThrow(new RuntimeException("Mock Error. Metadata store fails.")); + WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm); + + ResourceControllerDataProvider clusterData = setupClusterDataCache(); + // The input resource Map shall contain all the valid resources. + Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect( + Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName))); + try { + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + Assert.fail("Rebalance shall fail."); + } catch (HelixRebalanceException ex) { + Assert.assertEquals(ex.getFailureType(), + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS); + Assert.assertEquals(ex.getMessage(), + "Failed to get the persisted assignment records. Failure Type: INVALID_REBALANCER_STATUS"); + } + } + + @Test(dependsOnMethods = "testRebalance") + public void testAlgorithmExepction() throws IOException, HelixRebalanceException { + RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class); + when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.", + HelixRebalanceException.Type.FAILED_TO_CALCULATE)); + + WagedRebalancer rebalancer = + new WagedRebalancer(new MockAssignmentMetadataStore(), badAlgorithm); + + ResourceControllerDataProvider clusterData = setupClusterDataCache(); + Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect( + Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName))); + try { + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + Assert.fail("Rebalance shall fail."); + } catch (HelixRebalanceException ex) { + Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE); + Assert.assertEquals(ex.getMessage(), "Algorithm fails. Failure Type: FAILED_TO_CALCULATE"); + } + } + + @Test(dependsOnMethods = "testRebalance") + public void testRebalanceOnChanges() throws IOException, HelixRebalanceException { + // Test continuously rebalance with the same rebalancer with different internal state. Ensure + // that the rebalancer handles different input (different cluster changes) based on the internal + // state in a correct way. + + // Note that this test relies on the MockRebalanceAlgorithm implementation. The mock algorithm + // won't propagate any existing assignment from the cluster model. + + // Init mock metadatastore for the unit test + MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore(); + WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm); + + // 1. rebalance with baseline calculation done + // Generate the input for the rebalancer. + ResourceControllerDataProvider clusterData = setupClusterDataCache(); + // Cluster config change will trigger baseline to be recalculated. + when(clusterData.getRefreshedChangeTypes()) + .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG)); + Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> { + Resource resource = new Resource(entry.getKey()); + entry.getValue().getPartitionSet().stream() + .forEach(partition -> resource.addPartition(partition)); + return resource; + })); + Map<String, IdealState> newIdealStates = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult(); + // Since there is no special condition, the calculated IdealStates should be exactly the same + // as the mock algorithm result. + validateRebalanceResult(resourceMap, newIdealStates, algorithmResult); + Map<String, ResourceAssignment> baseline = metadataStore.getBaseline(); + Assert.assertEquals(baseline, algorithmResult); + Map<String, ResourceAssignment> bestPossibleAssignment = + metadataStore.getBestPossibleAssignment(); + Assert.assertEquals(bestPossibleAssignment, algorithmResult); + + // 2. rebalance with one ideal state changed only + String changedResourceName = _resourceNames.get(0); + // Create a new cluster data cache to simulate cluster change + clusterData = setupClusterDataCache(); + when(clusterData.getRefreshedChangeTypes()) + .thenReturn(Collections.singleton(HelixConstants.ChangeType.IDEAL_STATE)); + IdealState is = clusterData.getIdealState(changedResourceName); + // Update the tag so the ideal state will be marked as changed. + is.setInstanceGroupTag("newTag"); + + // Although the input contains 2 resources, the rebalancer shall only call the algorithm to + // rebalance the changed one. + newIdealStates = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + Map<String, ResourceAssignment> partialAlgorithmResult = _algorithm.getRebalanceResult(); + + // Verify that only the changed resource has been included in the calculation. + validateRebalanceResult( + Collections.singletonMap(changedResourceName, new Resource(changedResourceName)), + newIdealStates, partialAlgorithmResult); + // Baseline should be empty, because there is no cluster topology change. + baseline = metadataStore.getBaseline(); + Assert.assertEquals(baseline, Collections.emptyMap()); + // Best possible assignment contains the new assignment of only one resource. + bestPossibleAssignment = metadataStore.getBestPossibleAssignment(); + Assert.assertEquals(bestPossibleAssignment, partialAlgorithmResult); + + // * Before the next test, recover the best possible assignment record. + metadataStore.persistBestPossibleAssignment(algorithmResult); + + // 3. rebalance with current state change only + // Create a new cluster data cache to simulate cluster change + clusterData = setupClusterDataCache(); + when(clusterData.getRefreshedChangeTypes()) + .thenReturn(Collections.singleton(HelixConstants.ChangeType.CURRENT_STATE)); + // Modify any current state + CurrentState cs = + clusterData.getCurrentState(_testInstanceId, _sessionId).get(_resourceNames.get(0)); + // Update the tag so the ideal state will be marked as changed. + cs.setInfo(_partitionNames.get(0), "mock update"); + + // Although the input contains 2 resources, the rebalancer shall not try to recalculate + // assignment since there is only current state change. + newIdealStates = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + algorithmResult = _algorithm.getRebalanceResult(); + + // Verify that only the changed resource has been included in the calculation. + validateRebalanceResult(Collections.emptyMap(), newIdealStates, algorithmResult); + // Both assignment state should be empty. + baseline = metadataStore.getBaseline(); + Assert.assertEquals(baseline, Collections.emptyMap()); + bestPossibleAssignment = metadataStore.getBestPossibleAssignment(); + Assert.assertEquals(bestPossibleAssignment, Collections.emptyMap()); + + // 4. rebalance with no change but best possible state record missing. + // This usually happens when the persisted assignment state is gone. + clusterData = setupClusterDataCache(); // Note this mock data cache won't report any change. + // Even with no change, since the previous assignment is empty, the rebalancer will still + // calculate the assignment for both resources. + newIdealStates = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + algorithmResult = _algorithm.getRebalanceResult(); + // Verify that both resource has been included in the calculation. + validateRebalanceResult(resourceMap, newIdealStates, algorithmResult); + // Both assignment state should be empty since no cluster topology change. + baseline = metadataStore.getBaseline(); + Assert.assertEquals(baseline, Collections.emptyMap()); + // The best possible assignment should be present. + bestPossibleAssignment = metadataStore.getBestPossibleAssignment(); + Assert.assertEquals(bestPossibleAssignment, algorithmResult); + } + + private void validateRebalanceResult(Map<String, Resource> resourceMap, + Map<String, IdealState> newIdealStates, Map<String, ResourceAssignment> expectedResult) { + Assert.assertEquals(newIdealStates.keySet(), resourceMap.keySet()); + for (String resourceName : expectedResult.keySet()) { + Assert.assertTrue(newIdealStates.containsKey(resourceName)); + IdealState is = newIdealStates.get(resourceName); + ResourceAssignment assignment = expectedResult.get(resourceName); + Assert.assertEquals(is.getPartitionSet(), new HashSet<>( + assignment.getMappedPartitions().stream().map(partition -> partition.getPartitionName()) + .collect(Collectors.toSet()))); + for (String partitionName : is.getPartitionSet()) { + Assert.assertEquals(is.getInstanceStateMap(partitionName), + assignment.getReplicaMap(new Partition(partitionName))); + } + } + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java new file mode 100644 index 0000000..2a39482 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java @@ -0,0 +1,84 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm; +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; +import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment; +import org.apache.helix.model.Partition; +import org.apache.helix.model.ResourceAssignment; +import org.mockito.Mockito; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.mockito.Mockito.when; + +/** + * A mock up rebalance algorithm for unit test. + * Note that the mock algorithm won't propagate the existing assignment to the output as a real + * algorithm will do. This is for the convenience of testing. + */ +public class MockRebalanceAlgorithm implements RebalanceAlgorithm { + Map<String, ResourceAssignment> _resultHistory = Collections.emptyMap(); + + @Override + public OptimalAssignment calculate(ClusterModel clusterModel) { + // If no predefined rebalance result setup, do card dealing. + Map<String, ResourceAssignment> result = new HashMap<>(); + Iterator<AssignableNode> nodeIterator = + clusterModel.getAssignableNodes().values().stream().sorted().iterator(); + for (String resource : clusterModel.getAssignableReplicaMap().keySet()) { + Iterator<AssignableReplica> replicaIterator = + clusterModel.getAssignableReplicaMap().get(resource).stream().sorted().iterator(); + while (replicaIterator.hasNext()) { + AssignableReplica replica = replicaIterator.next(); + if (!nodeIterator.hasNext()) { + nodeIterator = clusterModel.getAssignableNodes().values().stream().sorted().iterator(); + } + AssignableNode node = nodeIterator.next(); + + // Put the assignment + ResourceAssignment assignment = result.computeIfAbsent(replica.getResourceName(), + resourceName -> new ResourceAssignment(resourceName)); + Partition partition = new Partition(replica.getPartitionName()); + if (assignment.getReplicaMap(partition).isEmpty()) { + assignment.addReplicaMap(partition, new HashMap<>()); + } + assignment.getReplicaMap(partition).put(node.getInstanceName(), replica.getReplicaState()); + } + } + + _resultHistory = result; + + // TODO remove this mockito when OptimalAssignment.getOptimalResourceAssignment is ready. + OptimalAssignment optimalAssignment = Mockito.mock(OptimalAssignment.class); + when(optimalAssignment.getOptimalResourceAssignment()).thenReturn(result); + return optimalAssignment; + } + + public Map<String, ResourceAssignment> getRebalanceResult() { + return _resultHistory; + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java index a8a5de5..0f799b3 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java @@ -74,7 +74,7 @@ public abstract class AbstractTestClusterModel { _testFaultZoneId = "testZone"; } - InstanceConfig createMockInstanceConfig(String instanceId) { + protected InstanceConfig createMockInstanceConfig(String instanceId) { InstanceConfig testInstanceConfig = new InstanceConfig(instanceId); testInstanceConfig.setInstanceCapacityMap(_capacityDataMap); testInstanceConfig.addTag(_testInstanceTags.get(0)); @@ -83,7 +83,7 @@ public abstract class AbstractTestClusterModel { return testInstanceConfig; } - LiveInstance createMockLiveInstance(String instanceId) { + protected LiveInstance createMockLiveInstance(String instanceId) { LiveInstance testLiveInstance = new LiveInstance(instanceId); testLiveInstance.setSessionId(_sessionId); return testLiveInstance;
