Fix Semi-Auto State Transition This fix is for Semi-Auto state transition jump to top state when instance just connected back.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2804cd02 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2804cd02 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2804cd02 Branch: refs/heads/master Commit: 2804cd025ad5e96cbed83574383cc15b4fe82c65 Parents: 5798bfd Author: Junkai Xue <[email protected]> Authored: Mon Sep 25 14:54:15 2017 -0700 Committer: Junkai Xue <[email protected]> Committed: Mon Sep 25 14:54:15 2017 -0700 ---------------------------------------------------------------------- .../rebalancer/AbstractRebalancer.java | 132 +++++++++++++++-- .../rebalancer/DelayedAutoRebalancer.java | 25 ++-- .../rebalancer/SemiAutoRebalancer.java | 107 ++++++++++++++ .../util/ConstraintBasedAssignment.java | 146 ------------------- .../helix/model/StateModelDefinition.java | 3 +- .../StrictMatchExternalViewVerifier.java | 4 +- .../rebalancer/TestAutoRebalanceStrategy.java | 7 +- .../mock/participant/MockDelayMSStateModel.java | 59 ++++++++ .../MockDelayMSStateModelFactory.java | 36 +++++ .../helix/task/TestSemiAutoStateTransition.java | 108 ++++++++++++++ 10 files changed, 445 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/2804cd02/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java index 9cd2f96..fa106b6 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java @@ -19,15 +19,24 @@ package org.apache.helix.controller.rebalancer; * under the License. */ +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; -import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; +import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; @@ -35,13 +44,6 @@ import org.apache.helix.model.StateModelDefinition; import org.apache.helix.util.HelixUtil; import org.apache.log4j.Logger; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** * This is a abstract rebalancer that defines some default behaviors for Helix rebalancer * as well as all utility functions that will be used by all specific rebalancers. @@ -65,7 +67,7 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato /** * Compute the best state for all partitions. - * This is the default ConstraintBasedAssignment implementation, subclasses should re-implement + * This is the default implementation, subclasses should re-implement * this method if its logic to generate bestpossible map for each partition is different from the default one here. * * @param cache @@ -89,13 +91,11 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition); Set<String> disabledInstancesForPartition = cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString()); - List<String> preferenceList = ConstraintBasedAssignment - .getPreferenceList(partition, idealState, - Collections.unmodifiableSet(cache.getLiveInstances().keySet())); + List<String> preferenceList = getPreferenceList(partition, idealState, + Collections.unmodifiableSet(cache.getLiveInstances().keySet())); Map<String, String> bestStateForPartition = - ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, stateModelDef, - preferenceList, currentStateMap, disabledInstancesForPartition, - idealState.isEnabled()); + computeAutoBestStateForPartition(cache, stateModelDef, preferenceList, currentStateMap, + disabledInstancesForPartition, idealState.isEnabled()); partitionMapping.addReplicaMap(partition, bestStateForPartition); } return partitionMapping; @@ -155,4 +155,106 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato return rebalanceStrategy; } + + /** + * compute best state for resource in AUTO ideal state mode + * @param cache + * @param stateModelDef + * @param instancePreferenceList + * @param currentStateMap + * : instance->state for each partition + * @param disabledInstancesForPartition + * @param isResourceEnabled + * @return + */ + public Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache, + StateModelDefinition stateModelDef, List<String> instancePreferenceList, + Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition, + boolean isResourceEnabled) { + Map<String, String> instanceStateMap = new HashMap<String, String>(); + + if (currentStateMap != null) { + for (String instance : currentStateMap.keySet()) { + if (instancePreferenceList == null || !instancePreferenceList.contains(instance)) { + // The partition is dropped from preference list. + // Transit to DROPPED no matter the instance is disabled or not. + instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString()); + } else { + // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE) + if (disabledInstancesForPartition.contains(instance) || !isResourceEnabled) { + if (currentStateMap.get(instance) == null || !currentStateMap.get(instance) + .equals(HelixDefinedState.ERROR.name())) { + instanceStateMap.put(instance, stateModelDef.getInitialState()); + } + } + } + } + } + + // if the ideal state is deleted, instancePreferenceList will be empty and + // we should drop all resources. + if (instancePreferenceList == null) { + return instanceStateMap; + } + + List<String> statesPriorityList = stateModelDef.getStatesPriorityList(); + boolean assigned[] = new boolean[instancePreferenceList.size()]; + + Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances(); + + for (String state : statesPriorityList) { + String num = stateModelDef.getNumInstancesPerState(state); + int stateCount = -1; + if ("N".equals(num)) { + Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet()); + liveAndEnabled.removeAll(disabledInstancesForPartition); + stateCount = isResourceEnabled ? liveAndEnabled.size() : 0; + } else if ("R".equals(num)) { + stateCount = instancePreferenceList.size(); + } else { + try { + stateCount = Integer.parseInt(num); + } catch (Exception e) { + LOG.error("Invalid count for state:" + state + " ,count=" + num); + } + } + if (stateCount > -1) { + int count = 0; + for (int i = 0; i < instancePreferenceList.size(); i++) { + String instanceName = instancePreferenceList.get(i); + + boolean notInErrorState = + currentStateMap == null || currentStateMap.get(instanceName) == null + || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString()); + + boolean enabled = + !disabledInstancesForPartition.contains(instanceName) && isResourceEnabled; + if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState + && enabled) { + instanceStateMap.put(instanceName, state); + count = count + 1; + assigned[i] = true; + if (count == stateCount) { + break; + } + } + } + } + } + return instanceStateMap; + } + + public static List<String> getPreferenceList(Partition partition, IdealState idealState, + Set<String> eligibleInstances) { + List<String> listField = idealState.getPreferenceList(partition.getPartitionName()); + + if (listField != null && listField.size() == 1 + && IdealState.IdealStateConstants.ANY_LIVEINSTANCE.toString().equals(listField.get(0))) { + List<String> prefList = new ArrayList<String>(eligibleInstances); + Collections.sort(prefList); + return prefList; + } else { + return listField; + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/2804cd02/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index 9f52d4f..f8868a2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -19,8 +19,15 @@ package org.apache.helix.controller.rebalancer; * under the License. */ +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.helix.ZNRecord; -import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; @@ -32,15 +39,6 @@ import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.StateModelDefinition; import org.apache.log4j.Logger; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** * This is the Full-Auto Rebalancer that is featured with delayed partition movement. */ @@ -319,10 +317,9 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition); Set<String> disabledInstancesForPartition = cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString()); - List<String> preferenceList = - ConstraintBasedAssignment.getPreferenceList(partition, idealState, activeNodes); - Map<String, String> bestStateForPartition = ConstraintBasedAssignment - .computeAutoBestStateForPartition(cache, stateModelDef, preferenceList, currentStateMap, + List<String> preferenceList = getPreferenceList(partition, idealState, activeNodes); + Map<String, String> bestStateForPartition = + computeAutoBestStateForPartition(cache, stateModelDef, preferenceList, currentStateMap, disabledInstancesForPartition, idealState.isEnabled()); if (preferenceList == null) { http://git-wip-us.apache.org/repos/asf/helix/blob/2804cd02/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java index 6ef93c1..a8fe2ec 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java @@ -18,9 +18,18 @@ package org.apache.helix.controller.rebalancer; * specific language governing permissions and limitations * under the License. */ +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixDefinedState; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.StateModelDefinition; import org.apache.log4j.Logger; /** @@ -40,4 +49,102 @@ public class SemiAutoRebalancer extends AbstractRebalancer { CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { return currentIdealState; } + + @Override + public Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache, + StateModelDefinition stateModelDef, List<String> instancePreferenceList, + Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition, + boolean isResourceEnabled) { + Map<String, String> instanceStateMap = new HashMap<String, String>(); + + if (currentStateMap != null) { + for (String instance : currentStateMap.keySet()) { + if (instancePreferenceList == null || !instancePreferenceList.contains(instance)) { + // The partition is dropped from preference list. + // Transit to DROPPED no matter the instance is disabled or not. + instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString()); + } else { + // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE) + if (disabledInstancesForPartition.contains(instance) || !isResourceEnabled) { + if (currentStateMap.get(instance) == null || !currentStateMap.get(instance) + .equals(HelixDefinedState.ERROR.name())) { + instanceStateMap.put(instance, stateModelDef.getInitialState()); + } + } + } + } + } + + // if the ideal state is deleted, instancePreferenceList will be empty and + // we should drop all resources. + if (instancePreferenceList == null) { + return instanceStateMap; + } + + List<String> statesPriorityList = stateModelDef.getStatesPriorityList(); + boolean assigned[] = new boolean[instancePreferenceList.size()]; + + Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances(); + Set<String> secondTopStates = stateModelDef.getSecondTopStates(); + String topState = statesPriorityList.get(0); + int occupiedTopState = 0; + for (String currentState : currentStateMap.values()) { + if (currentState.equals(topState)) { + occupiedTopState++; + } + } + + for (String state : statesPriorityList) { + String num = stateModelDef.getNumInstancesPerState(state); + int stateCount = -1; + if ("N".equals(num)) { + Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet()); + liveAndEnabled.removeAll(disabledInstancesForPartition); + stateCount = isResourceEnabled ? liveAndEnabled.size() : 0; + } else if ("R".equals(num)) { + stateCount = instancePreferenceList.size(); + } else { + try { + stateCount = Integer.parseInt(num); + } catch (Exception e) { + LOG.error("Invalid count for state:" + state + " ,count=" + num); + } + } + if (stateCount > -1) { + int count = 0; + for (int i = 0; i < instancePreferenceList.size(); i++) { + String instanceName = instancePreferenceList.get(i); + + boolean notInErrorState = + currentStateMap == null || currentStateMap.get(instanceName) == null + || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString()); + + boolean enabled = + !disabledInstancesForPartition.contains(instanceName) && isResourceEnabled; + + String currentState = + (currentStateMap == null || currentStateMap.get(instanceName) == null) + ? stateModelDef.getInitialState() : currentStateMap.get(instanceName); + if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState + && enabled) { + // If target state is top state : 1. Still have extra top state count not assigned + // 2. Current state is is at second top state + // 3. Current state is at top state + if (state.equals(topState) && stateCount - occupiedTopState <= 0 && !currentState + .equals(topState) && !secondTopStates.contains(currentState)) { + continue; + } + instanceStateMap.put(instanceName, state); + count = count + 1; + assigned[i] = true; + occupiedTopState++; + if (count == stateCount) { + break; + } + } + } + } + } + return instanceStateMap; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/2804cd02/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java deleted file mode 100644 index 91462b2..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java +++ /dev/null @@ -1,146 +0,0 @@ -package org.apache.helix.controller.rebalancer.util; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.helix.HelixDefinedState; -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.Partition; -import org.apache.helix.model.StateModelDefinition; -import org.apache.log4j.Logger; - -/** - * Collection of functions that will compute the best possible states given the live instances and - * an ideal state. - */ -public class ConstraintBasedAssignment { - private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class); - - public static List<String> getPreferenceList(Partition partition, IdealState idealState, - Set<String> eligibleInstances) { - List<String> listField = idealState.getPreferenceList(partition.getPartitionName()); - - if (listField != null && listField.size() == 1 - && IdealState.IdealStateConstants.ANY_LIVEINSTANCE.toString().equals(listField.get(0))) { - List<String> prefList = new ArrayList<String>(eligibleInstances); - Collections.sort(prefList); - return prefList; - } else { - return listField; - } - } - - /** - * compute best state for resource in AUTO ideal state mode - * @param cache - * @param stateModelDef - * @param instancePreferenceList - * @param currentStateMap - * : instance->state for each partition - * @param disabledInstancesForPartition - * @param isResourceEnabled - * @return - */ - public static Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache, - StateModelDefinition stateModelDef, List<String> instancePreferenceList, - Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition, - boolean isResourceEnabled) { - Map<String, String> instanceStateMap = new HashMap<String, String>(); - - if (currentStateMap != null) { - for (String instance : currentStateMap.keySet()) { - if (instancePreferenceList == null || !instancePreferenceList.contains(instance)) { - // The partition is dropped from preference list. - // Transit to DROPPED no matter the instance is disabled or not. - instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString()); - } else { - // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE) - if (disabledInstancesForPartition.contains(instance) || !isResourceEnabled) { - if (currentStateMap.get(instance) == null || !currentStateMap.get(instance) - .equals(HelixDefinedState.ERROR.name())) { - instanceStateMap.put(instance, stateModelDef.getInitialState()); - } - } - } - } - } - - // if the ideal state is deleted, instancePreferenceList will be empty and - // we should drop all resources. - if (instancePreferenceList == null) { - return instanceStateMap; - } - - List<String> statesPriorityList = stateModelDef.getStatesPriorityList(); - boolean assigned[] = new boolean[instancePreferenceList.size()]; - - Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances(); - - for (String state : statesPriorityList) { - String num = stateModelDef.getNumInstancesPerState(state); - int stateCount = -1; - if ("N".equals(num)) { - Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet()); - liveAndEnabled.removeAll(disabledInstancesForPartition); - stateCount = isResourceEnabled ? liveAndEnabled.size() : 0; - } else if ("R".equals(num)) { - stateCount = instancePreferenceList.size(); - } else { - try { - stateCount = Integer.parseInt(num); - } catch (Exception e) { - logger.error("Invalid count for state:" + state + " ,count=" + num); - } - } - if (stateCount > -1) { - int count = 0; - for (int i = 0; i < instancePreferenceList.size(); i++) { - String instanceName = instancePreferenceList.get(i); - - boolean notInErrorState = - currentStateMap == null || currentStateMap.get(instanceName) == null - || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString()); - - boolean enabled = - !disabledInstancesForPartition.contains(instanceName) && isResourceEnabled; - if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState - && enabled) { - instanceStateMap.put(instanceName, state); - count = count + 1; - assigned[i] = true; - if (count == stateCount) { - break; - } - } - } - } - } - return instanceStateMap; - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/2804cd02/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java index d86e286..ce2d3d9 100644 --- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java +++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java @@ -198,7 +198,8 @@ public class StateModelDefinition extends HelixProperty { String topState = _statesPriorityList.get(0); for (String state : _stateTransitionTable.keySet()) { Map<String, String> transitionMap = _stateTransitionTable.get(state); - if (transitionMap.containsKey(topState) && transitionMap.get(topState).equals(topState)) { + if (transitionMap != null && transitionMap.containsKey(topState) && transitionMap + .get(topState).equals(topState)) { secondTopStates.add(state); } } http://git-wip-us.apache.org/repos/asf/helix/blob/2804cd02/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java index be79444..1602f9b 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java @@ -20,7 +20,7 @@ package org.apache.helix.tools.ClusterVerifiers; */ import org.apache.helix.PropertyKey; -import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; +import org.apache.helix.controller.rebalancer.AbstractRebalancer; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.model.ExternalView; @@ -266,7 +266,7 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier { liveEnabledInstances.removeAll(cache.getDisabledInstances()); for (String partition : idealState.getPartitionSet()) { - List<String> preferenceList = ConstraintBasedAssignment + List<String> preferenceList = AbstractRebalancer .getPreferenceList(new Partition(partition), idealState, liveEnabledInstances); Map<String, String> idealMapping = computeIdealMapping(preferenceList, stateModelDef, liveEnabledInstances); http://git-wip-us.apache.org/repos/asf/helix/blob/2804cd02/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java index 2fb914c..83aded6 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java @@ -40,7 +40,6 @@ import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; -import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.StateModelDefinition; @@ -226,9 +225,9 @@ public class TestAutoRebalanceStrategy { List<String> preferenceList = listResult.get(partition); Map<String, String> currentStateMap = _currentMapping.get(partition); Set<String> disabled = Collections.emptySet(); - Map<String, String> assignment = - ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, _stateModelDef, - preferenceList, currentStateMap, disabled, true); + Map<String, String> assignment = new AutoRebalancer() + .computeAutoBestStateForPartition(cache, _stateModelDef, preferenceList, + currentStateMap, disabled, true); mapResult.put(partition, assignment); } return mapResult; http://git-wip-us.apache.org/repos/asf/helix/blob/2804cd02/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java new file mode 100644 index 0000000..f1848a5 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java @@ -0,0 +1,59 @@ +package org.apache.helix.mock.participant; + +import org.apache.helix.NotificationContext; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.Transition; +import org.apache.log4j.Logger; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +// mock delay master-slave state model +@StateModelInfo(initialState = "OFFLINE", states = { + "MASTER", "SLAVE", "ERROR" +}) +public class MockDelayMSStateModel extends StateModel { + private static Logger LOG = Logger.getLogger(MockDelayMSStateModel.class); + + private long _delay; + + public MockDelayMSStateModel(long delay) { + _delay = delay; + } + + @Transition(to = "SLAVE", from = "OFFLINE") + public void onBecomeSLAVEFromOffline(Message message, NotificationContext context) { + if (_delay > 0) { + try { + Thread.sleep(_delay); + } catch (InterruptedException e) { + LOG.error("Failed to sleep for " + _delay); + } + } + LOG.info("Become SLAVE from OFFLINE"); + } + + @Transition(to = "ONLINE", from = "SLAVE") + public void onBecomeMasterFromSlave(Message message, NotificationContext context) { + LOG.info("Become ONLINE from SLAVE"); + } + + +} http://git-wip-us.apache.org/repos/asf/helix/blob/2804cd02/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModelFactory.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModelFactory.java new file mode 100644 index 0000000..b6cf55e --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModelFactory.java @@ -0,0 +1,36 @@ +package org.apache.helix.mock.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.participant.statemachine.StateModelFactory; + +public class MockDelayMSStateModelFactory extends StateModelFactory<MockDelayMSStateModel> { + private long _delay; + @Override + public MockDelayMSStateModel createNewStateModel(String resourceName, String partitionKey) { + MockDelayMSStateModel model = new MockDelayMSStateModel(_delay); + return model; + } + + public MockDelayMSStateModelFactory setDelay(long delay) { + _delay = delay; + return this; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/2804cd02/helix-core/src/test/java/org/apache/helix/task/TestSemiAutoStateTransition.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestSemiAutoStateTransition.java b/helix-core/src/test/java/org/apache/helix/task/TestSemiAutoStateTransition.java new file mode 100644 index 0000000..4a62cca --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/TestSemiAutoStateTransition.java @@ -0,0 +1,108 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.integration.task.TaskTestBase; +import org.apache.helix.integration.task.WorkflowGenerator; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.mock.participant.MockDelayMSStateModelFactory; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.tools.ClusterSetup; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestSemiAutoStateTransition extends TaskTestBase { + + protected HelixDataAccessor _accessor; + protected PropertyKey.Builder _keyBuilder; + + @BeforeClass + public void beforeClass() throws Exception { + _numParitions = 1; + + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + + _setupTool = new ClusterSetup(ZK_ADDR); + _setupTool.addCluster(CLUSTER_NAME, true); + _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + _keyBuilder = _accessor.keyBuilder(); + setupParticipants(); + + for (int i = 0; i < _numDbs; i++) { + String db = WorkflowGenerator.DEFAULT_TGT_DB + i; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _numParitions, MASTER_SLAVE_STATE_MODEL, + IdealState.RebalanceMode.SEMI_AUTO.toString()); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _numReplicas); + _testDbs.add(db); + } + + startParticipants(); + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + Thread.sleep(2000L); + + createManagers(); + } + + @Test public void testOfflineToSecondTopState() throws Exception { + _participants[0].syncStop(); + Thread.sleep(2000L); + + ExternalView externalView = + _accessor.getProperty(_keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB + "0")); + Map<String, String> stateMap = + externalView.getStateMap(WorkflowGenerator.DEFAULT_TGT_DB + "0_0"); + Assert.assertEquals("MASTER", stateMap.get(PARTICIPANT_PREFIX + "_" + (_startPort + 1))); + Assert.assertEquals("SLAVE", stateMap.get(PARTICIPANT_PREFIX + "_" + (_startPort + 2))); + + String instanceName = PARTICIPANT_PREFIX + "_" + _startPort; + _participants[0] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // add a state model with non-OFFLINE initial state + StateMachineEngine stateMach = _participants[0].getStateMachineEngine(); + MockDelayMSStateModelFactory delayFactory = + new MockDelayMSStateModelFactory().setDelay(300000L); + stateMach.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, delayFactory); + + _participants[0].syncStart(); + Thread.sleep(2000L); + + externalView = + _accessor.getProperty(_keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB + "0")); + stateMap = externalView.getStateMap(WorkflowGenerator.DEFAULT_TGT_DB + "0_0"); + Assert.assertEquals("OFFLINE", stateMap.get(PARTICIPANT_PREFIX + "_" + _startPort)); + Assert.assertEquals("MASTER", stateMap.get(PARTICIPANT_PREFIX + "_" + (_startPort + 1))); + Assert.assertEquals("SLAVE", stateMap.get(PARTICIPANT_PREFIX + "_" + (_startPort + 2))); + } +}
