Add state transition throttling logic into intermediateStateCalcStage.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4e487196 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4e487196 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4e487196 Branch: refs/heads/master Commit: 4e4871967db07cee191debb9d26bfcd53c401945 Parents: 79ebc04 Author: Lei Xia <[email protected]> Authored: Fri Jan 6 16:31:38 2017 -0800 Committer: Lei Xia <[email protected]> Committed: Mon Oct 2 19:06:26 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/ConfigAccessor.java | 14 +- .../config/StateTransitionThrottleConfig.java | 162 ++--------- .../stages/BestPossibleStateCalcStage.java | 4 +- .../stages/BestPossibleStateOutput.java | 3 +- .../stages/CurrentStateComputationStage.java | 4 +- .../stages/ExternalViewComputeStage.java | 4 +- .../stages/IntermediateStateCalcStage.java | 291 ++++++++++++++++++- .../stages/MessageGenerationPhase.java | 8 +- .../stages/MessageSelectionStage.java | 8 +- .../controller/stages/MessageThrottleStage.java | 6 +- .../stages/PersistAssignmentStage.java | 27 +- .../stages/ResourceComputationStage.java | 2 +- .../stages/ResourceValidationStage.java | 2 +- .../StateTransitionThrottleController.java | 176 +++++++++++ .../controller/stages/TaskAssignmentStage.java | 4 +- .../org/apache/helix/manager/zk/ZKUtil.java | 2 - .../org/apache/helix/model/ClusterConfig.java | 22 +- .../tools/ClusterExternalViewVerifier.java | 9 +- .../helix/tools/ClusterStateVerifier.java | 3 +- .../BestPossibleExternalViewVerifier.java | 2 +- .../ClusterExternalViewVerifier.java | 2 +- .../ClusterVerifiers/ClusterStateVerifier.java | 8 +- .../TestBestPossibleCalcStageCompatibility.java | 12 +- .../stages/TestBestPossibleStateCalcStage.java | 6 +- .../TestCurrentStateComputationStage.java | 12 +- .../stages/TestMessageThrottleStage.java | 8 +- .../stages/TestRebalancePipeline.java | 14 +- .../stages/TestResourceComputationStage.java | 6 +- .../stages/TestResourceValidationStage.java | 12 +- .../helix/integration/TestAutoRebalance.java | 3 +- .../TestPartitionMovementThrottle.java | 283 ++++++++++++++++++ .../helix/integration/task/TaskTestUtil.java | 2 +- 32 files changed, 885 insertions(+), 236 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java index 27a30cb..5970de0 100644 --- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java @@ -162,20 +162,14 @@ public class ConfigAccessor { LOG.error("fail to get configs. invalid config scope. scope: " + scope + ", keys: " + keys); return null; } + ZNRecord record = getConfigZnRecord(scope); - String clusterName = scope.getClusterName(); - if (!ZKUtil.isClusterSetup(clusterName, zkClient)) { - throw new HelixException("fail to get configs. cluster " + clusterName + " is not setup yet"); - } - - Map<String, String> map = new HashMap<String, String>(); - - ZNRecord record = zkClient.readData(scope.getZkPath(), true); if (record == null) { LOG.warn("No config found at " + scope.getZkPath()); return null; } + Map<String, String> map = new HashMap<String, String>(); String mapKey = scope.getMapKey(); if (mapKey == null) { for (String key : keys) { @@ -304,8 +298,8 @@ public class ConfigAccessor { } } - String zkPath = scope.getZkPath(); String mapKey = scope.getMapKey(); + String zkPath = scope.getZkPath(); String id = zkPath.substring(zkPath.lastIndexOf('/') + 1); ZNRecord update = new ZNRecord(id); if (mapKey == null) { @@ -313,6 +307,7 @@ public class ConfigAccessor { } else { update.setMapField(mapKey, keyValueMap); } + ZKUtil.createOrUpdate(zkClient, zkPath, update, true, true); return; } @@ -620,6 +615,7 @@ public class ConfigAccessor { } /** +<<<<<<< HEAD * Set config of the given resource. * The current Resource config will be replaced with the given clusterConfig. * http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java index 39bd458..1ca25c5 100644 --- a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java +++ b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java @@ -35,128 +35,42 @@ public class StateTransitionThrottleConfig { private enum ConfigProperty { CONFIG_TYPE, REBALANCE_TYPE, - THROTTLE_SCOPE + THROTTLE_SCOPE, + MAX_PARTITION_IN_TRANSITION } public enum ThrottleScope { CLUSTER, RESOURCE, - INSTANCE, - PARTITION + INSTANCE } public enum RebalanceType { LOAD_BALANCE, - RECOVERY_BALANCE, - ANY + RECOVERY_BALANCE } - public static class StateTransitionType { - final static String ANY_STATE = "*"; - final static String FROM_KEY = "from"; - final static String TO_KEY = "to"; - String _fromState; - String _toState; + RebalanceType _rebalanceType; + ThrottleScope _throttleScope; + Long _maxPartitionInTransition; - StateTransitionType(String fromState, String toState) { - _fromState = fromState; - _toState = toState; - } - - @Override - public String toString() { - return FROM_KEY + "." + _fromState + "." + TO_KEY + "." + _toState; - } - - public static StateTransitionType parseFromString(String stateTransTypeStr) { - String states[] = stateTransTypeStr.split("."); - if (states.length < 4 || !states[0].equalsIgnoreCase(FROM_KEY) || !states[2] - .equalsIgnoreCase(TO_KEY)) { - return null; - } - return new StateTransitionType(states[1], states[3]); - } - } - - private ThrottleScope _throttleScope; - private RebalanceType _rebalanceType; - private Map<StateTransitionType, Long> _maxPendingStateTransitionMap; - - public StateTransitionThrottleConfig(RebalanceType rebalanceType, ThrottleScope throttleScope) { + public StateTransitionThrottleConfig(RebalanceType rebalanceType, + ThrottleScope throttleScope, long maxPartitionInTransition) { _rebalanceType = rebalanceType; _throttleScope = throttleScope; - _maxPendingStateTransitionMap = new HashMap<StateTransitionType, Long>(); + _maxPartitionInTransition = maxPartitionInTransition; } - /** - * Add a max pending transition from given from state to the specified to state. - * - * @param fromState - * @param toState - * @param maxPendingStateTransition - * @return - */ - public StateTransitionThrottleConfig addThrottle(String fromState, String toState, - long maxPendingStateTransition) { - _maxPendingStateTransitionMap - .put(new StateTransitionType(fromState, toState), maxPendingStateTransition); - return this; + public RebalanceType getRebalanceType() { + return _rebalanceType; } - /** - * Add a max pending transition from ANY state to ANY state. - * - * @param maxPendingStateTransition - * @return - */ - public StateTransitionThrottleConfig addThrottle(long maxPendingStateTransition) { - _maxPendingStateTransitionMap - .put(new StateTransitionType(StateTransitionType.ANY_STATE, StateTransitionType.ANY_STATE), - maxPendingStateTransition); - return this; + public ThrottleScope getThrottleScope() { + return _throttleScope; } - /** - * Add a max pending transition for a given state transition type. - * - * @param stateTransitionType - * @param maxPendingStateTransition - * @return - */ - public StateTransitionThrottleConfig addThrottle(StateTransitionType stateTransitionType, - long maxPendingStateTransition) { - _maxPendingStateTransitionMap.put(stateTransitionType, maxPendingStateTransition); - return this; - } - - /** - * Add a max pending transition from ANY state to the specified state. - * - * @param toState - * @param maxPendingStateTransition - * @return - */ - public StateTransitionThrottleConfig addThrottleFromAnyState(String toState, - long maxPendingStateTransition) { - _maxPendingStateTransitionMap - .put(new StateTransitionType(StateTransitionType.ANY_STATE, toState), - maxPendingStateTransition); - return this; - } - - /** - * Add a max pending transition from given state to ANY state. - * - * @param fromState - * @param maxPendingStateTransition - * @return - */ - public StateTransitionThrottleConfig addThrottleToAnyState(String fromState, - long maxPendingStateTransition) { - _maxPendingStateTransitionMap - .put(new StateTransitionType(fromState, StateTransitionType.ANY_STATE), - maxPendingStateTransition); - return this; + public Long getMaxPartitionInTransition() { + return _maxPartitionInTransition; } private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -167,21 +81,18 @@ public class StateTransitionThrottleConfig { * @return Json String for this config. */ public String toJSON() { - Map<String, String> configsMap = new HashMap<String, String>(); - - configsMap.put(ConfigProperty.REBALANCE_TYPE.name(), _rebalanceType.name()); - configsMap.put(ConfigProperty.THROTTLE_SCOPE.name(), _throttleScope.name()); - - for (Map.Entry<StateTransitionType, Long> e : _maxPendingStateTransitionMap.entrySet()) { - configsMap.put(e.getKey().toString(), String.valueOf(e.getValue())); - } + Map<String, String> configMap = new HashMap<String, String>(); + configMap.put(ConfigProperty.REBALANCE_TYPE.name(), _rebalanceType.name()); + configMap.put(ConfigProperty.THROTTLE_SCOPE.name(), _throttleScope.name()); + configMap.put(ConfigProperty.MAX_PARTITION_IN_TRANSITION.name(), + String.valueOf(_maxPartitionInTransition)); String jsonStr = null; try { ObjectWriter objectWriter = OBJECT_MAPPER.writer(); - jsonStr = objectWriter.writeValueAsString(configsMap); + jsonStr = objectWriter.writeValueAsString(configMap); } catch (IOException e) { - logger.error("Failed to convert config map to JSON object! " + configsMap); + logger.error("Failed to convert config map to JSON object! " + configMap); } return jsonStr; @@ -206,16 +117,17 @@ public class StateTransitionThrottleConfig { return throttleConfig; } - /** * Instantiate a throttle config from a config map * * @param configsMap - * @return StateTransitionThrottleConfig or null if the given configs map is not a valid StateTransitionThrottleConfig. + * + * @return StateTransitionThrottleConfig or null if the given configs map is not a valid + * StateTransitionThrottleConfig. */ public static StateTransitionThrottleConfig fromConfigMap(Map<String, String> configsMap) { - if (!configsMap.containsKey(ConfigProperty.REBALANCE_TYPE.name()) || - !configsMap.containsKey(ConfigProperty.THROTTLE_SCOPE.name())) { + if (!configsMap.containsKey(ConfigProperty.REBALANCE_TYPE.name()) || !configsMap + .containsKey(ConfigProperty.THROTTLE_SCOPE.name())) { // not a valid StateTransitionThrottleConfig return null; } @@ -226,25 +138,13 @@ public class StateTransitionThrottleConfig { RebalanceType.valueOf(configsMap.get(ConfigProperty.REBALANCE_TYPE.name())); ThrottleScope throttleScope = ThrottleScope.valueOf(configsMap.get(ConfigProperty.THROTTLE_SCOPE.name())); - config = new StateTransitionThrottleConfig(rebalanceType, throttleScope); + Long maxPartition = + Long.valueOf(configsMap.get(ConfigProperty.MAX_PARTITION_IN_TRANSITION.name())); + config = new StateTransitionThrottleConfig(rebalanceType, throttleScope, maxPartition); } catch (IllegalArgumentException ex) { return null; } - for (String configKey : configsMap.keySet()) { - StateTransitionType transitionType = StateTransitionType.parseFromString(configKey); - if (transitionType != null) { - try { - long value = Long.valueOf(configsMap.get(configKey)); - config.addThrottle(transitionType, value); - } catch (NumberFormatException ex) { - // ignore the config item with invalid number. - logger.warn(String.format("Invalid config entry, key=%s, value=%s", configKey, - configsMap.get(configKey))); - } - } - } - return config; } } http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index fbb7f86..0a13a8d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -57,8 +57,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { logger.info("START BestPossibleStateCalcStage.process()"); CurrentStateOutput currentStateOutput = - event.getAttribute(AttributeName.CURRENT_STATE.toString()); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + event.getAttribute(AttributeName.CURRENT_STATE.name()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); ClusterDataCache cache = event.getAttribute("ClusterDataCache"); if (currentStateOutput == null || resourceMap == null || cache == null) { http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java index c64c8cf..9b5faea 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java @@ -40,7 +40,8 @@ public class BestPossibleStateOutput extends ResourcesStateMap { * @return */ // TODO: remove this. - @Deprecated public Map<Partition, Map<String, String>> getResourceMap(String resourceName) { + @Deprecated + public Map<Partition, Map<String, String>> getResourceMap(String resourceName) { PartitionStateMap map = _resourceStateMap.get(resourceName); if (map != null) { return map.getStateMap(); http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 624698d..0dd4165 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -40,7 +40,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { ClusterDataCache cache = event.getAttribute("ClusterDataCache"); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); if (cache == null || resourceMap == null) { throw new StageException("Missing attributes in event:" + event @@ -126,6 +126,6 @@ public class CurrentStateComputationStage extends AbstractBaseStage { } } } - event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput); + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); } } http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java index d83518d..5eaf08a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java @@ -59,7 +59,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage { LOG.info("START ExternalViewComputeStage.process()"); HelixManager manager = event.getAttribute("helixmanager"); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); ClusterDataCache cache = event.getAttribute("ClusterDataCache"); if (manager == null || resourceMap == null || cache == null) { @@ -71,7 +71,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage { PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder(); CurrentStateOutput currentStateOutput = - event.getAttribute(AttributeName.CURRENT_STATE.toString()); + event.getAttribute(AttributeName.CURRENT_STATE.name()); List<ExternalView> newExtViews = new ArrayList<ExternalView>(); http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index 5a13c7a..babc938 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -19,31 +19,41 @@ package org.apache.helix.controller.stages; * under the License. */ +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.api.config.StateTransitionThrottleConfig; +import org.apache.helix.controller.common.PartitionStateMap; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; +import org.apache.helix.model.StateModelDefinition; import org.apache.log4j.Logger; +import java.util.List; import java.util.Map; +import java.util.Set; /** - * For partition compute the Intermediate State (instance,state) pair based on - * the BestPossible State and Current State, with all constraints applied (such as state transition throttling). + * For partition compute the Intermediate State (instance,state) pair based on the BestPossible + * State and Current State, with all constraints applied (such as state transition throttling). */ public class IntermediateStateCalcStage extends AbstractBaseStage { private static final Logger logger = Logger.getLogger(IntermediateStateCalcStage.class.getName()); - @Override - public void process(ClusterEvent event) throws Exception { + @Override public void process(ClusterEvent event) throws Exception { long startTime = System.currentTimeMillis(); logger.info("START Intermediate.process()"); CurrentStateOutput currentStateOutput = - event.getAttribute(AttributeName.CURRENT_STATE.toString()); + event.getAttribute(AttributeName.CURRENT_STATE.name()); BestPossibleStateOutput bestPossibleStateOutput = - event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString()); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); ClusterDataCache cache = event.getAttribute("ClusterDataCache"); if (currentStateOutput == null || bestPossibleStateOutput == null || resourceMap == null @@ -53,26 +63,281 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } IntermediateStateOutput immediateStateOutput = - compute(event, resourceMap, currentStateOutput, bestPossibleStateOutput); + compute(cache, resourceMap, currentStateOutput, bestPossibleStateOutput); + event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), immediateStateOutput); long endTime = System.currentTimeMillis(); logger.info("END ImmediateStateCalcStage.process(). took: " + (endTime - startTime) + " ms"); } - private IntermediateStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap, - CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) { + private IntermediateStateOutput compute(ClusterDataCache dataCache, + Map<String, Resource> resourceMap, CurrentStateOutput currentStateOutput, + BestPossibleStateOutput bestPossibleStateOutput) { // for each resource // get the best possible state and current state // try to bring immediate state close to best possible state until // the possible pending state transition numbers reach the set throttle number. IntermediateStateOutput output = new IntermediateStateOutput(); - // TODO: add throttling logic here. + StateTransitionThrottleController throttleController = + new StateTransitionThrottleController(resourceMap.keySet(), dataCache.getClusterConfig(), + dataCache.getLiveInstances().keySet()); + for (String resourceName : resourceMap.keySet()) { - logger.debug("Processing resource:" + resourceName); - output.setState(resourceName, bestPossibleStateOutput.getPartitionStateMap(resourceName)); + PartitionStateMap intermediatePartitionStateMap = + computeIntermediatePartitionState(dataCache, dataCache.getIdealState(resourceName), + resourceMap.get(resourceName), currentStateOutput, + bestPossibleStateOutput.getPartitionStateMap(resourceName), throttleController); + output.setState(resourceName, intermediatePartitionStateMap); } return output; } + + public PartitionStateMap computeIntermediatePartitionState(ClusterDataCache cache, + IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput, + PartitionStateMap bestPossiblePartitionStateMap, + StateTransitionThrottleController throttleController) { + String resourceName = resource.getResourceName(); + logger.info("Processing resource:" + resourceName); + + if (!throttleController.isThrottleEnabled()) { + logger.info("None of any type of transition throttling is set for resource " + resourceName + + " skip computing intermediate partition state."); + return bestPossiblePartitionStateMap; + } + + String stateModelDefName = idealState.getStateModelDefRef(); + StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName); + + boolean pendingRecoveryRebalance = false; + + // check and charge pending transitions + for (Partition partition : resource.getPartitions()) { + Map<String, String> currentStateMap = + currentStateOutput.getCurrentStateMap(resourceName, partition); + Map<String, String> pendingMap = + currentStateOutput.getPendingStateMap(resourceName, partition); + Map<String, String> bestPossibleMap = + bestPossiblePartitionStateMap.getPartitionMap(partition); + + StateTransitionThrottleConfig.RebalanceType rebalanceType; + if (needRecoveryRebalance(bestPossibleMap, stateModelDef, currentStateMap)) { + rebalanceType = StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE; + pendingRecoveryRebalance = true; + } else { + rebalanceType = StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE; + } + + if (pendingMap.size() > 0) { + throttleController.chargeCluster(rebalanceType); + throttleController.chargeResource(rebalanceType, resourceName); + } + + Set<String> allInstances = new HashSet<String>(currentStateMap.keySet()); + allInstances.addAll(pendingMap.keySet()); + + for (String ins : allInstances) { + String currentState = currentStateMap.get(ins); + String pendingState = pendingMap.get(ins); + if (pendingState != null && !pendingState.equals(currentState)) { + throttleController.chargeInstance(rebalanceType, ins); + } + } + } + + PartitionStateMap output = new PartitionStateMap(resourceName); + + int recoveryNeededCount = 0, recoveryThrottledCount = 0; + int loadbalanceNeededCount = 0, loadbalanceThrottledCount = 0; + + Set<Partition> partitionsNeedRecovery = new HashSet<Partition>(); + Set<Partition> partitionsNeedLoadbalance = new HashSet<Partition>(); + Set<Partition> partitionsRecoveryThrotted = new HashSet<Partition>(); + Set<Partition> partitionsLoadbalanceThrottled = new HashSet<Partition>(); + + // check recovery rebalance + for (Partition partition : resource.getPartitions()) { + Map<String, String> currentStateMap = + currentStateOutput.getCurrentStateMap(resourceName, partition); + Map<String, String> bestPossibleMap = + bestPossiblePartitionStateMap.getPartitionMap(partition); + Map<String, String> intermediateMap = new HashMap<String, String>(); + + if (currentStateMap.equals(bestPossibleMap)) { + // no rebalance needed. + intermediateMap.putAll(bestPossibleMap); + } else if (needRecoveryRebalance(bestPossibleMap, stateModelDef, currentStateMap)) { + //TODO: add throttling on recovery balance + recoveryNeededCount++; + intermediateMap.putAll(bestPossibleMap); + pendingRecoveryRebalance = true; + partitionsNeedRecovery.add(partition); + } else { + partitionsNeedLoadbalance.add(partition); + } + output.setState(partition, intermediateMap); + } + + // perform load balance only if no partition need recovery rebalance. + loadbalanceNeededCount = partitionsNeedLoadbalance.size(); + if (!pendingRecoveryRebalance) { + for (Partition partition : partitionsNeedLoadbalance) { + Map<String, String> currentStateMap = + currentStateOutput.getCurrentStateMap(resourceName, partition); + Map<String, String> bestPossibleMap = + bestPossiblePartitionStateMap.getPartitionMap(partition); + Map<String, String> intermediateMap = new HashMap<String, String>(); + ; + + Set<String> allInstances = new HashSet<String>(currentStateMap.keySet()); + allInstances.addAll(bestPossibleMap.keySet()); + + boolean throttled = false; + if (throttleController + .throttleforResource(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, + resourceName)) { + throttled = true; + logger.debug("Load balance throttled on resource for " + resourceName + " " + partition + .getPartitionName()); + } else { + // throttle the load balance if any of the instance can not handle the state transition + // TODO: may need finer grained control here. + for (String ins : allInstances) { + String currentState = currentStateMap.get(ins); + String bestPossibleState = bestPossibleMap.get(ins); + if (bestPossibleState != null && !bestPossibleState.equals(currentState)) { + if (throttleController + .throttleForInstance(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, + ins)) { + throttled = true; + logger.debug( + "Load balance throttled because instance " + ins + " for " + resourceName + " " + + partition.getPartitionName()); + } + } + } + } + + if (!throttled) { + intermediateMap.putAll(bestPossibleMap); + for (String ins : allInstances) { + String currentState = currentStateMap.get(ins); + String bestPossibleState = bestPossibleMap.get(ins); + if (bestPossibleState != null && !bestPossibleState.equals(currentState)) { + throttleController + .chargeInstance(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, ins); + } + } + + throttleController + .chargeCluster(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE); + throttleController + .chargeResource(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, + resourceName); + } else { + intermediateMap.putAll(currentStateMap); + loadbalanceThrottledCount++; + partitionsLoadbalanceThrottled.add(partition); + } + output.setState(partition, intermediateMap); + } + } + + logger.info(String.format( + "RecoveryNeeded: %d, RecoveryThrottled: %d, loadbalanceNeeded: %d, loadbalanceThrottled: %d", + recoveryNeededCount, recoveryThrottledCount, loadbalanceNeededCount, + loadbalanceThrottledCount)); + + if (logger.isDebugEnabled()) { + logParitionMapState(resourceName, new HashSet(resource.getPartitions()), + partitionsNeedRecovery, partitionsRecoveryThrotted, partitionsNeedLoadbalance, + partitionsLoadbalanceThrottled, currentStateOutput, bestPossiblePartitionStateMap, + output); + } + + logger.info("End processing resource:" + resourceName); + + return output; + } + + private void logParitionMapState(String resource, Set<Partition> allPartitions, + Set<Partition> recoveryPartitions, Set<Partition> recoveryThrottledPartitions, + Set<Partition> loadbalancePartitions, Set<Partition> loadbalanceThrottledPartitions, + CurrentStateOutput currentStateOutput, + PartitionStateMap bestPossibleStateMap, + PartitionStateMap intermediateStateMap) { + + logger.debug("Partitions need recovery: " + recoveryPartitions + + "\nPartitions get throttled on recovery: " + recoveryThrottledPartitions); + logger.debug("Partitions need loadbalance: " + loadbalancePartitions + + "\nPartitions get throttled on load-balance: " + loadbalanceThrottledPartitions); + + for (Partition partition : allPartitions) { + if (recoveryPartitions.contains(partition)) { + logger + .debug("recovery balance needed for " + resource + " " + partition.getPartitionName()); + if (recoveryThrottledPartitions.contains(partition)) { + logger.debug("Recovery balance throttled on resource for " + resource + " " + partition + .getPartitionName()); + } + } else if (loadbalancePartitions.contains(partition)) { + logger.debug("load balance needed for " + resource + " " + partition.getPartitionName()); + if (loadbalanceThrottledPartitions.contains(partition)) { + logger.debug("Load balance throttled on resource for " + resource + " " + partition + .getPartitionName()); + } + } else { + logger.debug("no balance needed for " + resource + " " + partition.getPartitionName()); + } + + logger.debug( + partition + ": Best possible map: " + bestPossibleStateMap.getPartitionMap(partition)); + logger.debug(partition + ": Current State: " + currentStateOutput + .getCurrentStateMap(resource, partition)); + logger.debug(partition + ": Pending state: " + currentStateOutput + .getPendingMessageMap(resource, partition)); + logger.debug( + partition + ": Intermediate state: " + intermediateStateMap.getPartitionMap(partition)); + } + } + + private boolean needRecoveryRebalance(Map<String, String> bestPossibleMap, + StateModelDefinition stateModelDef, Map<String, String> currentStateMap) { + boolean recoveryBalanceNeeded = false; + List<String> states = stateModelDef.getStatesPriorityList(); + Map<String, Long> bestPossibleStateCounts = getStateCounts(bestPossibleMap); + Map<String, Long> currentStateCounts = getStateCounts(currentStateMap); + + for (String state : states) { + Long bestPossibleCount = bestPossibleStateCounts.get(state); + Long currentCount = currentStateCounts.get(state); + + if (bestPossibleCount == null && currentCount == null) { + continue; + } else if (bestPossibleCount == null || currentCount == null || + !bestPossibleCount.equals(currentCount)) { + if (!state.equals(HelixDefinedState.DROPPED.name()) && + !state.equals(HelixDefinedState.ERROR.name()) && + !state.equals(stateModelDef.getInitialState())) { + recoveryBalanceNeeded = true; + break; + } + } + } + + return recoveryBalanceNeeded; + } + + /* given instance->state map, return the state counts */ + private Map<String, Long> getStateCounts(Map<String, String> stateMap) { + Map<String, Long> stateCounts = new HashMap<String, Long>(); + for (String state : stateMap.values()) { + if (!stateCounts.containsKey(state)) { + stateCounts.put(state, 0L); + } + stateCounts.put(state, stateCounts.get(state) + 1); + } + return stateCounts; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index 2f4a331..f5f912e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -51,11 +51,11 @@ public class MessageGenerationPhase extends AbstractBaseStage { public void process(ClusterEvent event) throws Exception { HelixManager manager = event.getAttribute("helixmanager"); ClusterDataCache cache = event.getAttribute("ClusterDataCache"); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); CurrentStateOutput currentStateOutput = - event.getAttribute(AttributeName.CURRENT_STATE.toString()); + event.getAttribute(AttributeName.CURRENT_STATE.name()); IntermediateStateOutput intermediateStateOutput = - event.getAttribute(AttributeName.INTERMEDIATE_STATE.toString()); + event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); if (manager == null || cache == null || resourceMap == null || currentStateOutput == null || intermediateStateOutput == null) { throw new StageException("Missing attributes in event:" + event @@ -168,7 +168,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { } // end of for-each-partition } - event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output); + event.addAttribute(AttributeName.MESSAGES_ALL.name(), output); } private Message createMessage(HelixManager manager, Resource resource, String partitionName, http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java index 0bc1905..8e50d83 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java @@ -76,11 +76,11 @@ public class MessageSelectionStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { ClusterDataCache cache = event.getAttribute("ClusterDataCache"); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); CurrentStateOutput currentStateOutput = - event.getAttribute(AttributeName.CURRENT_STATE.toString()); + event.getAttribute(AttributeName.CURRENT_STATE.name()); MessageGenerationOutput messageGenOutput = - event.getAttribute(AttributeName.MESSAGES_ALL.toString()); + event.getAttribute(AttributeName.MESSAGES_ALL.name()); if (cache == null || resourceMap == null || currentStateOutput == null || messageGenOutput == null) { throw new StageException("Missing attributes in event:" + event @@ -107,7 +107,7 @@ public class MessageSelectionStage extends AbstractBaseStage { output.addMessages(resourceName, partition, selectedMessages); } } - event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output); + event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), output); } private void increaseStateCnt(Map<String, Bounds> stateConstraints, String state, http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java index 6bf610a..9a764ff 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java @@ -114,8 +114,8 @@ public class MessageThrottleStage extends AbstractBaseStage { public void process(ClusterEvent event) throws Exception { ClusterDataCache cache = event.getAttribute("ClusterDataCache"); MessageSelectionStageOutput msgSelectionOutput = - event.getAttribute(AttributeName.MESSAGES_SELECTED.toString()); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); if (cache == null || resourceMap == null || msgSelectionOutput == null) { throw new StageException("Missing attributes in event: " + event @@ -148,7 +148,7 @@ public class MessageThrottleStage extends AbstractBaseStage { } } - event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output); + event.addAttribute(AttributeName.MESSAGES_THROTTLE.name(), output); } private List<Message> throttle(Map<String, Integer> throttleMap, ClusterConstraints constraint, http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java index 8255cf4..b55a838 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java @@ -28,6 +28,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; +import org.apache.helix.controller.common.PartitionStateMap; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; @@ -48,7 +49,8 @@ public class PersistAssignmentStage extends AbstractBaseStage { ClusterDataCache cache = event.getAttribute("ClusterDataCache"); ClusterConfig clusterConfig = cache.getClusterConfig(); - if (!clusterConfig.isPersistBestPossibleAssignment()) { + if (!clusterConfig.isPersistBestPossibleAssignment() && !clusterConfig + .isPersistIntermediateAssignment()) { return; } @@ -86,13 +88,19 @@ public class PersistAssignmentStage extends AbstractBaseStage { } } - Map<Partition, Map<String, String>> bestPossibleAssignements = - bestPossibleAssignment.getResourceMap(resourceId); + PartitionStateMap partitionStateMap = + bestPossibleAssignment.getPartitionStateMap(resourceId); + if (clusterConfig.isPersistIntermediateAssignment()) { + IntermediateStateOutput intermediateAssignment = + event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); + partitionStateMap = intermediateAssignment.getPartitionStateMap(resourceId); + } + + Map<Partition, Map<String, String>> assignmentToPersist = partitionStateMap.getStateMap()); - if (bestPossibleAssignements != null && hasInstanceMapChanged(bestPossibleAssignements, - idealState)) { - for (Partition partition : bestPossibleAssignements.keySet()) { - Map<String, String> instanceMap = bestPossibleAssignements.get(partition); + if (assignmentToPersist != null && hasInstanceMapChanged(assignmentToPersist, idealState)) { + for (Partition partition : assignmentToPersist.keySet()) { + Map<String, String> instanceMap = assignmentToPersist.get(partition); idealState.setInstanceStateMap(partition.getPartitionName(), instanceMap); } needPersist = true; @@ -101,8 +109,7 @@ public class PersistAssignmentStage extends AbstractBaseStage { if (needPersist) { // Update instead of set to ensure any intermediate changes that the controller does not update are kept. accessor.updateProperty(keyBuilder.idealStates(resourceId), new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord current) { + @Override public ZNRecord update(ZNRecord current) { if (current != null) { // Overwrite MapFields and ListFields items with the same key. // Note that default merge will keep old values in the maps or lists unchanged, which is not desired. @@ -117,7 +124,7 @@ public class PersistAssignmentStage extends AbstractBaseStage { } long endTime = System.currentTimeMillis(); - LOG.info("END PersistAssignmentStage.process(), took " + (endTime - startTime) + " ms"); + LOG.info("END PersistAssignmentStage.process() took " + (endTime - startTime) + " ms"); } /** http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java index bde2904..65b94ab 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java @@ -127,7 +127,7 @@ public class ResourceComputationStage extends AbstractBaseStage { } } - event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap); + event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); } private void addResource(String resource, Map<String, Resource> resourceMap) { http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java index e552797..09cbca6 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java @@ -37,7 +37,7 @@ public class ResourceValidationStage extends AbstractBaseStage { if (cache == null) { throw new StageException("Missing attributes in event:" + event + ". Requires DataCache"); } - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); if (resourceMap == null) { throw new StageException("Resources must be computed prior to validation!"); } http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java new file mode 100644 index 0000000..6acfd9e --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java @@ -0,0 +1,176 @@ +package org.apache.helix.controller.stages; + +import java.util.Set; +import org.apache.helix.api.config.StateTransitionThrottleConfig; +import org.apache.helix.model.ClusterConfig; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.log4j.Logger; + +/** + * Output for IntermediateStateCalStage. + */ +class StateTransitionThrottleController { + private static Logger logger = Logger.getLogger(StateTransitionThrottleController.class); + + // pending allowed transition counts in the cluster level for recovery and load balance + Map<StateTransitionThrottleConfig.RebalanceType, Long> _pendingTransitionAllowedInCluster; + + // pending allowed transition counts for each instance and resource + Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> + _pendingTransitionAllowedPerInstance; + Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> + _pendingTransitionAllowedPerResource; + + private boolean _throttleEnabled = false; + + public StateTransitionThrottleController(Set<String> resources, ClusterConfig clusterConfig, + Set<String> liveInstances) { + super(); + _pendingTransitionAllowedInCluster = + new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>(); + _pendingTransitionAllowedPerInstance = + new HashMap<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>(); + _pendingTransitionAllowedPerResource = + new HashMap<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>(); + + if (clusterConfig == null) { + logger.warn("Cluster config is not found, no throttle config set!"); + return; + } + + List<StateTransitionThrottleConfig> throttleConfigs = + clusterConfig.getStateTransitionThrottleConfigs(); + + if (throttleConfigs == null || throttleConfigs.isEmpty()) { + logger.info("No throttle config is set!"); + return; + } + + for (StateTransitionThrottleConfig config : throttleConfigs) { + switch (config.getThrottleScope()) { + case CLUSTER: + _pendingTransitionAllowedInCluster + .put(config.getRebalanceType(), config.getMaxPartitionInTransition()); + _throttleEnabled = true; + break; + case RESOURCE: + for (String resource : resources) { + if (!_pendingTransitionAllowedPerResource.containsKey(resource)) { + _pendingTransitionAllowedPerResource + .put(resource, new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>()); + } + _pendingTransitionAllowedPerResource.get(resource) + .put(config.getRebalanceType(), config.getMaxPartitionInTransition()); + } + _throttleEnabled = true; + break; + case INSTANCE: + for (String instance : liveInstances) { + if (!_pendingTransitionAllowedPerInstance.containsKey(instance)) { + _pendingTransitionAllowedPerInstance + .put(instance, new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>()); + } + _pendingTransitionAllowedPerInstance.get(instance) + .put(config.getRebalanceType(), config.getMaxPartitionInTransition()); + } + _throttleEnabled = true; + break; + } + } + } + + /** + * Whether any throttle config enabled for this cluster. + * + * @return + */ + protected boolean isThrottleEnabled() { + return _throttleEnabled; + } + + /** + * Check if state transition on a partition should be throttled. + * + * @return true if it should be throttled, otherwise, false. + */ + protected boolean throttleforCluster( + StateTransitionThrottleConfig.RebalanceType rebalanceType) { + Long clusterThrottle = _pendingTransitionAllowedInCluster.get(rebalanceType); + if (clusterThrottle != null) { + if (clusterThrottle <= 0) { + return true; + } + } + + return false; + } + + protected boolean throttleforResource( + StateTransitionThrottleConfig.RebalanceType rebalanceType, String resourceName) { + if (throttleforCluster(rebalanceType)) { + return true; + } + + Long resouceThrottle; + if (_pendingTransitionAllowedPerResource.containsKey(resourceName)) { + resouceThrottle = _pendingTransitionAllowedPerResource.get(resourceName).get(rebalanceType); + if (resouceThrottle != null && resouceThrottle <= 0) { + return true; + } + } + + return false; + } + + protected boolean throttleForInstance( + StateTransitionThrottleConfig.RebalanceType rebalanceType, String instanceName) { + if (throttleforCluster(rebalanceType)) { + return true; + } + + Long instanceThrottle; + if (_pendingTransitionAllowedPerInstance.containsKey(instanceName)) { + instanceThrottle = _pendingTransitionAllowedPerInstance.get(instanceName).get(rebalanceType); + if (instanceThrottle != null && instanceThrottle <= 0) { + return true; + } + } + + return false; + } + + protected void chargeCluster(StateTransitionThrottleConfig.RebalanceType rebalanceType) { + if (_pendingTransitionAllowedInCluster.containsKey(rebalanceType)) { + Long clusterThrottle = _pendingTransitionAllowedInCluster.get(rebalanceType); + if (clusterThrottle > 0) { + _pendingTransitionAllowedInCluster.put(rebalanceType, clusterThrottle - 1); + } + } + } + + protected void chargeResource(StateTransitionThrottleConfig.RebalanceType rebalanceType, + String resource) { + if (_pendingTransitionAllowedPerResource.containsKey(resource) + && _pendingTransitionAllowedPerResource.get(resource).containsKey(rebalanceType)) { + Long resouceThrottle = _pendingTransitionAllowedPerResource.get(resource).get(rebalanceType); + if (resouceThrottle > 0) { + _pendingTransitionAllowedPerResource.get(resource).put(rebalanceType, resouceThrottle - 1); + } + } + } + + protected void chargeInstance(StateTransitionThrottleConfig.RebalanceType rebalanceType, + String instance) { + if (_pendingTransitionAllowedPerInstance.containsKey(instance) + && _pendingTransitionAllowedPerInstance.get(instance).containsKey(rebalanceType)) { + Long instanceThrottle = _pendingTransitionAllowedPerInstance.get(instance).get(rebalanceType); + if (instanceThrottle > 0) { + _pendingTransitionAllowedPerInstance.get(instance).put(rebalanceType, instanceThrottle - 1); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java index c466bc6..8aed23e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java @@ -48,9 +48,9 @@ public class TaskAssignmentStage extends AbstractBaseStage { logger.info("START TaskAssignmentStage.process()"); HelixManager manager = event.getAttribute("helixmanager"); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); MessageThrottleStageOutput messageOutput = - event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString()); + event.getAttribute(AttributeName.MESSAGES_THROTTLE.name()); ClusterDataCache cache = event.getAttribute("ClusterDataCache"); Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances(); http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java index 2b4cfb2..38b74cb 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java @@ -27,9 +27,7 @@ import org.I0Itec.zkclient.DataUpdater; import org.apache.helix.BaseDataAccessor; import org.apache.helix.InstanceType; import org.apache.helix.PropertyPathBuilder; -import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; -import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index f679b3f..79bb6fa 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -36,8 +36,9 @@ public class ClusterConfig extends HelixProperty { */ public enum ClusterConfigProperty { HELIX_DISABLE_PIPELINE_TRIGGERS, - TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance" PERSIST_BEST_POSSIBLE_ASSIGNMENT, + PERSIST_INTERMEDIATE_ASSIGNMENT, + TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance" FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the replicas from same partition. DELAY_REBALANCE_DISABLED, // enabled the delayed rebalaning in case node goes offline. DELAY_REBALANCE_TIME, // delayed time in ms that the delay time Helix should hold until rebalancing. @@ -91,9 +92,28 @@ public class ClusterConfig extends HelixProperty { } /** + * Whether to persist IntermediateAssignment in a resource's idealstate. + * + * @return + */ + public Boolean isPersistIntermediateAssignment() { + return _record + .getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), false); + } + + /** + * Enable/Disable persist IntermediateAssignment in a resource's idealstate. * * @return */ + public void setPersistIntermediateAssignment(Boolean enable) { + if (enable == null) { + _record.getSimpleFields().remove(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString()); + } else { + _record.setBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), enable); + } + } + public Boolean isPipelineTriggersDisabled() { return _record .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false); http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java index 03c79d2..179f89a 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java @@ -28,11 +28,14 @@ import org.apache.helix.manager.zk.ZkClient; */ /** - * This class is deprecated, please use BestPossibleExternalViewVerifier in tools.ClusterVerifiers instead. + * This class is deprecated, please use BestPossibleExternalViewVerifier in tools.ClusterVerifiers + * instead. */ @Deprecated -public class ClusterExternalViewVerifier extends org.apache.helix.tools.ClusterVerifiers.ClusterExternalViewVerifier { - public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName, List<String> expectLiveNodes) { +public class ClusterExternalViewVerifier + extends org.apache.helix.tools.ClusterVerifiers.ClusterExternalViewVerifier { + public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName, + List<String> expectLiveNodes) { super(zkclient, clusterName, expectLiveNodes); } } http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java index fc87dca..576b2fe 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java @@ -24,5 +24,6 @@ package org.apache.helix.tools; * please use dedicated verifier classes, such as BestPossibleExternViewVerifier, etc, in tools.ClusterVerifiers */ @Deprecated -public class ClusterStateVerifier extends org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier{ +public class ClusterStateVerifier + extends org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier { } http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java index 6c79bed..2b6d92c 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java @@ -354,7 +354,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { runStage(event, new BestPossibleStateCalcStage()); BestPossibleStateOutput output = - event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString()); + event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); return output; } http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java index fa697c4..933acc2 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java @@ -101,7 +101,7 @@ public class ClusterExternalViewVerifier extends ClusterVerifier { runStage(event, stage); } - return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString()); + return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); } /** http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java index d2a2d09..eace66f 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java @@ -247,7 +247,7 @@ public class ClusterStateVerifier { bestPossStateMap.get(resourceName).put(partition, new HashMap<String, String>()); } bestPossStateMap.get(resourceName).get(partition) - .put(instanceName, HelixDefinedState.ERROR.toString()); + .put(instanceName, HelixDefinedState.ERROR.name()); } } } @@ -281,7 +281,7 @@ public class ClusterStateVerifier { while (insIter.hasNext()) { Map.Entry<String, String> insEntry = insIter.next(); String state = insEntry.getValue(); - if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) { + if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.name())) { insIter.remove(); } } @@ -351,7 +351,7 @@ public class ClusterStateVerifier { // Filter resources if specified if (resources != null) { - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); resourceMap.keySet().retainAll(resources); } @@ -359,7 +359,7 @@ public class ClusterStateVerifier { runStage(event, bpStage); BestPossibleStateOutput output = - event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString()); + event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); // System.out.println("output:" + output); return output; http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java index 33570a0..4ea93ac 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java @@ -53,8 +53,8 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest { Map<String, Resource> resourceMap = getResourceMap(); CurrentStateOutput currentStateOutput = new CurrentStateOutput(); - event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap); - event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput); + event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); ReadClusterDataStage stage1 = new ReadClusterDataStage(); runStage(event, stage1); @@ -62,7 +62,7 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest { runStage(event, stage2); BestPossibleStateOutput output = - event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString()); + event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); for (int p = 0; p < 5; p++) { Partition resource = new Partition("testResourceName_" + p); AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource) @@ -86,8 +86,8 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest { Map<String, Resource> resourceMap = getResourceMap(); CurrentStateOutput currentStateOutput = new CurrentStateOutput(); - event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap); - event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput); + event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); ReadClusterDataStage stage1 = new ReadClusterDataStage(); runStage(event, stage1); @@ -95,7 +95,7 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest { runStage(event, stage2); BestPossibleStateOutput output = - event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString()); + event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); for (int p = 0; p < 5; p++) { Partition resource = new Partition("testResourceName_" + p); AssertJUnit.assertNull(output.getInstanceStateMap("testResourceName", resource).get( http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java index 82c7b37..43e0e07 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java @@ -49,8 +49,8 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest { Map<String, Resource> resourceMap = getResourceMap(); CurrentStateOutput currentStateOutput = new CurrentStateOutput(); - event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap); - event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput); + event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); ReadClusterDataStage stage1 = new ReadClusterDataStage(); runStage(event, stage1); @@ -58,7 +58,7 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest { runStage(event, stage2); BestPossibleStateOutput output = - event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString()); + event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); for (int p = 0; p < 5; p++) { Partition resource = new Partition("testResourceName_" + p); AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource) http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java index c5f54a5..ac1f262 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java @@ -39,11 +39,11 @@ public class TestCurrentStateComputationStage extends BaseStageTest { @Test public void testEmptyCS() { Map<String, Resource> resourceMap = getResourceMap(); - event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap); + event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); CurrentStateComputationStage stage = new CurrentStateComputationStage(); runStage(event, new ReadClusterDataStage()); runStage(event, stage); - CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString()); + CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.name()); AssertJUnit.assertEquals( output.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(), 0); @@ -56,11 +56,11 @@ public class TestCurrentStateComputationStage extends BaseStageTest { setupLiveInstances(5); - event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap); + event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); CurrentStateComputationStage stage = new CurrentStateComputationStage(); runStage(event, new ReadClusterDataStage()); runStage(event, stage); - CurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString()); + CurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.name()); AssertJUnit.assertEquals( output1.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(), 0); @@ -79,7 +79,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest { runStage(event, new ReadClusterDataStage()); runStage(event, stage); - CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString()); + CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.name()); String pendingState = output2.getPendingState("testResourceName", new Partition("testResourceName_1"), "localhost_3").getToState(); @@ -104,7 +104,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest { stateWithDeadSession); runStage(event, new ReadClusterDataStage()); runStage(event, stage); - CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString()); + CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.name()); String currentState = output3.getCurrentState("testResourceName", new Partition("testResourceName_1"), "localhost_3"); http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java index 3a321cc..965e0de 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java @@ -114,12 +114,12 @@ public class TestMessageThrottleStage extends ZkUnitTestBase { selectMessages.add(msg); msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages); - event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput); + event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), msgSelectOutput); runStage(event, throttleStage); MessageThrottleStageOutput msgThrottleOutput = - event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString()); + event.getAttribute(AttributeName.MESSAGES_THROTTLE.name()); Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0")).size(), 1); @@ -298,12 +298,12 @@ public class TestMessageThrottleStage extends ZkUnitTestBase { selectMessages.add(msg6); // should be throttled msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages); - event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput); + event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), msgSelectOutput); runStage(event, throttleStage); MessageThrottleStageOutput msgThrottleOutput = - event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString()); + event.getAttribute(AttributeName.MESSAGES_THROTTLE.name()); List<Message> throttleMessages = msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0")); Assert.assertEquals(throttleMessages.size(), 4); http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java index 18abf75..a6863ca 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java @@ -97,7 +97,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { runPipeline(event, dataRefresh); runPipeline(event, rebalancePipeline); MessageSelectionStageOutput msgSelOutput = - event.getAttribute(AttributeName.MESSAGES_SELECTED.toString()); + event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); List<Message> messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0"); @@ -113,7 +113,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { runPipeline(event, dataRefresh); runPipeline(event, rebalancePipeline); - msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString()); + msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node1"); @@ -249,7 +249,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { runPipeline(event, dataRefresh); runPipeline(event, rebalancePipeline); MessageSelectionStageOutput msgSelOutput = - event.getAttribute(AttributeName.MESSAGES_SELECTED.toString()); + event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); List<Message> messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0"); @@ -267,7 +267,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { runPipeline(event, dataRefresh); runPipeline(event, rebalancePipeline); - msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString()); + msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); Assert.assertEquals(messages.size(), 1, "Should output only 1 message: OFFLINE->DROPPED for localhost_1"); @@ -284,7 +284,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { accessor.removeProperty(keyBuilder.message("localhost_0", msgIds.get(0))); runPipeline(event, dataRefresh); runPipeline(event, rebalancePipeline); - msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString()); + msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE->DROPPED for localhost_0"); @@ -345,7 +345,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { runPipeline(event, dataRefresh); runPipeline(event, rebalancePipeline); MessageSelectionStageOutput msgSelOutput = - event.getAttribute(AttributeName.MESSAGES_SELECTED.toString()); + event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); List<Message> messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); Assert.assertEquals(messages.size(), 1, "Should output 1 message: SLAVE-MASTER for node1"); @@ -364,7 +364,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { runPipeline(event, dataRefresh); runPipeline(event, rebalancePipeline); - msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString()); + msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node0"); http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java index dcb955c..87c0516 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java @@ -69,7 +69,7 @@ public class TestResourceComputationStage extends BaseStageTest { runStage(event, new ReadClusterDataStage()); runStage(event, stage); - Map<String, Resource> resource = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resource = event.getAttribute(AttributeName.RESOURCES.name()); AssertJUnit.assertEquals(1, resource.size()); AssertJUnit.assertEquals(resource.keySet().iterator().next(), resourceName); @@ -91,7 +91,7 @@ public class TestResourceComputationStage extends BaseStageTest { runStage(event, new ReadClusterDataStage()); runStage(event, stage); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); AssertJUnit.assertEquals(resources.length, resourceMap.size()); for (int i = 0; i < resources.length; i++) { @@ -157,7 +157,7 @@ public class TestResourceComputationStage extends BaseStageTest { runStage(event, new ReadClusterDataStage()); runStage(event, stage); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); // +1 because it will have one for current state AssertJUnit.assertEquals(resources.length + 1, resourceMap.size()); http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java index 15d7fd8..9c86372 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java @@ -70,14 +70,14 @@ public class TestResourceValidationStage { // run resource computation new ResourceComputationStage().process(event); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource)); Assert.assertTrue(resourceMap.containsKey(onlineOfflineFullAutoResource)); Assert.assertTrue(resourceMap.containsKey(masterSlaveSemiAutoInvalidResource)); // run resource validation new ResourceValidationStage().process(event); - Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.name()); Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource)); Assert.assertTrue(finalResourceMap.containsKey(onlineOfflineFullAutoResource)); Assert.assertFalse(finalResourceMap.containsKey(masterSlaveSemiAutoInvalidResource)); @@ -102,12 +102,12 @@ public class TestResourceValidationStage { // run resource computation new ResourceComputationStage().process(event); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource)); // run resource validation new ResourceValidationStage().process(event); - Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.name()); Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource)); } @@ -132,13 +132,13 @@ public class TestResourceValidationStage { // run resource computation new ResourceComputationStage().process(event); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource)); Assert.assertTrue(resourceMap.containsKey(leaderStandbyCustomResource)); // run resource validation new ResourceValidationStage().process(event); - Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.name()); Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource)); Assert.assertFalse(finalResourceMap.containsKey(leaderStandbyCustomResource)); } http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java index f9bbc94..d21706c 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java @@ -90,7 +90,6 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBase { new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); participant.syncStart(); _participants[i] = participant; - } // start controller @@ -140,7 +139,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBase { ClusterSetup.processCommandLineArgs(command.split(" ")); TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", 30 * 1000, CLUSTER_NAME, "MyDB2", - TestHelper.<String> setOf("localhost_12918", "localhost_12919", "localhost_12920", + TestHelper.<String>setOf("localhost_12918", "localhost_12919", "localhost_12920", "localhost_12921", "localhost_12922"), ZK_ADDR); }
