This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit 2699b61e3cba8ad64243917684e31ea710b1c3bf Author: Jiajun Wang <[email protected]> AuthorDate: Tue Oct 1 12:08:56 2019 -0700 Add delayed rebalance and user-defined preference list features to the WAGED rebalancer. (#456) - Add delayed rebalance and user-defined preference list features to the WAGED rebalancer. - Refine the delayed rebalance usage in the waged rebalancer. - Add the delayed rebalance scheduling logic. - Add the necessary tests. And fix TestMixedModeAutoRebalance and all delayed rebalance tests. --- .../rebalancer/DelayedAutoRebalancer.java | 203 ++-------------- .../rebalancer/util/DelayedRebalanceUtil.java | 267 +++++++++++++++++++++ .../rebalancer/waged/WagedRebalancer.java | 159 ++++++++++-- .../StrictMatchExternalViewVerifier.java | 6 +- .../java/org/apache/helix/common/ZkTestBase.java | 4 +- .../rebalancer/waged/TestWagedRebalancer.java | 35 ++- .../TestDelayedAutoRebalance.java | 57 +++-- ...stDelayedAutoRebalanceWithDisabledInstance.java | 33 +-- .../TestDelayedAutoRebalanceWithRackaware.java | 5 +- .../rebalancer/TestMixedModeAutoRebalance.java | 101 +++++--- .../rebalancer/TestZeroReplicaAvoidance.java | 74 ++++-- .../WagedRebalancer/TestDelayedWagedRebalance.java | 102 ++++++++ ...tDelayedWagedRebalanceWithDisabledInstance.java | 103 ++++++++ .../TestDelayedWagedRebalanceWithRackaware.java | 102 ++++++++ .../TestMixedModeWagedRebalance.java | 66 +++++ .../WagedRebalancer/TestWagedRebalance.java | 23 +- .../TestWagedRebalanceFaultZone.java | 10 +- .../apache/helix/tools/TestClusterVerifier.java | 45 +++- 18 files changed, 1059 insertions(+), 336 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index 65b3f84..1073d6d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -32,11 +32,10 @@ import org.apache.helix.HelixDefinedState; import org.apache.helix.ZNRecord; import org.apache.helix.api.config.StateTransitionThrottleConfig; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; +import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; @@ -50,7 +49,6 @@ import org.slf4j.LoggerFactory; */ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> { private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class); - private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler(); @Override public IdealState computeNewIdealState(String resourceName, @@ -79,7 +77,8 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController ClusterConfig clusterConfig = clusterData.getClusterConfig(); ResourceConfig resourceConfig = clusterData.getResourceConfig(resourceName); - boolean delayRebalanceEnabled = isDelayRebalanceEnabled(currentIdealState, clusterConfig); + boolean delayRebalanceEnabled = + DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig); if (resourceConfig != null) { userDefinedPreferenceList = resourceConfig.getPreferenceLists(); @@ -110,16 +109,18 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController Set<String> activeNodes = liveEnabledNodes; if (delayRebalanceEnabled) { - long delay = getRebalanceDelay(currentIdealState, clusterConfig); - 
activeNodes = getActiveInstances(allNodes, currentIdealState, liveEnabledNodes, - clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), - clusterData.getInstanceConfigMap(), delay, clusterConfig); + long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig); + activeNodes = DelayedRebalanceUtil + .getActiveNodes(allNodes, currentIdealState, liveEnabledNodes, + clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), + clusterData.getInstanceConfigMap(), delay, clusterConfig); Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes); offlineOrDisabledInstances.removeAll(liveEnabledNodes); - setRebalanceScheduler(currentIdealState, offlineOrDisabledInstances, - clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), - clusterData.getInstanceConfigMap(), delay, clusterConfig); + DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true, + offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(), + clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(), delay, + clusterConfig, _manager); } if (allNodes.isEmpty() || activeNodes.isEmpty()) { @@ -162,16 +163,16 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController .computePartitionAssignment(allNodeList, liveEnabledNodeList, currentMapping, clusterData); ZNRecord finalMapping = newIdealMapping; - if (isDelayRebalanceEnabled(currentIdealState, clusterConfig)) { + if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)) { List<String> activeNodeList = new ArrayList<>(activeNodes); Collections.sort(activeNodeList); - int minActiveReplicas = getMinActiveReplica(currentIdealState, replicaCount); + int minActiveReplicas = + DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, replicaCount); ZNRecord newActiveMapping = _rebalanceStrategy .computePartitionAssignment(allNodeList, activeNodeList, 
currentMapping, clusterData); - finalMapping = - getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveEnabledNodes, - replicaCount, minActiveReplicas); + finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, + liveEnabledNodes, replicaCount, minActiveReplicas); } finalMapping.getListFields().putAll(userDefinedPreferenceList); @@ -202,162 +203,15 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController return newIdealState; } - /* get all active instances (live instances plus offline-yet-active instances */ - private Set<String> getActiveInstances(Set<String> allNodes, IdealState idealState, - Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, - Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) { - Set<String> activeInstances = new HashSet<>(liveEnabledNodes); - - if (!isDelayRebalanceEnabled(idealState, clusterConfig)) { - return activeInstances; - } - - Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes); - offlineOrDisabledInstances.removeAll(liveEnabledNodes); - - long currentTime = System.currentTimeMillis(); - for (String ins : offlineOrDisabledInstances) { - long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay, - instanceConfigMap.get(ins), clusterConfig); - InstanceConfig instanceConfig = instanceConfigMap.get(ins); - if (inactiveTime > currentTime && instanceConfig != null && instanceConfig - .isDelayRebalanceEnabled()) { - activeInstances.add(ins); - } - } - - return activeInstances; - } - - /* Set a rebalance scheduler for the closest future rebalance time. 
*/ - private void setRebalanceScheduler(IdealState idealState, Set<String> offlineOrDisabledInstances, - Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, - Map<String, InstanceConfig> instanceConfigMap, long delay, - ClusterConfig clusterConfig) { - String resourceName = idealState.getResourceName(); - if (!isDelayRebalanceEnabled(idealState, clusterConfig)) { - _rebalanceScheduler.removeScheduledRebalance(resourceName); - return; - } - - long currentTime = System.currentTimeMillis(); - long nextRebalanceTime = Long.MAX_VALUE; - // calculate the closest future rebalance time - for (String ins : offlineOrDisabledInstances) { - long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay, - instanceConfigMap.get(ins), clusterConfig); - if (inactiveTime != -1 && inactiveTime > currentTime && inactiveTime < nextRebalanceTime) { - nextRebalanceTime = inactiveTime; - } - } - - if (nextRebalanceTime == Long.MAX_VALUE) { - long startTime = _rebalanceScheduler.removeScheduledRebalance(resourceName); - if (LOG.isDebugEnabled()) { - LOG.debug(String - .format("Remove exist rebalance timer for resource %s at %d\n", resourceName, startTime)); - } - } else { - long currentScheduledTime = _rebalanceScheduler.getRebalanceTime(resourceName); - if (currentScheduledTime < 0 || currentScheduledTime > nextRebalanceTime) { - _rebalanceScheduler.scheduleRebalance(_manager, resourceName, nextRebalanceTime); - if (LOG.isDebugEnabled()) { - LOG.debug(String - .format("Set next rebalance time for resource %s at time %d\n", resourceName, - nextRebalanceTime)); - } - } - } - } - - /** - * The time when an offline or disabled instance should be treated as inactive. return -1 if it is - * inactive now. 
- * - * @return - */ - private long getInactiveTime(String instance, Set<String> liveInstances, Long offlineTime, - long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) { - long inactiveTime = Long.MAX_VALUE; - - // check the time instance went offline. - if (!liveInstances.contains(instance)) { - if (offlineTime != null && offlineTime > 0 && offlineTime + delay < inactiveTime) { - inactiveTime = offlineTime + delay; - } - } - - // check the time instance got disabled. - if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null - && clusterConfig.getDisabledInstances().containsKey(instance))) { - long disabledTime = instanceConfig.getInstanceEnabledTime(); - if (clusterConfig.getDisabledInstances() != null && clusterConfig.getDisabledInstances() - .containsKey(instance)) { - // Update batch disable time - long batchDisableTime = Long.parseLong(clusterConfig.getDisabledInstances().get(instance)); - if (disabledTime == -1 || disabledTime > batchDisableTime) { - disabledTime = batchDisableTime; - } - } - if (disabledTime > 0 && disabledTime + delay < inactiveTime) { - inactiveTime = disabledTime + delay; - } - } - - if (inactiveTime == Long.MAX_VALUE) { - return -1; - } - - return inactiveTime; - } - - private long getRebalanceDelay(IdealState idealState, ClusterConfig clusterConfig) { - long delayTime = idealState.getRebalanceDelay(); - if (delayTime < 0) { - delayTime = clusterConfig.getRebalanceDelayTime(); - } - return delayTime; - } - - private boolean isDelayRebalanceEnabled(IdealState idealState, ClusterConfig clusterConfig) { - long delay = getRebalanceDelay(idealState, clusterConfig); - return (delay > 0 && idealState.isDelayRebalanceEnabled() && clusterConfig - . 
isDelayRebalaceEnabled()); - } - private ZNRecord getFinalDelayedMapping(IdealState idealState, ZNRecord newIdealMapping, ZNRecord newActiveMapping, Set<String> liveInstances, int numReplica, int minActiveReplica) { if (minActiveReplica >= numReplica) { return newIdealMapping; } ZNRecord finalMapping = new ZNRecord(idealState.getResourceName()); - for (String partition : newIdealMapping.getListFields().keySet()) { - List<String> idealList = newIdealMapping.getListField(partition); - List<String> activeList = newActiveMapping.getListField(partition); - - List<String> liveList = new ArrayList<>(); - int activeReplica = 0; - for (String ins : activeList) { - if (liveInstances.contains(ins)) { - activeReplica++; - liveList.add(ins); - } - } - - if (activeReplica >= minActiveReplica) { - finalMapping.setListField(partition, activeList); - } else { - List<String> candidates = new ArrayList<String>(idealList); - candidates.removeAll(activeList); - for (String liveIns : candidates) { - liveList.add(liveIns); - if (liveList.size() >= minActiveReplica) { - break; - } - } - finalMapping.setListField(partition, liveList); - } - } + finalMapping.setListFields(DelayedRebalanceUtil + .getFinalDelayedMapping(newIdealMapping.getListFields(), newActiveMapping.getListFields(), + liveInstances, minActiveReplica)); return finalMapping; } @@ -391,10 +245,11 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController Set<String> liveNodes = cache.getLiveInstances().keySet(); ClusterConfig clusterConfig = cache.getClusterConfig(); - long delayTime = getRebalanceDelay(idealState, clusterConfig); - Set<String> activeNodes = getActiveInstances(allNodes, idealState, liveNodes, - cache.getInstanceOfflineTimeMap(), cache.getLiveInstances().keySet(), - cache.getInstanceConfigMap(), delayTime, clusterConfig); + long delayTime = DelayedRebalanceUtil.getRebalanceDelay(idealState, clusterConfig); + Set<String> activeNodes = DelayedRebalanceUtil + .getActiveNodes(allNodes, 
idealState, liveNodes, cache.getInstanceOfflineTimeMap(), + cache.getLiveInstances().keySet(), cache.getInstanceConfigMap(), delayTime, + clusterConfig); String stateModelDefName = idealState.getStateModelDefRef(); StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName); @@ -419,14 +274,6 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController return partitionMapping; } - private int getMinActiveReplica(IdealState idealState, int replicaCount) { - int minActiveReplicas = idealState.getMinActiveReplicas(); - if (minActiveReplicas < 0) { - minActiveReplicas = replicaCount; - } - return minActiveReplicas; - } - /** * compute best state for resource in AUTO ideal state mode * @param liveInstances diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java new file mode 100644 index 0000000..1342860 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java @@ -0,0 +1,267 @@ +package org.apache.helix.controller.rebalancer.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.helix.HelixManager; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The util for supporting delayed rebalance logic. + */ +public class DelayedRebalanceUtil { + private static final Logger LOG = LoggerFactory.getLogger(DelayedRebalanceUtil.class); + + private static RebalanceScheduler REBALANCE_SCHEDULER = new RebalanceScheduler(); + + /** + * @return true if delay rebalance is configured and enabled in the ClusterConfig configurations. + */ + public static boolean isDelayRebalanceEnabled(ClusterConfig clusterConfig) { + long delay = clusterConfig.getRebalanceDelayTime(); + return (delay > 0 && clusterConfig.isDelayRebalaceEnabled()); + } + + /** + * @return true if delay rebalance is configured and enabled in Resource IdealState and the + * ClusterConfig configurations. + */ + public static boolean isDelayRebalanceEnabled(IdealState idealState, + ClusterConfig clusterConfig) { + long delay = getRebalanceDelay(idealState, clusterConfig); + return (delay > 0 && idealState.isDelayRebalanceEnabled() && clusterConfig + .isDelayRebalaceEnabled()); + } + + /** + * @return the rebalance delay based on Resource IdealState and the ClusterConfig configurations. 
+ */ + public static long getRebalanceDelay(IdealState idealState, ClusterConfig clusterConfig) { + long delayTime = idealState.getRebalanceDelay(); + if (delayTime < 0) { + delayTime = clusterConfig.getRebalanceDelayTime(); + } + return delayTime; + } + + /** + * @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster + * delay rebalance configurations. + */ + public static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes, + Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, + Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) { + if (!isDelayRebalanceEnabled(clusterConfig)) { + return new HashSet<>(liveEnabledNodes); + } + return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes, + instanceConfigMap, clusterConfig.getRebalanceDelayTime(), clusterConfig); + } + + /** + * @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster + * and the resource delay rebalance configurations. 
+ */ + public static Set<String> getActiveNodes(Set<String> allNodes, IdealState idealState, + Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, + Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) { + if (!isDelayRebalanceEnabled(idealState, clusterConfig)) { + return new HashSet<>(liveEnabledNodes); + } + return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes, + instanceConfigMap, delay, clusterConfig); + } + + private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes, + Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, + Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) { + Set<String> activeNodes = new HashSet<>(liveEnabledNodes); + Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes); + offlineOrDisabledInstances.removeAll(liveEnabledNodes); + long currentTime = System.currentTimeMillis(); + for (String ins : offlineOrDisabledInstances) { + long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay, + instanceConfigMap.get(ins), clusterConfig); + InstanceConfig instanceConfig = instanceConfigMap.get(ins); + if (inactiveTime > currentTime && instanceConfig != null && instanceConfig + .isDelayRebalanceEnabled()) { + activeNodes.add(ins); + } + } + return activeNodes; + } + + /** + * @return The time when an offline or disabled instance should be treated as inactive. + * Return -1 if it is inactive now. + */ + private static long getInactiveTime(String instance, Set<String> liveInstances, Long offlineTime, + long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) { + long inactiveTime = Long.MAX_VALUE; + + // check the time instance went offline. 
+ if (!liveInstances.contains(instance)) { + if (offlineTime != null && offlineTime > 0 && offlineTime + delay < inactiveTime) { + inactiveTime = offlineTime + delay; + } + } + + // check the time instance got disabled. + if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null + && clusterConfig.getDisabledInstances().containsKey(instance))) { + long disabledTime = instanceConfig.getInstanceEnabledTime(); + if (clusterConfig.getDisabledInstances() != null && clusterConfig.getDisabledInstances() + .containsKey(instance)) { + // Update batch disable time + long batchDisableTime = Long.parseLong(clusterConfig.getDisabledInstances().get(instance)); + if (disabledTime == -1 || disabledTime > batchDisableTime) { + disabledTime = batchDisableTime; + } + } + if (disabledTime > 0 && disabledTime + delay < inactiveTime) { + inactiveTime = disabledTime + delay; + } + } + + if (inactiveTime == Long.MAX_VALUE) { + return -1; + } + + return inactiveTime; + } + + /** + * Merge the new ideal preference list with the delayed mapping that is calculated based on the + * delayed rebalance configurations. + * The method will prioritize the "active" preference list so as to avoid unnecessary transient + * state transitions. + * + * @param newIdealPreferenceList the ideal mapping that was calculated based on the current + * instance status + * @param newDelayedPreferenceList the delayed mapping that was calculated based on the delayed + * instance status + * @param liveEnabledInstances list of all the nodes that are both alive and enabled. + * @param minActiveReplica the minimum replica count to ensure a valid mapping. + * If the active list does not have enough replica assignment, + * this method will fill the list with the new ideal mapping until + * the replica count satisfies the minimum requirement. + * @return the merged state mapping. 
+ */ + public static Map<String, List<String>> getFinalDelayedMapping( + Map<String, List<String>> newIdealPreferenceList, + Map<String, List<String>> newDelayedPreferenceList, Set<String> liveEnabledInstances, + int minActiveReplica) { + Map<String, List<String>> finalPreferenceList = new HashMap<>(); + for (String partition : newIdealPreferenceList.keySet()) { + List<String> idealList = newIdealPreferenceList.get(partition); + List<String> delayedIdealList = newDelayedPreferenceList.get(partition); + + List<String> liveList = new ArrayList<>(); + for (String ins : delayedIdealList) { + if (liveEnabledInstances.contains(ins)) { + liveList.add(ins); + } + } + + if (liveList.size() >= minActiveReplica) { + finalPreferenceList.put(partition, delayedIdealList); + } else { + List<String> candidates = new ArrayList<>(idealList); + candidates.removeAll(delayedIdealList); + for (String liveIns : candidates) { + liveList.add(liveIns); + if (liveList.size() >= minActiveReplica) { + break; + } + } + finalPreferenceList.put(partition, liveList); + } + } + return finalPreferenceList; + } + + /** + * Get the minimum active replica count threshold that allows delayed rebalance. + * + * @param idealState the resource Ideal State + * @param replicaCount the expected active replica count. + * @return the expected minimum active replica count that is required + */ + public static int getMinActiveReplica(IdealState idealState, int replicaCount) { + int minActiveReplicas = idealState.getMinActiveReplicas(); + if (minActiveReplicas < 0) { + minActiveReplicas = replicaCount; + } + return minActiveReplicas; + } + + /** + * Set a rebalance scheduler for the closest future rebalance time. 
+ */ + public static void setRebalanceScheduler(String resourceName, boolean isDelayedRebalanceEnabled, + Set<String> offlineOrDisabledInstances, Map<String, Long> instanceOfflineTimeMap, + Set<String> liveNodes, Map<String, InstanceConfig> instanceConfigMap, long delay, + ClusterConfig clusterConfig, HelixManager manager) { + if (!isDelayedRebalanceEnabled) { + REBALANCE_SCHEDULER.removeScheduledRebalance(resourceName); + return; + } + + long currentTime = System.currentTimeMillis(); + long nextRebalanceTime = Long.MAX_VALUE; + // calculate the closest future rebalance time + for (String ins : offlineOrDisabledInstances) { + long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay, + instanceConfigMap.get(ins), clusterConfig); + if (inactiveTime != -1 && inactiveTime > currentTime && inactiveTime < nextRebalanceTime) { + nextRebalanceTime = inactiveTime; + } + } + + if (nextRebalanceTime == Long.MAX_VALUE) { + long startTime = REBALANCE_SCHEDULER.removeScheduledRebalance(resourceName); + if (LOG.isDebugEnabled()) { + LOG.debug(String + .format("Remove exist rebalance timer for resource %s at %d\n", resourceName, + startTime)); + } + } else { + long currentScheduledTime = REBALANCE_SCHEDULER.getRebalanceTime(resourceName); + if (currentScheduledTime < 0 || currentScheduledTime > nextRebalanceTime) { + REBALANCE_SCHEDULER.scheduleRebalance(manager, resourceName, nextRebalanceTime); + if (LOG.isDebugEnabled()) { + LOG.debug(String + .format("Set next rebalance time for resource %s at time %d\n", resourceName, + nextRebalanceTime)); + } + } + } + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 1861e10..d211884 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -19,6 +19,7 @@ package org.apache.helix.controller.rebalancer.waged; * under the License. */ +import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -27,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; - import org.apache.helix.HelixConstants; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; @@ -36,6 +36,7 @@ import org.apache.helix.controller.changedetector.ResourceChangeDetector; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; +import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil; import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory; import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; @@ -46,12 +47,10 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.ResourceConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; - /** * Weight-Aware Globally-Even Distribute Rebalancer. * @see <a @@ -73,6 +72,7 @@ public class WagedRebalancer { // Make it static to avoid unnecessary reinitialization. 
private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL = new ThreadLocal<>(); + private final HelixManager _manager; private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator; private final AssignmentMetadataStore _assignmentMetadataStore; private final RebalanceAlgorithm _rebalanceAlgorithm; @@ -97,11 +97,18 @@ public class WagedRebalancer { // Mapping calculator will translate the best possible assignment into the applicable state // mapping based on the current states. // TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer - new DelayedAutoRebalancer()); + new DelayedAutoRebalancer(), + // Helix Manager is required for the rebalancer scheduler + helixManager); } - private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, + protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) { + this(assignmentMetadataStore, algorithm, mappingCalculator, null); + } + + private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, + RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager) { if (assignmentMetadataStore == null) { LOG.warn("Assignment Metadata Store is not configured properly." + " The rebalancer will not access the assignment store during the rebalance."); @@ -109,12 +116,7 @@ public class WagedRebalancer { _assignmentMetadataStore = assignmentMetadataStore; _rebalanceAlgorithm = algorithm; _mappingCalculator = mappingCalculator; - } - - @VisibleForTesting - protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, - RebalanceAlgorithm algorithm) { - this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer()); + _manager = manager; } // Release all the resources. 
@@ -196,29 +198,59 @@ public class WagedRebalancer { clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet()); } + Set<String> activeNodes = DelayedRebalanceUtil + .getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(), + clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), + clusterData.getInstanceConfigMap(), clusterData.getClusterConfig()); + + // Schedule (or unschedule) delayed rebalance according to the delayed rebalance config. + delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet()); + Map<String, ResourceAssignment> newAssignment = - partialRebalance(clusterData, clusterChanges, resourceMap, currentStateOutput); + partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, + currentStateOutput); + // <ResourceName, <State, Priority>> + Map<String, Map<String, Integer>> resourceStatePriorityMap = new HashMap<>(); // Convert the assignments into IdealState for the following state mapping calculation. - Map<String, IdealState> finalIdealState = new HashMap<>(); + Map<String, IdealState> finalIdealStateMap = new HashMap<>(); for (String resourceName : newAssignment.keySet()) { - IdealState newIdeaState; + IdealState newIdealState; try { IdealState currentIdealState = clusterData.getIdealState(resourceName); Map<String, Integer> statePriorityMap = clusterData .getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap(); + // Keep the priority map for the rebalance overwrite logic later. + resourceStatePriorityMap.put(resourceName, statePriorityMap); // Create a new IdealState instance contains the new calculated assignment in the preference // list. 
- newIdeaState = generateIdealStateWithAssignment(resourceName, currentIdealState, + newIdealState = generateIdealStateWithAssignment(resourceName, currentIdealState, newAssignment.get(resourceName), statePriorityMap); } catch (Exception ex) { throw new HelixRebalanceException( "Fail to calculate the new IdealState for resource: " + resourceName, HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); } - finalIdealState.put(resourceName, newIdeaState); + finalIdealStateMap.put(resourceName, newIdealState); + } + + // The additional rebalance overwrite is required since the calculated mapping may contain + // some delayed rebalanced assignments. + if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) { + applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges, + resourceStatePriorityMap, + getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, + resourceMap.keySet())); } - return finalIdealState; + // Replace the assignment if user-defined preference list is configured. + // Note the user-defined list is intentionally applied to the final mapping after calculation. + // This is to avoid persisting it into the assignment store, which impacts the long term + // assignment evenness and partition movements.
+ finalIdealStateMap.entrySet().stream().forEach( + idealStateEntry -> applyUserDefinedPreferenceList( + clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue())); + + return finalIdealStateMap; } // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline @@ -253,7 +285,8 @@ public class WagedRebalancer { private Map<String, ResourceAssignment> partialRebalance( ResourceControllerDataProvider clusterData, Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap, - final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { + Set<String> activeNodes, final CurrentStateOutput currentStateOutput) + throws HelixRebalanceException { LOG.info("Start calculating the new best possible assignment."); Map<String, ResourceAssignment> currentBaseline = getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); @@ -261,8 +294,8 @@ public class WagedRebalancer { getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); Map<String, ResourceAssignment> newAssignment = - calculateAssignment(clusterData, clusterChanges, resourceMap, - clusterData.getEnabledLiveInstances(), currentBaseline, currentBestPossibleAssignment); + calculateAssignment(clusterData, clusterChanges, resourceMap, activeNodes, currentBaseline, + currentBestPossibleAssignment); if (_assignmentMetadataStore != null) { try { @@ -458,4 +491,88 @@ public class WagedRebalancer { } return currentStateAssignment; } + + /** + * Schedule rebalance according to the delayed rebalance logic. 
+ * @param clusterData the current cluster data cache + * @param delayedActiveNodes the active nodes set that is calculated with the delay time window + * @param resourceSet the rebalanced resourceSet + */ + private void delayedRebalanceSchedule(ResourceControllerDataProvider clusterData, + Set<String> delayedActiveNodes, Set<String> resourceSet) { + if (_manager != null) { + // Schedule for the next delayed rebalance in case no cluster change event happens. + ClusterConfig clusterConfig = clusterData.getClusterConfig(); + boolean delayedRebalanceEnabled = DelayedRebalanceUtil.isDelayRebalanceEnabled(clusterConfig); + Set<String> offlineOrDisabledInstances = new HashSet<>(delayedActiveNodes); + offlineOrDisabledInstances.removeAll(clusterData.getEnabledLiveInstances()); + for (String resource : resourceSet) { + DelayedRebalanceUtil + .setRebalanceScheduler(resource, delayedRebalanceEnabled, offlineOrDisabledInstances, + clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), + clusterData.getInstanceConfigMap(), clusterConfig.getRebalanceDelayTime(), + clusterConfig, _manager); + } + } else { + LOG.warn("Skip scheduling a delayed rebalancer since HelixManager is not specified."); + } + } + + /** + * Update the rebalanced ideal states according to the real active nodes. + * Since the rebalancing might be done with the delayed logic, the rebalanced ideal states + * might include inactive nodes. + * This overwrite will adjust the final mapping, so as to ensure the result is completely valid. + * @param idealStateMap the calculated ideal states. + * @param clusterData the cluster data cache. + * @param resourceMap the rebalanced resource map. + * @param clusterChanges the detected cluster changes that triggers the rebalance. + * @param resourceStatePriorityMap the state priority map for each resource.
+ * @param baseline the baseline assignment + */ + private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap, + ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap, + Map<HelixConstants.ChangeType, Set<String>> clusterChanges, + Map<String, Map<String, Integer>> resourceStatePriorityMap, + Map<String, ResourceAssignment> baseline) + throws HelixRebalanceException { + Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances(); + // Note that the calculation used the baseline as the input only. This is for minimizing unnecessary partition movement. + Map<String, ResourceAssignment> activeAssignment = + calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances, + Collections.emptyMap(), baseline); + for (String resourceName : idealStateMap.keySet()) { + IdealState is = idealStateMap.get(resourceName); + if (!activeAssignment.containsKey(resourceName)) { + throw new HelixRebalanceException( + "Failed to calculate the complete partition assignment with all active nodes. 
Cannot find the resource assignment for " + + resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE); + } + IdealState currentIdealState = clusterData.getIdealState(resourceName); + IdealState newActiveIdealState = + generateIdealStateWithAssignment(resourceName, currentIdealState, + activeAssignment.get(resourceName), resourceStatePriorityMap.get(resourceName)); + + int numReplia = currentIdealState.getReplicaCount(enabledLiveInstances.size()); + int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplia); + Map<String, List<String>> finalPreferenceLists = DelayedRebalanceUtil + .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), is.getPreferenceLists(), + enabledLiveInstances, Math.min(minActiveReplica, numReplia)); + + is.setPreferenceLists(finalPreferenceLists); + } + } + + private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig, + IdealState idealState) { + if (resourceConfig != null) { + Map<String, List<String>> userDefinedPreferenceList = resourceConfig.getPreferenceLists(); + if (!userDefinedPreferenceList.isEmpty()) { + LOG.info("Using user defined preference list for partitions."); + for (String partition : userDefinedPreferenceList.keySet()) { + idealState.setPreferenceList(partition, userDefinedPreferenceList.get(partition)); + } + } + } + } } diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java index 85f0397..f3bca9e 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.helix.ConfigAccessor; import 
org.apache.helix.HelixException; import org.apache.helix.PropertyKey; @@ -250,11 +251,12 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier { + "is enabled.")); } for (String partition : idealState.getPartitionSet()) { - if (idealState.getPreferenceList(partition) == null || idealState.getPreferenceList(partition).isEmpty()) { + if (idealState.getInstanceStateMap(partition) == null || idealState + .getInstanceStateMap(partition).isEmpty()) { return false; } } - idealPartitionState = computeIdealPartitionState(dataCache, idealState); + idealPartitionState = idealState.getRecord().getMapFields(); break; case SEMI_AUTO: case USER_DEFINED: diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index b9284b9..e166e13 100644 --- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -353,9 +353,9 @@ public class ZkTestBase { } protected IdealState createResourceWithWagedRebalance(String clusterName, String db, - String stateModel, int numPartition, int replica, int minActiveReplica, long delay) { + String stateModel, int numPartition, int replica, int minActiveReplica) { return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica, - delay, WagedRebalancer.class.getName(), null); + -1, WagedRebalancer.class.getName(), null); } private IdealState createResource(String clusterName, String db, String stateModel, diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java index e7368be..96b6523 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ 
-26,10 +26,10 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; - import org.apache.helix.HelixConstants; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm; import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel; @@ -112,7 +112,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { @Test public void testRebalance() throws IOException, HelixRebalanceException { _metadataStore.clearMetadataStore(); - WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); + WagedRebalancer rebalancer = + new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); // Generate the input for the rebalancer. ResourceControllerDataProvider clusterData = setupClusterDataCache(); @@ -132,9 +133,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { } @Test(dependsOnMethods = "testRebalance") - public void testPartialRebalance() throws IOException, HelixRebalanceException { + public void testPartialRebalance() + throws IOException, HelixRebalanceException { _metadataStore.clearMetadataStore(); - WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); + WagedRebalancer rebalancer = + new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); // Generate the input for the rebalancer. 
ResourceControllerDataProvider clusterData = setupClusterDataCache(); @@ -159,7 +162,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { @Test(dependsOnMethods = "testRebalance") public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceException { _metadataStore.clearMetadataStore(); - WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); + WagedRebalancer rebalancer = + new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); // Generate the input for the rebalancer. ResourceControllerDataProvider clusterData = setupClusterDataCache(); @@ -216,9 +220,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { } @Test(dependsOnMethods = "testRebalance", expectedExceptions = HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT") - public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException { + public void testNonCompatibleConfiguration() + throws IOException, HelixRebalanceException { _metadataStore.clearMetadataStore(); - WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); + WagedRebalancer rebalancer = + new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); ResourceControllerDataProvider clusterData = setupClusterDataCache(); String nonCompatibleResourceName = _resourceNames.get(0); @@ -237,9 +243,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { // TODO test with invalid capacity configuration which will fail the cluster model constructing. 
@Test(dependsOnMethods = "testRebalance") - public void testInvalidClusterStatus() throws IOException { + public void testInvalidClusterStatus() + throws IOException { _metadataStore.clearMetadataStore(); - WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); + WagedRebalancer rebalancer = + new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); ResourceControllerDataProvider clusterData = setupClusterDataCache(); String invalidResource = _resourceNames.get(0); @@ -264,7 +272,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { AssignmentMetadataStore metadataStore = Mockito.mock(AssignmentMetadataStore.class); when(metadataStore.getBaseline()) .thenThrow(new RuntimeException("Mock Error. Metadata store fails.")); - WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm); + WagedRebalancer rebalancer = + new WagedRebalancer(metadataStore, _algorithm, new DelayedAutoRebalancer()); ResourceControllerDataProvider clusterData = setupClusterDataCache(); // The input resource Map shall contain all the valid resources. @@ -288,7 +297,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { HelixRebalanceException.Type.FAILED_TO_CALCULATE)); _metadataStore.clearMetadataStore(); - WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm); + WagedRebalancer rebalancer = + new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer()); ResourceControllerDataProvider clusterData = setupClusterDataCache(); Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect( @@ -312,7 +322,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { // won't propagate any existing assignment from the cluster model. 
_metadataStore.clearMetadataStore(); - WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); + WagedRebalancer rebalancer = + new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); // 1. rebalance with baseline calculation done // Generate the input for the rebalancer. diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java index 0105a51..7d4965e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Set; import org.apache.helix.common.ZkTestBase; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; -import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.BuiltInStateModelDefinitions; @@ -44,19 +43,22 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class TestDelayedAutoRebalance extends ZkTestBase { - final int NUM_NODE = 5; + static final int NUM_NODE = 5; protected static final int START_PORT = 12918; - protected static final int _PARTITIONS = 5; + protected static final int PARTITIONS = 5; + // TODO: remove this wait time once we have a better way to determine if the rebalance has been + // TODO: done as a reaction of the test operations. 
+ protected static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = 1000; protected final String CLASS_NAME = getShortClassName(); protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; protected ClusterControllerManager _controller; - List<MockParticipantManager> _participants = new ArrayList<>(); - int _replica = 3; - int _minActiveReplica = _replica - 1; - ZkHelixClusterVerifier _clusterVerifier; - List<String> _testDBs = new ArrayList<String>(); + protected List<MockParticipantManager> _participants = new ArrayList<>(); + protected int _replica = 3; + protected int _minActiveReplica = _replica - 1; + protected ZkHelixClusterVerifier _clusterVerifier; + protected List<String> _testDBs = new ArrayList<>(); @BeforeClass public void beforeClass() throws Exception { @@ -80,8 +82,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase { _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); _controller.syncStart(); - _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); + _clusterVerifier = getClusterVerifier(); enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); } @@ -123,7 +124,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase { // bring down another node, the minimal active replica for each partition should be maintained. 
_participants.get(3).syncStop(); - Thread.sleep(500); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); @@ -141,10 +143,11 @@ public class TestDelayedAutoRebalance extends ZkTestBase { enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); long delay = 4000; - Map<String, ExternalView> externalViewsBefore = createTestDBs(delay); + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, delay); + Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); validateDelayedMovements(externalViewsBefore); - Thread.sleep(delay + 200); + Thread.sleep(delay + DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); // after delay time, it should maintain required number of replicas. for (String db : _testDBs) { @@ -157,7 +160,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase { @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"}) public void testDisableDelayRebalanceInResource() throws Exception { - Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000); + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); + Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); validateDelayedMovements(externalViewsBefore); // disable delay rebalance for one db, partition should be moved immediately @@ -166,7 +170,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase { CLUSTER_NAME, testDb); idealState.setDelayRebalanceEnabled(false); _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState); - + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); // once delay rebalance is disabled, it should maintain required number of replicas for that db. 
@@ -190,13 +194,13 @@ public class TestDelayedAutoRebalance extends ZkTestBase { @Test (dependsOnMethods = {"testDisableDelayRebalanceInResource"}) public void testDisableDelayRebalanceInCluster() throws Exception { enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); - - Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000); + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); + Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); validateDelayedMovements(externalViewsBefore); // disable delay rebalance for the entire cluster. enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false); - Thread.sleep(100); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { ExternalView ev = @@ -210,13 +214,14 @@ public class TestDelayedAutoRebalance extends ZkTestBase { @Test (dependsOnMethods = {"testDisableDelayRebalanceInCluster"}) public void testDisableDelayRebalanceInInstance() throws Exception { - Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000); + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); + Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); validateDelayedMovements(externalViewsBefore); String disabledInstanceName = _participants.get(0).getInstanceName(); enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME, disabledInstanceName, false); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); - for (String db : _testDBs) { IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); Map<String, List<String>> preferenceLists = is.getPreferenceLists(); @@ -234,7 +239,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase { _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db); } _testDBs.clear(); - Thread.sleep(50); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); } 
@BeforeMethod @@ -249,17 +254,21 @@ public class TestDelayedAutoRebalance extends ZkTestBase { } } + protected ZkHelixClusterVerifier getClusterVerifier() { + return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); + } + // create test DBs, wait it converged and return externalviews protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException { Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>(); int i = 0; for (String stateModel : TestStateModels) { String db = "Test-DB-" + i++; - createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, + createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica, delayTime, CrushRebalanceStrategy.class.getName()); _testDBs.add(db); } - Thread.sleep(100); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { ExternalView ev = @@ -302,7 +311,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase { private void validateDelayedMovements(Map<String, ExternalView> externalViewsBefore) throws InterruptedException { _participants.get(0).syncStop(); - Thread.sleep(100); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java index 746bdf3..145148f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java +++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java @@ -21,7 +21,6 @@ package org.apache.helix.integration.rebalancer.DelayedAutoRebalancer; import java.util.Map; import org.apache.helix.ConfigAccessor; -import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; @@ -56,7 +55,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut String instance = _participants.get(0).getInstanceName(); enableInstance(instance, false); - Thread.sleep(300); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { @@ -79,7 +78,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut String instance = _participants.get(0).getInstanceName(); enableInstance(instance, false); - Thread.sleep(100); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { @@ -106,7 +105,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut // disable one node, no partition should be moved. enableInstance(_participants.get(0).getInstanceName(), false); - Thread.sleep(100); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { @@ -120,7 +119,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut // disable another node, the minimal active replica for each partition should be maintained. 
enableInstance(_participants.get(3).getInstanceName(), false); - Thread.sleep(1000); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { @@ -143,7 +142,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut // disable one node, no partition should be moved. enableInstance(_participants.get(0).getInstanceName(), false); - Thread.sleep(100); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { @@ -157,7 +156,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut // bring down another node, the minimal active replica for each partition should be maintained. _participants.get(3).syncStop(); - Thread.sleep(100); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { @@ -178,11 +177,12 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); long delay = 10000; - Map<String, ExternalView> externalViewsBefore = createTestDBs(delay); + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, delay); + Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); // disable one node, no partition should be moved. 
enableInstance(_participants.get(0).getInstanceName(), false); - Thread.sleep(100); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { ExternalView ev = @@ -193,7 +193,8 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut _participants.get(0).getInstanceName(), true); } - Thread.sleep(delay + 500); + Thread.sleep(delay + DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); // after delay time, it should maintain required number of replicas. for (String db : _testDBs) { ExternalView ev = @@ -210,7 +211,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut // disable one node, no partition should be moved. enableInstance(_participants.get(0).getInstanceName(), false); - Thread.sleep(100); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { @@ -228,7 +229,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut CLUSTER_NAME, testDb); idealState.setDelayRebalanceEnabled(false); _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState); - Thread.sleep(2000); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); // once delay rebalance is disabled, it should maintain required number of replicas for that db. 
@@ -253,12 +254,12 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut @Override public void testDisableDelayRebalanceInCluster() throws Exception { enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); - - Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000); + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); + Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); // disable one node, no partition should be moved. enableInstance(_participants.get(0).getInstanceName(), false); - Thread.sleep(100); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { @@ -272,7 +273,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut // disable delay rebalance for the entire cluster. enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false); - Thread.sleep(2000); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _testDBs) { ExternalView ev = diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java index f768684..f85f07f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java @@ -29,7 +29,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebalance { - final int NUM_NODE = 9; + static final int NUM_NODE = 9; @BeforeClass public void beforeClass() throws Exception { @@ 
-58,8 +58,7 @@ public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebala _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); _controller.syncStart(); - _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); + _clusterVerifier = getClusterVerifier(); } @Override diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java index 76560e9..33dab8d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java @@ -25,16 +25,15 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.helix.ConfigAccessor; -import org.apache.helix.HelixDataAccessor; import org.apache.helix.NotificationContext; +import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; -import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.mock.participant.MockMSModelFactory; import org.apache.helix.mock.participant.MockMSStateModel; import org.apache.helix.mock.participant.MockTransition; @@ -49,6 +48,7 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; import org.testng.annotations.AfterClass; +import 
org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -60,13 +60,13 @@ public class TestMixedModeAutoRebalance extends ZkTestBase { private final String CLASS_NAME = getShortClassName(); private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; - private ClusterControllerManager _controller; + protected static final String DB_NAME = "Test-DB"; + private ClusterControllerManager _controller; private List<MockParticipantManager> _participants = new ArrayList<>(); private int _replica = 3; private ZkHelixClusterVerifier _clusterVerifier; private ConfigAccessor _configAccessor; - private HelixDataAccessor _dataAccessor; @BeforeClass public void beforeClass() throws Exception { @@ -90,13 +90,11 @@ public class TestMixedModeAutoRebalance extends ZkTestBase { _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); _controller.syncStart(); - _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); + _clusterVerifier = getClusterVerifier(); enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); _configAccessor = new ConfigAccessor(_gZkClient); - _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); } @DataProvider(name = "stateModels") @@ -112,19 +110,28 @@ public class TestMixedModeAutoRebalance extends ZkTestBase { }; } - @Test(dataProvider = "stateModels") - public void testUserDefinedPreferenceListsInFullAuto( - String stateModel, boolean delayEnabled, String rebalanceStrateyName) throws Exception { - String db = "Test-DB-" + stateModel; + protected ZkHelixClusterVerifier getClusterVerifier() { + return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); + } + + protected void createResource(String stateModel, int numPartition, int replica, + boolean delayEnabled, String rebalanceStrategy) { if (delayEnabled) { 
- createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, - _replica - 1, 200, rebalanceStrateyName); + createResourceWithDelayedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica, + replica - 1, 200, rebalanceStrategy); } else { - createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, - _replica, 0, rebalanceStrateyName); + createResourceWithDelayedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica, + replica, 0, rebalanceStrategy); } + } + + @Test(dataProvider = "stateModels") + public void testUserDefinedPreferenceListsInFullAuto(String stateModel, boolean delayEnabled, + String rebalanceStrateyName) throws Exception { + createResource(stateModel, _PARTITIONS, _replica, delayEnabled, + rebalanceStrateyName); IdealState idealState = - _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB_NAME); Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists(); List<String> userDefinedPartitions = new ArrayList<>(); for (String partition : userDefinedPreferenceLists.keySet()) { @@ -138,33 +145,34 @@ public class TestMixedModeAutoRebalance extends ZkTestBase { } ResourceConfig resourceConfig = - new ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build(); - _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig); + new ResourceConfig.Builder(DB_NAME).setPreferenceLists(userDefinedPreferenceLists).build(); + _configAccessor.setResourceConfig(CLUSTER_NAME, DB_NAME, resourceConfig); - Assert.assertTrue(_clusterVerifier.verify(1000)); - verifyUserDefinedPreferenceLists(db, userDefinedPreferenceLists, userDefinedPartitions); + Assert.assertTrue(_clusterVerifier.verify(3000)); + verifyUserDefinedPreferenceLists(DB_NAME, userDefinedPreferenceLists, userDefinedPartitions); while (userDefinedPartitions.size() > 0) { 
- IdealState originIS = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + IdealState originIS = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, + DB_NAME); Set<String> nonUserDefinedPartitions = new HashSet<>(originIS.getPartitionSet()); nonUserDefinedPartitions.removeAll(userDefinedPartitions); - removePartitionFromUserDefinedList(db, userDefinedPartitions); - Assert.assertTrue(_clusterVerifier.verify(1000)); - verifyUserDefinedPreferenceLists(db, userDefinedPreferenceLists, userDefinedPartitions); - verifyNonUserDefinedAssignment(db, originIS, nonUserDefinedPartitions); + removePartitionFromUserDefinedList(DB_NAME, userDefinedPartitions); + // TODO: Remove wait once we enable the BestPossibleExternalViewVerifier for the WAGED rebalancer. + Thread.sleep(1000); + Assert.assertTrue(_clusterVerifier.verify(3000)); + verifyUserDefinedPreferenceLists(DB_NAME, userDefinedPreferenceLists, userDefinedPartitions); + verifyNonUserDefinedAssignment(DB_NAME, originIS, nonUserDefinedPartitions); } } @Test public void testUserDefinedPreferenceListsInFullAutoWithErrors() throws Exception { - String db = "Test-DB-1"; - createResourceWithDelayedRebalance(CLUSTER_NAME, db, - BuiltInStateModelDefinitions.MasterSlave.name(), 5, _replica, _replica, 0, - CrushRebalanceStrategy.class.getName()); + createResource(BuiltInStateModelDefinitions.MasterSlave.name(), 5, _replica, + false, CrushRebalanceStrategy.class.getName()); IdealState idealState = - _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB_NAME); Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists(); List<String> newNodes = new ArrayList<>(); @@ -187,13 +195,28 @@ public class TestMixedModeAutoRebalance extends ZkTestBase { } ResourceConfig resourceConfig = - new 
ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build(); - _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig); + new ResourceConfig.Builder(DB_NAME).setPreferenceLists(userDefinedPreferenceLists).build(); + _configAccessor.setResourceConfig(CLUSTER_NAME, DB_NAME, resourceConfig); + + TestHelper.verify(() -> { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, DB_NAME); + if (ev != null) { + for (String partition : ev.getPartitionSet()) { + Map<String, String> stateMap = ev.getStateMap(partition); + if (stateMap.values().contains("ERROR")) { + return true; + } + } + } + return false; + }, 2000); + Assert.assertTrue(_clusterVerifier.verify(3000)); - Thread.sleep(1000); ExternalView ev = - _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, DB_NAME); + IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, + DB_NAME); validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE); } @@ -238,6 +261,12 @@ public class TestMixedModeAutoRebalance extends ZkTestBase { _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig); } + @AfterMethod + public void afterMethod() { + _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, DB_NAME); + getClusterVerifier().verify(5000); + } + @AfterClass public void afterClass() throws Exception { /** diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java index ab4a263..7090cbf 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java +++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; + import org.apache.helix.ExternalViewChangeListener; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; @@ -39,10 +40,11 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.Message; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class TestZeroReplicaAvoidance extends ZkTestBase @@ -53,16 +55,13 @@ public class TestZeroReplicaAvoidance extends ZkTestBase private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; private List<MockParticipantManager> _participants = new ArrayList<>(); - private ZkHelixClusterVerifier _clusterVerifier; private boolean _testSuccess = true; private boolean _startListen = false; private ClusterControllerManager _controller; - @BeforeClass - public void beforeClass() throws Exception { - System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - + @BeforeMethod + public void beforeMethod() { _gSetupTool.addCluster(CLUSTER_NAME, true); for (int i = 0; i < NUM_NODE; i++) { String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); @@ -77,13 +76,11 @@ public class TestZeroReplicaAvoidance extends ZkTestBase String controllerName = CONTROLLER_PREFIX + "_0"; _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); 
_controller.syncStart(); - - _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); } - @AfterClass - public void afterClass() { + @AfterMethod + public void afterMethod() { + _startListen = false; if (_controller != null && _controller.isConnected()) { _controller.syncStop(); } @@ -92,6 +89,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase participant.syncStop(); } } + _participants.clear(); deleteCluster(CLUSTER_NAME); } @@ -102,7 +100,8 @@ public class TestZeroReplicaAvoidance extends ZkTestBase }; @Test - public void test() throws Exception { + public void testDelayedRebalancer() throws Exception { + System.out.println("START testDelayedRebalancer at " + new Date(System.currentTimeMillis())); HelixManager manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, null, InstanceType.SPECTATOR, ZK_ADDR); manager.connect(); @@ -123,7 +122,51 @@ public class TestZeroReplicaAvoidance extends ZkTestBase createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, partition, replica, replica, 0); } - Assert.assertTrue(_clusterVerifier.verifyByPolling(50000L, 100L)); + ZkHelixClusterVerifier clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); + Assert.assertTrue(clusterVerifier.verifyByPolling(50000L, 100L)); + + _startListen = true; + DelayedTransition.setDelay(5); + + // add the other half of nodes. 
+ for (; i < NUM_NODE; i++) { + _participants.get(i).syncStart(); + } + Assert.assertTrue(clusterVerifier.verify(70000L)); + Assert.assertTrue(_testSuccess); + + if (manager.isConnected()) { + manager.disconnect(); + } + System.out.println("END testDelayedRebalancer at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testWagedRebalancer() throws Exception { + System.out.println("START testWagedRebalancer at " + new Date(System.currentTimeMillis())); + HelixManager manager = + HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, null, InstanceType.SPECTATOR, ZK_ADDR); + manager.connect(); + manager.addExternalViewChangeListener(this); + manager.addIdealStateChangeListener(this); + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + + // Start half number of nodes. + int i = 0; + for (; i < NUM_NODE / 2; i++) { + _participants.get(i).syncStart(); + } + + int replica = 3; + int partition = 30; + for (String stateModel : TestStateModels) { + String db = "Test-DB-" + stateModel; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, partition, replica, replica); + } + ZkHelixClusterVerifier clusterVerifier = + new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); + Assert.assertTrue(clusterVerifier.verifyByPolling(50000L, 100L)); _startListen = true; DelayedTransition.setDelay(5); @@ -132,12 +175,13 @@ public class TestZeroReplicaAvoidance extends ZkTestBase for (; i < NUM_NODE; i++) { _participants.get(i).syncStart(); } - Assert.assertTrue(_clusterVerifier.verify(70000L)); + Assert.assertTrue(clusterVerifier.verify(70000L)); Assert.assertTrue(_testSuccess); if (manager.isConnected()) { manager.disconnect(); } + System.out.println("END testWagedRebalancer at " + new Date(System.currentTimeMillis())); } /** diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java new file mode 100644 index 0000000..8587f40 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java @@ -0,0 +1,102 @@ +package org.apache.helix.integration.rebalancer.WagedRebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalance; +import org.apache.helix.model.ExternalView; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Inherit TestDelayedAutoRebalance to ensure the test logic is the same. 
+ */ +public class TestDelayedWagedRebalance extends TestDelayedAutoRebalance { + protected ZkHelixClusterVerifier getClusterVerifier() { + Set<String> dbNames = new HashSet<>(); + int i = 0; + for (String stateModel : TestStateModels) { + dbNames.add("Test-DB-" + i++); + } + return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) + .setZkAddr(ZK_ADDR).build(); + } + + // create test DBs, wait it converged and return externalviews + protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException { + Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>(); + int i = 0; + for (String stateModel : TestStateModels) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _minActiveReplica); + _testDBs.add(db); + } + Thread.sleep(100); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + externalViews.put(db, ev); + } + return externalViews; + } + + @Test + public void testDelayedPartitionMovement() { + // Waged Rebalancer takes cluster level delay config only. Skip this test. + } + + @Test + public void testDisableDelayRebalanceInResource() { + // Waged Rebalancer takes cluster level delay config only. Skip this test. 
+ } + + @Test(dependsOnMethods = { "testDelayedPartitionMovement" }) + public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception { + super.testDelayedPartitionMovementWithClusterConfigedDelay(); + } + + @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" }) + public void testMinimalActiveReplicaMaintain() throws Exception { + super.testMinimalActiveReplicaMaintain(); + } + + @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" }) + public void testPartitionMovementAfterDelayTime() throws Exception { + super.testPartitionMovementAfterDelayTime(); + } + + @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" }) + public void testDisableDelayRebalanceInCluster() throws Exception { + super.testDisableDelayRebalanceInCluster(); + } + + @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" }) + public void testDisableDelayRebalanceInInstance() throws Exception { + super.testDisableDelayRebalanceInInstance(); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java new file mode 100644 index 0000000..fab254c --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java @@ -0,0 +1,103 @@ +package org.apache.helix.integration.rebalancer.WagedRebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithDisabledInstance; +import org.apache.helix.model.ExternalView; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Inherit TestDelayedAutoRebalanceWithDisabledInstance to ensure the test logic is the same. 
+ */ +public class TestDelayedWagedRebalanceWithDisabledInstance + extends TestDelayedAutoRebalanceWithDisabledInstance { + protected ZkHelixClusterVerifier getClusterVerifier() { + Set<String> dbNames = new HashSet<>(); + int i = 0; + for (String stateModel : TestStateModels) { + dbNames.add("Test-DB-" + i++); + } + return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) + .setZkAddr(ZK_ADDR).build(); + } + + // create test DBs, wait it converged and return externalviews + protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException { + Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>(); + int i = 0; + for (String stateModel : TestStateModels) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _minActiveReplica); + _testDBs.add(db); + } + Thread.sleep(100); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + externalViews.put(db, ev); + } + return externalViews; + } + + @Test + public void testDelayedPartitionMovement() { + // Waged Rebalancer takes cluster level delay config only. Skip this test. + } + + @Test + public void testDisableDelayRebalanceInResource() { + // Waged Rebalancer takes cluster level delay config only. Skip this test. 
+ } + + @Test(dependsOnMethods = { "testDelayedPartitionMovement" }) + public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception { + super.testDelayedPartitionMovementWithClusterConfigedDelay(); + } + + @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" }) + public void testMinimalActiveReplicaMaintain() throws Exception { + super.testMinimalActiveReplicaMaintain(); + } + + @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" }) + public void testPartitionMovementAfterDelayTime() throws Exception { + super.testPartitionMovementAfterDelayTime(); + } + + @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" }) + public void testDisableDelayRebalanceInCluster() throws Exception { + super.testDisableDelayRebalanceInCluster(); + } + + @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" }) + public void testDisableDelayRebalanceInInstance() throws Exception { + super.testDisableDelayRebalanceInInstance(); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java new file mode 100644 index 0000000..4791e6e --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java @@ -0,0 +1,102 @@ +package org.apache.helix.integration.rebalancer.WagedRebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithRackaware; +import org.apache.helix.model.ExternalView; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Inherit TestDelayedAutoRebalanceWithRackaware to ensure the test logic is the same. + */ +public class TestDelayedWagedRebalanceWithRackaware extends TestDelayedAutoRebalanceWithRackaware { + protected ZkHelixClusterVerifier getClusterVerifier() { + Set<String> dbNames = new HashSet<>(); + int i = 0; + for (String stateModel : TestStateModels) { + dbNames.add("Test-DB-" + i++); + } + return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) + .setZkAddr(ZK_ADDR).build(); + } + + // create test DBs, wait it converged and return externalviews + protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException { + Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>(); + int i = 0; + for (String stateModel : TestStateModels) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _minActiveReplica); + _testDBs.add(db); + } + Thread.sleep(100); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + for (String db : 
_testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + externalViews.put(db, ev); + } + return externalViews; + } + + @Test + public void testDelayedPartitionMovement() { + // Waged Rebalancer takes cluster level delay config only. Skip this test. + } + + @Test + public void testDisableDelayRebalanceInResource() { + // Waged Rebalancer takes cluster level delay config only. Skip this test. + } + + @Test(dependsOnMethods = { "testDelayedPartitionMovement" }) + public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception { + super.testDelayedPartitionMovementWithClusterConfigedDelay(); + } + + @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" }) + public void testMinimalActiveReplicaMaintain() throws Exception { + super.testMinimalActiveReplicaMaintain(); + } + + @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" }) + public void testPartitionMovementAfterDelayTime() throws Exception { + super.testPartitionMovementAfterDelayTime(); + } + + @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" }) + public void testDisableDelayRebalanceInCluster() throws Exception { + super.testDisableDelayRebalanceInCluster(); + } + + @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" }) + public void testDisableDelayRebalanceInInstance() throws Exception { + super.testDisableDelayRebalanceInInstance(); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java new file mode 100644 index 0000000..7087dfc --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java @@ -0,0 +1,66 @@ +package org.apache.helix.integration.rebalancer.WagedRebalancer; + +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import org.apache.helix.integration.rebalancer.TestMixedModeAutoRebalance; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.DataProvider; + +public class TestMixedModeWagedRebalance extends TestMixedModeAutoRebalance { + private final String CLASS_NAME = getShortClassName(); + private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + + @DataProvider(name = "stateModels") + public static Object[][] stateModels() { + return new Object[][] { { BuiltInStateModelDefinitions.MasterSlave.name(), true, null }, + { BuiltInStateModelDefinitions.OnlineOffline.name(), true, null }, + { BuiltInStateModelDefinitions.LeaderStandby.name(), true, null }, + { BuiltInStateModelDefinitions.MasterSlave.name(), false, null }, + { BuiltInStateModelDefinitions.OnlineOffline.name(), false, null }, + { BuiltInStateModelDefinitions.LeaderStandby.name(), false, null } + }; + } + + protected ZkHelixClusterVerifier 
getClusterVerifier() { + return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(Collections.singleton(DB_NAME)).build(); + } + + protected void createResource(String stateModel, int numPartition, + int replica, boolean delayEnabled, String rebalanceStrategy) { + if (delayEnabled) { + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 200); + createResourceWithWagedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica, + replica - 1); + } else { + createResourceWithWagedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica, replica); + } + } + + @AfterMethod + public void afterMethod() { + super.afterMethod(); + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java index fb5375c..37c1229 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java @@ -109,8 +109,7 @@ public class TestWagedRebalance extends ZkTestBase { int i = 0; for (String stateModel : _testModels) { String db = "Test-DB-" + i++; - createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica, - -1); + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); } @@ -123,7 +122,7 @@ public class TestWagedRebalance extends ZkTestBase { for (String stateModel : _testModels) { String moreDB = "More-Test-DB-" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, moreDB, stateModel, PARTITIONS, _replica, - _replica, -1); + _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, moreDB, 
_replica); _allDBs.add(moreDB); @@ -151,7 +150,7 @@ public class TestWagedRebalance extends ZkTestBase { for (String tag : tags) { String db = "Test-DB-" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, - BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1); + BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); is.setInstanceGroupTag(tag); @@ -167,7 +166,7 @@ public class TestWagedRebalance extends ZkTestBase { public void testChangeIdealState() throws InterruptedException { String dbName = "Test-DB"; createResourceWithWagedRebalance(CLUSTER_NAME, dbName, - BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1); + BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); _allDBs.add(dbName); Thread.sleep(300); @@ -201,7 +200,7 @@ public class TestWagedRebalance extends ZkTestBase { public void testDisableInstance() throws InterruptedException { String dbName = "Test-DB"; createResourceWithWagedRebalance(CLUSTER_NAME, dbName, - BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1); + BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); _allDBs.add(dbName); Thread.sleep(300); @@ -256,8 +255,8 @@ public class TestWagedRebalance extends ZkTestBase { int j = 0; for (String stateModel : _testModels) { String db = "Test-DB-" + j++; - createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica, - -1); + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); } @@ -295,8 +294,8 @@ public class 
TestWagedRebalance extends ZkTestBase { int j = 0; for (String stateModel : _testModels) { String db = "Test-DB-" + j++; - createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica, - -1); + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); } @@ -334,7 +333,7 @@ public class TestWagedRebalance extends ZkTestBase { IdealState.RebalanceMode.FULL_AUTO + "", CrushEdRebalanceStrategy.class.getName()); } else { createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, - _replica, -1); + _replica); } _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); @@ -357,7 +356,7 @@ public class TestWagedRebalance extends ZkTestBase { for (String stateModel : _testModels) { String db = "Test-DB-" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, - _replica, -1); + _replica); if (i == 1) { // The limited resource has additional limitation, so even the other resources can be assigned // later, this resource will still be blocked by the max partition limitation. 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java index 0b020db..84c6ac4 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java @@ -112,7 +112,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase { for (String stateModel : _testModels) { String db = "Test-DB-" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, - _replica, -1); + _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); } @@ -128,7 +128,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase { for (String tag : tags) { String db = "Test-DB-" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, - BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1); + BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); is.setInstanceGroupTag(tag); @@ -156,7 +156,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase { for (String stateModel : _testModels) { String db = "Test-DB-" + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, - _replica, -1); + _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); } @@ -198,7 +198,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase { for (String stateModel : _testModels) { String db = "Test-DB-" + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, - _replica, -1); + _replica); 
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); } @@ -230,7 +230,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase { for (String stateModel : _testModels) { String db = "Test-DB-" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, - _replica, -1); + _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); } diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java index ca6b6b6..e1ecdee 100644 --- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java +++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java @@ -125,19 +125,42 @@ public class TestClusterVerifier extends ZkUnitTestBase { new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build(); Assert.assertTrue(bestPossibleVerifier.verify(10000)); + // Disable partition for 1 instance, then Full-Auto ExternalView should not match IdealState. 
+ _admin.enablePartition(false, _clusterName, _participants[0].getInstanceName(), + FULL_AUTO_RESOURCES[0], Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0")); + Thread.sleep(1000); + Assert.assertTrue(bestPossibleVerifier.verify(3000)); + + // Enable the partition back + _admin.enablePartition(true, _clusterName, _participants[0].getInstanceName(), + FULL_AUTO_RESOURCES[0], Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0")); + Thread.sleep(1000); + Assert.assertTrue(bestPossibleVerifier.verify(10000)); + + // Make 1 instance non-live + _participants[0].syncStop(); + Thread.sleep(1000); + Assert.assertTrue(bestPossibleVerifier.verify(10000)); + + // Recover the participant before next test + String id = _participants[0].getInstanceName(); + _participants[0] = new MockParticipantManager(ZK_ADDR, _clusterName, id); + _participants[0].syncStart(); + HelixClusterVerifier strictMatchVerifier = - new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build(); + new StrictMatchExternalViewVerifier.Builder(_clusterName) + .setResources(Sets.newHashSet(RESOURCES)).setZkClient(_gZkClient).build(); Assert.assertTrue(strictMatchVerifier.verify(10000)); // Disable partition for 1 instance, then Full-Auto ExternalView should not match IdealState. 
- _admin.enablePartition(false, _clusterName, _participants[0].getInstanceName(), FULL_AUTO_RESOURCES[0], - Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0")); + _admin.enablePartition(false, _clusterName, _participants[0].getInstanceName(), + FULL_AUTO_RESOURCES[0], Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0")); Thread.sleep(1000); - Assert.assertFalse(strictMatchVerifier.verify(3000)); + Assert.assertTrue(strictMatchVerifier.verify(3000)); // Enable the partition back - _admin.enablePartition(true, _clusterName, _participants[0].getInstanceName(), FULL_AUTO_RESOURCES[0], - Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0")); + _admin.enablePartition(true, _clusterName, _participants[0].getInstanceName(), + FULL_AUTO_RESOURCES[0], Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0")); Thread.sleep(1000); Assert.assertTrue(strictMatchVerifier.verify(10000)); @@ -148,14 +171,16 @@ public class TestClusterVerifier extends ZkUnitTestBase { // Semi-Auto ExternalView should not match IdealState for (String resource : SEMI_AUTO_RESOURCES) { System.out.println("Un-verify resource: " + resource); - strictMatchVerifier = new StrictMatchExternalViewVerifier.Builder(_clusterName) - .setZkClient(_gZkClient).setResources(Sets.newHashSet(resource)).build(); + strictMatchVerifier = + new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient) + .setResources(Sets.newHashSet(resource)).build(); Assert.assertFalse(strictMatchVerifier.verify(3000)); } // Full-Auto still match, because preference list wouldn't contain non-live instances - strictMatchVerifier = new StrictMatchExternalViewVerifier.Builder(_clusterName) - .setZkClient(_gZkClient).setResources(Sets.newHashSet(FULL_AUTO_RESOURCES)).build(); + strictMatchVerifier = + new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient) + .setResources(Sets.newHashSet(FULL_AUTO_RESOURCES)).build(); Assert.assertTrue(strictMatchVerifier.verify(10000)); }
