Fix missing master replica when all replicas on the new instances turn to the ERROR state while migrating existing replicas to entirely new instances in DelayedAutoRebalancer.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bf181daf Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bf181daf Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bf181daf Branch: refs/heads/master Commit: bf181daf8761d91fd4b8aeb03f8e99b9c8bb097a Parents: fc04d53 Author: Lei Xia <l...@linkedin.com> Authored: Mon Sep 11 11:00:59 2017 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Mon Nov 6 17:07:38 2017 -0800 ---------------------------------------------------------------------- .../rebalancer/AbstractRebalancer.java | 23 ++-- .../rebalancer/DelayedAutoRebalancer.java | 51 +++++-- .../stages/IntermediateStateCalcStage.java | 134 +++++-------------- .../stages/PersistAssignmentStage.java | 3 +- .../helix/model/StateModelDefinition.java | 37 ++++- .../rebalancer/TestAbstractRebalancer.java | 3 +- .../rebalancer/TestAutoRebalanceStrategy.java | 5 +- .../rebalancer/TestZeroReplicaAvoidance.java | 12 +- .../common/ZkIntegrationTestBase.java | 46 +++++++ .../manager/MockParticipantManager.java | 16 +-- .../rebalancer/TestDelayedAutoRebalance.java | 56 +------- ...elayedAutoRebalanceWithDisabledInstance.java | 26 ++-- .../rebalancer/TestMixedModeAutoRebalance.java | 91 +++++++++++++ .../TestDelayedAutoRebalancer.MasterSlave.json | 3 +- ...TestDelayedAutoRebalancer.OnlineOffline.json | 6 +- 15 files changed, 303 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java index 94908ba..7f79f7f 100644 --- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java @@ -96,7 +96,7 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato Collections.unmodifiableSet(cache.getLiveInstances().keySet())); Map<String, String> bestStateForPartition = computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef, preferenceList, - currentStateMap, disabledInstancesForPartition, idealState.isEnabled()); + currentStateMap, disabledInstancesForPartition, idealState); partitionMapping.addReplicaMap(partition, bestStateForPartition); } return partitionMapping; @@ -160,7 +160,7 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances, StateModelDefinition stateModelDef, List<String> preferenceList, Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition, - boolean isResourceEnabled) { + IdealState idealState) { if (currentStateMap == null) { currentStateMap = Collections.emptyMap(); @@ -173,7 +173,7 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato } // (2) If resource disabled altogether, transit to initial-state (e.g. OFFLINE) if it's not in ERROR. 
- if (!isResourceEnabled) { + if (!idealState.isEnabled()) { return computeBestPossibleMapForDisabledResource(currentStateMap, stateModelDef); } @@ -243,7 +243,7 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato protected Map<String, String> computeBestPossibleMap(List<String> preferenceList, StateModelDefinition stateModelDef, Map<String, String> currentStateMap, Set<String> liveInstances, Set<String> disabledInstancesForPartition) { - Map<String, String> bestPossibleStateMap = new HashMap<String, String>(); + Map<String, String> bestPossibleStateMap = new HashMap<>(); // (1) Instances that have current state but not in preference list, drop, no matter it's disabled or not. for (String instance : currentStateMap.keySet()) { @@ -276,8 +276,8 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato // To achieve that, we sort the preferenceList based on CurrentState, by treating top-state and second-states with // same priority and rely on the fact that Collections.sort() is stable. 
List<String> statesPriorityList = stateModelDef.getStatesPriorityList(); - Set<String> assigned = new HashSet<String>(); - Set<String> liveAndEnabled = new HashSet<String>(liveInstances); + Set<String> assigned = new HashSet<>(); + Set<String> liveAndEnabled = new HashSet<>(liveInstances); liveAndEnabled.removeAll(disabledInstancesForPartition); for (String state : statesPriorityList) { @@ -294,11 +294,14 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato if (stateCount <= 0) { break; } - boolean inError = HelixDefinedState.ERROR.toString().equals(currentStateMap.get(instance)); - if (!assigned.contains(instance) && liveAndEnabled.contains(instance) && !inError) { - bestPossibleStateMap.put(instance, state); + if (!assigned.contains(instance) && liveAndEnabled.contains(instance)) { + if (HelixDefinedState.ERROR.toString().equals(currentStateMap.get(instance))) { + bestPossibleStateMap.put(instance, HelixDefinedState.ERROR.toString()); + } else { + bestPossibleStateMap.put(instance, state); + stateCount--; + } assigned.add(instance); - stateCount--; } } } http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index bbbdda0..ea9678e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -385,7 +385,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { List<String> preferenceList = getPreferenceList(partition, idealState, activeNodes); Map<String, String> bestStateForPartition = 
computeBestPossibleStateForPartition(liveNodes, stateModelDef, preferenceList, currentStateMap, - disabledInstancesForPartition, idealState.isEnabled()); + disabledInstancesForPartition, idealState); partitionMapping.addReplicaMap(partition, bestStateForPartition); } @@ -414,14 +414,14 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { * @param currentStateMap * : instance->state for each partition * @param disabledInstancesForPartition - * @param isResourceEnabled + * @param idealState * @return */ @Override protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances, StateModelDefinition stateModelDef, List<String> preferenceList, Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition, - boolean isResourceEnabled) { + IdealState idealState) { if (currentStateMap == null) { currentStateMap = Collections.emptyMap(); @@ -434,7 +434,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { } // (2) If resource disabled altogether, transit to initial-state (e.g. OFFLINE) if it's not in ERROR. - if (!isResourceEnabled) { + if (!idealState.isEnabled()) { return computeBestPossibleMapForDisabledResource(currentStateMap, stateModelDef); } @@ -442,7 +442,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { List<String> instancesToMove = new ArrayList<String>(currentStateMap.keySet()); instancesToMove.removeAll(preferenceList); - Set<String> instancesToDrop = new HashSet<String>(); + Set<String> instancesToDrop = new HashSet<>(); Iterator<String> it = instancesToMove.iterator(); while (it.hasNext()) { String instance = it.next(); @@ -471,13 +471,11 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { bestPossibleStateMap.put(instance, HelixDefinedState.DROPPED.name()); } - // The eventual goal is to have all instances in preferenceList all in the target states as specified in bestPossibleStateMap. 
- Map<String, String> targetInstanceMap = new HashMap<String, String>(bestPossibleStateMap); + // If the load-balance finishes (all replica are migrated to new instances), + // we should drop all partitions from previous assigned instances. + Map<String, String> targetInstanceMap = new HashMap<>(currentStateMap); targetInstanceMap.keySet().retainAll(preferenceList); - - // Once currentStateMap contains all required target instances and states, the - // load-balance finishes, we should drop all partitions from previous assigned instances. - if (currentStateMap.entrySet().containsAll(targetInstanceMap.entrySet())) { + if (migrationCompleted(preferenceList, stateModelDef, targetInstanceMap, idealState)) { for (String instance : currentStateMap.keySet()) { if (!preferenceList.contains(instance)) { String state = currentStateMap.get(instance); @@ -491,6 +489,37 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { return bestPossibleStateMap; } + private boolean migrationCompleted(List<String> preferenceList, + StateModelDefinition stateModelDef, Map<String, String> currentStateMap, + IdealState idealState) { + if (preferenceList == null) { + preferenceList = Collections.emptyList(); + } + + int replica = idealState.getReplicaCount(preferenceList.size()); + LinkedHashMap<String, Integer> bestPossileStateCountMap = + stateModelDef.getStateCountMap(preferenceList.size(), replica); + Map<String, Integer> currentStateCounts = StateModelDefinition.getStateCounts(currentStateMap); + + for (String state : bestPossileStateCountMap.keySet()) { + if (state.equals(HelixDefinedState.DROPPED.name()) || + state.equals(HelixDefinedState.ERROR.name()) || + state.equals(stateModelDef.getInitialState())) { + continue; + } + + Integer bestPossibleCount = bestPossileStateCountMap.get(state); + Integer currentCount = currentStateCounts.get(state); + bestPossibleCount = bestPossibleCount == null ? 0 : bestPossibleCount; + currentCount = currentCount == null ? 
0 : currentCount; + if (currentCount < bestPossibleCount) { + return false; + } + } + + return true; + } + /** * Sorter for nodes that sorts according to the CurrentState of the partition, based on the state priority defined http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index 1b4d3f3..6e2513d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -227,8 +227,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { PartitionStateMap intermediatePartitionStateMap = new PartitionStateMap(resourceName); - Set<Partition> partitionsNeedRecovery = new HashSet<Partition>(); - Set<Partition> partitionsNeedLoadbalance = new HashSet<Partition>(); + Set<Partition> partitionsNeedRecovery = new HashSet<>(); + Set<Partition> partitionsNeedLoadbalance = new HashSet<>(); for (Partition partition : resource.getPartitions()) { Map<String, String> currentStateMap = currentStateOutput.getCurrentStateMap(resourceName, partition); @@ -245,32 +245,30 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { partitionsNeedLoadbalance.add(partition); } else { // no rebalance needed. 
- Map<String, String> intermediateMap = new HashMap<String, String>(bestPossibleMap); + Map<String, String> intermediateMap = new HashMap<>(bestPossibleMap); intermediatePartitionStateMap.setState(partition, intermediateMap); } } - if (logger.isDebugEnabled()) { - logger.debug( - "recovery balance needed for " + resource + " partitions: " + partitionsNeedRecovery); - logger.debug( - "load balance needed for " + resource + " partitions: " + partitionsNeedLoadbalance); - } + logger.info( + "recovery balance needed for " + resourceName + " partitions: " + partitionsNeedRecovery); + logger.info( + "load balance needed for " + resourceName + " partitions: " + partitionsNeedLoadbalance); chargePendingTransition(resource, currentStateOutput, throttleController, partitionsNeedRecovery, partitionsNeedLoadbalance); // perform recovery rebalance - int recoveryRebalanceThrottledCount = + Set<Partition> recoveryThrottledPartitions = recoveryRebalance(resource, bestPossiblePartitionStateMap, throttleController, intermediatePartitionStateMap, partitionsNeedRecovery, currentStateOutput, cache.getStateModelDef(resource.getStateModelDefRef()).getTopState()); - int loadRebalanceThrottledCount = partitionsNeedLoadbalance.size(); + Set<Partition> loadbalanceThrottledPartitions = partitionsNeedLoadbalance; if (partitionsNeedRecovery.isEmpty()) { // perform load balance only if no partition need recovery rebalance. // TODO: to set a minimal threshold for allowing load-rebalance. 
- loadRebalanceThrottledCount = + loadbalanceThrottledPartitions = loadRebalance(resource, currentStateOutput, bestPossiblePartitionStateMap, throttleController, intermediatePartitionStateMap, partitionsNeedLoadbalance, currentStateOutput.getCurrentStateMap(resourceName)); @@ -284,8 +282,15 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { if (clusterStatusMonitor != null) { clusterStatusMonitor.updateRebalancerStats(resourceName, partitionsNeedRecovery.size(), - partitionsNeedLoadbalance.size(), recoveryRebalanceThrottledCount, - loadRebalanceThrottledCount); + partitionsNeedLoadbalance.size(), recoveryThrottledPartitions.size(), + loadbalanceThrottledPartitions.size()); + } + + if (logger.isDebugEnabled()) { + logParitionMapState(resourceName, new HashSet<>(resource.getPartitions()), + partitionsNeedRecovery, recoveryThrottledPartitions, partitionsNeedLoadbalance, + loadbalanceThrottledPartitions, currentStateOutput, bestPossiblePartitionStateMap, + intermediatePartitionStateMap); } logger.debug("End processing resource:" + resourceName); @@ -333,10 +338,10 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { /** * Perform any recovery balance if needed, fill intermediatePartitionStateMap * if recover rebalance is needed. 
- * return the number of partitions needs recoveryRebalance but get throttled + * return the partitions needs recoveryRebalance but get throttled */ - public int recoveryRebalance(Resource resource, PartitionStateMap bestPossiblePartitionStateMap, + public Set<Partition> recoveryRebalance(Resource resource, PartitionStateMap bestPossiblePartitionStateMap, StateTransitionThrottleController throttleController, PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedRecovery, CurrentStateOutput currentStateOutput, String topState) { @@ -366,14 +371,14 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE); } - logger.debug(String + logger.info(String .format("needRecovery: %d, recoverybalanceThrottled: %d", partitionsNeedRecovery.size(), partitionRecoveryBalanceThrottled.size())); - return partitionRecoveryBalanceThrottled.size(); + return partitionRecoveryBalanceThrottled; } - /* return the number of partitions needs loadRebalance but get throttled */ - private int loadRebalance(Resource resource, CurrentStateOutput currentStateOutput, + /* return the partitions needs loadRebalance but get throttled */ + private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput currentStateOutput, PartitionStateMap bestPossiblePartitionStateMap, StateTransitionThrottleController throttleController, PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedLoadbalance, @@ -382,7 +387,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { Set<Partition> partitionsLoadbalanceThrottled = new HashSet<Partition>(); List<Partition> partitionsNeedLoadRebalancePrioritized = - new ArrayList<Partition>(partitionsNeedLoadbalance); + new ArrayList<>(partitionsNeedLoadbalance); // TODO : Due to currently using JAVA 1.6, the original order of partitions list is not // determinable, sort the list by partition name and remove the code after bump 
to JAVA 1.8 @@ -408,11 +413,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { partitionsLoadbalanceThrottled.size())); if (logger.isDebugEnabled()) { - logger.debug("recovery balance throttled for " + resource + " partitions: " + logger.debug("recovery balance throttled for " + resourceName + " partitions: " + partitionsLoadbalanceThrottled); } - return partitionsLoadbalanceThrottled.size(); + return partitionsLoadbalanceThrottled; } private void throtteStateTransitions(StateTransitionThrottleController throttleController, @@ -482,12 +487,12 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } int replica = idealState.getReplicaCount(preferenceList.size()); - Set<String> activeList = new HashSet<String>(preferenceList); + Set<String> activeList = new HashSet<>(preferenceList); activeList.retainAll(cache.getEnabledLiveInstances()); LinkedHashMap<String, Integer> bestPossileStateCountMap = - getBestPossibleStateCountMap(stateModelDef, activeList.size(), replica); - Map<String, Integer> currentStateCounts = getStateCounts(currentStateMap); + stateModelDef.getStateCountMap(activeList.size(), replica); + Map<String, Integer> currentStateCounts = StateModelDefinition.getStateCounts(currentStateMap); for (String state : bestPossileStateCountMap.keySet()) { Integer bestPossibleCount = bestPossileStateCountMap.get(state); @@ -506,66 +511,6 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { return RebalanceType.LOAD_BALANCE; } - private LinkedHashMap<String, Integer> getBestPossibleStateCountMap( - StateModelDefinition stateModelDef, int candidateNodeNum, int totalReplicas) { - LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>(); - List<String> statesPriorityList = stateModelDef.getStatesPriorityList(); - - int replicas = totalReplicas; - for (String state : statesPriorityList) { - String num = stateModelDef.getNumInstancesPerState(state); - if (candidateNodeNum <= 0) { - break; - 
} - if ("N".equals(num)) { - stateCountMap.put(state, candidateNodeNum); - replicas -= candidateNodeNum; - break; - } else if ("R".equals(num)) { - // wait until we get the counts for all other states - continue; - } else { - int stateCount = -1; - try { - stateCount = Integer.parseInt(num); - } catch (Exception e) { - } - - if (stateCount > 0) { - int count = stateCount <= candidateNodeNum ? stateCount : candidateNodeNum; - candidateNodeNum -= count; - stateCountMap.put(state, count); - replicas -= count; - } - } - } - - // get state count for R - for (String state : statesPriorityList) { - String num = stateModelDef.getNumInstancesPerState(state); - if ("R".equals(num)) { - if (candidateNodeNum > 0 && replicas > 0) { - stateCountMap.put(state, replicas < candidateNodeNum ? replicas : candidateNodeNum); - } - // should have at most one state using R - break; - } - } - return stateCountMap; - } - - /* given instance->state map, return the state counts */ - private Map<String, Integer> getStateCounts(Map<String, String> stateMap) { - Map<String, Integer> stateCounts = new HashMap<String, Integer>(); - for (String state : stateMap.values()) { - if (!stateCounts.containsKey(state)) { - stateCounts.put(state, 0); - } - stateCounts.put(state, stateCounts.get(state) + 1); - } - return stateCounts; - } - private void logParitionMapState(String resource, Set<Partition> allPartitions, Set<Partition> recoveryPartitions, Set<Partition> recoveryThrottledPartitions, Set<Partition> loadbalancePartitions, Set<Partition> loadbalanceThrottledPartitions, @@ -579,23 +524,6 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { + "\nPartitions get throttled on load-balance: " + loadbalanceThrottledPartitions); for (Partition partition : allPartitions) { - if (recoveryPartitions.contains(partition)) { - logger - .debug("recovery balance needed for " + resource + " " + partition.getPartitionName()); - if (recoveryThrottledPartitions.contains(partition)) { - 
logger.debug("Recovery balance throttled on resource for " + resource + " " + partition - .getPartitionName()); - } - } else if (loadbalancePartitions.contains(partition)) { - logger.debug("load balance needed for " + resource + " " + partition.getPartitionName()); - if (loadbalanceThrottledPartitions.contains(partition)) { - logger.debug("Load balance throttled on resource for " + resource + " " + partition - .getPartitionName()); - } - } else { - logger.debug("no balance needed for " + resource + " " + partition.getPartitionName()); - } - logger.debug( partition + ": Best possible map: " + bestPossibleStateMap.getPartitionMap(partition)); logger.debug(partition + ": Current State: " + currentStateOutput http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java index bd39a83..d969405 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java @@ -193,7 +193,8 @@ public class PersistAssignmentStage extends AbstractBaseStage { IdealState idealState, Map<Partition, Map<String, String>> assignments) { String stateModelDef = idealState.getStateModelDefRef(); /** Only convert for MasterSlave resources */ - if (!stateModelDef.equals(BuiltInStateModelDefinitions.MasterSlave.name())) { + if (!stateModelDef.equals(BuiltInStateModelDefinitions.MasterSlave.name()) || idealState + .getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)) { return assignments; } http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java index 17ff9de..01a3746 100644 --- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java +++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java @@ -427,14 +427,19 @@ public class StateModelDefinition extends HelixProperty { * @return state count map: state->count */ public LinkedHashMap<String, Integer> getStateCountMap(int candidateNodeNum, int totalReplicas) { - LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>(); + LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<>(); List<String> statesPriorityList = getStatesPriorityList(); int replicas = totalReplicas; for (String state : statesPriorityList) { String num = getNumInstancesPerState(state); + if (candidateNodeNum <= 0) { + break; + } if ("N".equals(num)) { stateCountMap.put(state, candidateNodeNum); + replicas -= candidateNodeNum; + break; } else if ("R".equals(num)) { // wait until we get the counts for all other states continue; @@ -443,13 +448,13 @@ public class StateModelDefinition extends HelixProperty { try { stateCount = Integer.parseInt(num); } catch (Exception e) { - // LOG.error("Invalid count for state: " + state + ", count: " + num + - // ", use -1 instead"); } if (stateCount > 0) { - stateCountMap.put(state, stateCount); - replicas -= stateCount; + int count = stateCount <= candidateNodeNum ? 
stateCount : candidateNodeNum; + candidateNodeNum -= count; + stateCountMap.put(state, count); + replicas -= count; } } } @@ -458,11 +463,31 @@ public class StateModelDefinition extends HelixProperty { for (String state : statesPriorityList) { String num = getNumInstancesPerState(state); if ("R".equals(num)) { - stateCountMap.put(state, replicas); + if (candidateNodeNum > 0 && replicas > 0) { + stateCountMap.put(state, replicas < candidateNodeNum ? replicas : candidateNodeNum); + } // should have at most one state using R break; } } return stateCountMap; } + + /** + * Given instance->state map, return the state counts + * + * @param stateMap + * + * @return state->count map for the given state map. + */ + public static Map<String, Integer> getStateCounts(Map<String, String> stateMap) { + Map<String, Integer> stateCounts = new HashMap<>(); + for (String state : stateMap.values()) { + if (!stateCounts.containsKey(state)) { + stateCounts.put(state, 0); + } + stateCounts.put(state, stateCounts.get(state) + 1); + } + return stateCounts; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java index 2e10cd6..0ff4f3d 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.IdealState; import org.apache.helix.util.TestInputLoader; import org.testng.Assert; import 
org.testng.annotations.DataProvider; @@ -40,7 +41,7 @@ public class TestAbstractRebalancer { Map<String, String> bestPossibleMap = rebalancer .computeBestPossibleStateForPartition(new HashSet<String>(liveInstances), BuiltInStateModelDefinitions.valueOf(stateModelName).getStateModelDefinition(), - preferenceList, currentStateMap, new HashSet<String>(disabledInstancesForPartition), true); + preferenceList, currentStateMap, new HashSet<String>(disabledInstancesForPartition), new IdealState("test")); Assert.assertTrue(bestPossibleMap.equals(expectedBestPossibleMap)); } http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java index e39b343..eb40b6a 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java @@ -41,6 +41,7 @@ import org.apache.helix.ZNRecord; import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.model.IdealState; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.tools.StateModelConfigGenerator; @@ -221,13 +222,15 @@ public class TestAutoRebalanceStrategy { accessor.setProperty(keyBuilder.liveInstance(node), liveInstance); } cache.refresh(accessor); + + IdealState is = new IdealState("resource"); for (String partition : _partitions) { List<String> preferenceList = listResult.get(partition); 
Map<String, String> currentStateMap = _currentMapping.get(partition); Set<String> disabled = Collections.emptySet(); Map<String, String> assignment = new AutoRebalancer() .computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), _stateModelDef, preferenceList, - currentStateMap, disabled, true); + currentStateMap, disabled, is); mapResult.put(partition, assignment); } return mapResult; http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java index fead6a1..130e174 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java @@ -11,6 +11,7 @@ import java.util.Map; import java.util.Set; import org.apache.helix.controller.stages.BaseStageTest; import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.IdealState; import org.apache.helix.model.StateModelDefinition; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectReader; @@ -53,14 +54,19 @@ public class TestZeroReplicaAvoidance extends BaseStageTest { liveInstances.add("localhost_" + i); } + IdealState is = new IdealState("test"); + is.setReplicas("3"); + DelayedAutoRebalancer rebalancer = new DelayedAutoRebalancer(); Map<String, String> bestPossibleMap = rebalancer .computeBestPossibleStateForPartition(liveInstances, stateModelDef, instancePreferenceList, currentStateMap, - Collections.<String>emptySet(), true); + Collections.<String>emptySet(), is); Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap, - "Differs, get " + 
bestPossibleMap + ": expected: " + expectedBestPossibleMap); + "Differs, get " + bestPossibleMap + "\nexpected: " + expectedBestPossibleMap + + "\ncurrentState: " + currentStateMap + "\npreferenceList: " + instancePreferenceList); - System.out.println("END TestBestPossibleStateCalcStage at " + new Date(System.currentTimeMillis())); + System.out.println( + "END TestBestPossibleStateCalcStage at " + new Date(System.currentTimeMillis())); } @DataProvider(name = "zeroReplicaInput") http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java index 1cf19cc..6417b9b 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java @@ -19,6 +19,8 @@ package org.apache.helix.integration.common; * under the License. 
*/ +import java.util.Map; +import java.util.Set; import java.util.logging.Level; import org.I0Itec.zkclient.ZkServer; import org.apache.helix.BaseDataAccessor; @@ -32,15 +34,19 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ConfigScope; +import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.StateModelDefinition; import org.apache.helix.model.builder.ConfigScopeBuilder; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.util.ZKClientPool; import org.apache.log4j.Logger; +import org.testng.Assert; import org.testng.AssertJUnit; import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeSuite; @@ -176,4 +182,44 @@ public class ZkIntegrationTestBase { return idealState; } + + /** + * Validate there should be always minimal active replica and top state replica for each partition. + * Also make sure there is always some partitions with only active replica count. 
+ */ + protected void validateMinActiveAndTopStateReplica(IdealState is, ExternalView ev, + int minActiveReplica, int numNodes) { + StateModelDefinition stateModelDef = + BuiltInStateModelDefinitions.valueOf(is.getStateModelDefRef()).getStateModelDefinition(); + String topState = stateModelDef.getStatesPriorityList().get(0); + int replica = Integer.valueOf(is.getReplicas()); + + Map<String, Integer> stateCount = + stateModelDef.getStateCountMap(numNodes, replica); + Set<String> activeStates = stateCount.keySet(); + + for (String partition : is.getPartitionSet()) { + Map<String, String> assignmentMap = ev.getRecord().getMapField(partition); + Assert.assertNotNull(assignmentMap, + is.getResourceName() + "'s best possible assignment is null for partition " + partition); + Assert.assertTrue(!assignmentMap.isEmpty(), + is.getResourceName() + "'s partition " + partition + " has no best possible map in IS."); + + boolean hasTopState = false; + int activeReplica = 0; + for (String state : assignmentMap.values()) { + if (topState.equalsIgnoreCase(state)) { + hasTopState = true; + } + if (activeStates.contains(state)) { + activeReplica++; + } + } + + Assert.assertTrue(hasTopState, String.format("%s missing %s replica", partition, topState)); + Assert.assertTrue(activeReplica >= minActiveReplica, String + .format("%s has less active replica %d then required %d", partition, activeReplica, + minActiveReplica)); + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java index d1cbe81..350ecd1 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java +++ 
b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java @@ -33,6 +33,7 @@ import org.apache.helix.mock.participant.MockSchemataModelFactory; import org.apache.helix.mock.participant.MockTransition; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.log4j.Logger; public class MockParticipantManager extends ZKHelixManager implements Runnable, ZkTestManager { @@ -42,7 +43,11 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable, private CountDownLatch _stopCountDown = new CountDownLatch(1); private CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1); - private final MockMSModelFactory _msModelFactory = new MockMSModelFactory(null); + protected MockMSModelFactory _msModelFactory = new MockMSModelFactory(null); + protected DummyLeaderStandbyStateModelFactory _lsModelFactory = + new DummyLeaderStandbyStateModelFactory(10); + protected DummyOnlineOfflineStateModelFactory _ofModelFactory = + new DummyOnlineOfflineStateModelFactory(10); public MockParticipantManager(String zkAddr, String clusterName, String instanceName) { super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr); @@ -86,15 +91,10 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable, StateMachineEngine stateMach = getStateMachineEngine(); stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.MasterSlave.name(), _msModelFactory); - - DummyLeaderStandbyStateModelFactory lsModelFactory = - new DummyLeaderStandbyStateModelFactory(10); - DummyOnlineOfflineStateModelFactory ofModelFactory = - new DummyOnlineOfflineStateModelFactory(10); stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.LeaderStandby.name(), - lsModelFactory); + _lsModelFactory); 
stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), - ofModelFactory); + _ofModelFactory); MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory(); stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory); http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java index a5b8ef3..d926e01 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java @@ -33,7 +33,6 @@ import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; -import org.apache.helix.model.StateModelDefinition; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; @@ -137,7 +136,7 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase { ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); } setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); } @@ -160,7 +159,7 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase { 
ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _replica); + validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE); } } @@ -187,9 +186,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase { _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); if (db.equals(testDb)) { - validateMinActiveAndTopStateReplica(idealState, ev, _replica); + validateMinActiveAndTopStateReplica(idealState, ev, _replica, NUM_NODE); } else { - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); validateNoPartitionMove(is, externalViewsBefore.get(db), ev, _participants.get(0).getInstanceName(), false); } @@ -212,9 +211,8 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase { for (String db : _testDBs) { ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState( - CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _replica); + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE); } enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); @@ -312,46 +310,6 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase { } } - /** - * Validate there should be always minimal active replica and top state replica for each partition. - * Also make sure there is always some partitions with only active replica count. 
- */ - protected void validateMinActiveAndTopStateReplica(IdealState is, ExternalView ev, - int minActiveReplica) { - StateModelDefinition stateModelDef = - BuiltInStateModelDefinitions.valueOf(is.getStateModelDefRef()).getStateModelDefinition(); - String topState = stateModelDef.getStatesPriorityList().get(0); - int replica = Integer.valueOf(is.getReplicas()); - - Map<String, Integer> stateCount = - stateModelDef.getStateCountMap(NUM_NODE, replica); - Set<String> activeStates = stateCount.keySet(); - - for (String partition : is.getPartitionSet()) { - Map<String, String> assignmentMap = ev.getRecord().getMapField(partition); - Assert.assertNotNull(assignmentMap, - is.getResourceName() + "'s best possible assignment is null for partition " + partition); - Assert.assertTrue(!assignmentMap.isEmpty(), - is.getResourceName() + "'s partition " + partition + " has no best possible map in IS."); - - boolean hasTopState = false; - int activeReplica = 0; - for (String state : assignmentMap.values()) { - if (topState.equalsIgnoreCase(state)) { - hasTopState = true; - } - if (activeStates.contains(state)) { - activeReplica++; - } - } - - Assert.assertTrue(hasTopState, String.format("%s missing %s replica", partition, topState)); - Assert.assertTrue(activeReplica >= minActiveReplica, String - .format("%s has less active replica %d then required %d", partition, activeReplica, - minActiveReplica)); - } - } - private void validateDelayedMovements(Map<String, ExternalView> externalViewsBefore) throws InterruptedException { // bring down one node, no partition should be moved. 
@@ -363,7 +321,7 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase { ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); validateNoPartitionMove(is, externalViewsBefore.get(db), ev, _participants.get(0).getInstanceName(), false); } http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java index d9a46d2..5df2563 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java @@ -63,7 +63,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); validateNoPartitionMove(is, externalViewsBefore.get(db), ev, instance, true); } } @@ -86,7 +86,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut ExternalView ev = 
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); validateNoPartitionMove(is, externalViewsBefore.get(db), ev, _participants.get(0).getInstanceName(), true); } @@ -113,7 +113,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); validateNoPartitionMove(is, externalViewsBefore.get(db), ev, _participants.get(0).getInstanceName(), true); } @@ -127,7 +127,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); } setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); } @@ -150,7 +150,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); validateNoPartitionMove(is, externalViewsBefore.get(db), ev, 
_participants.get(0).getInstanceName(), true); } @@ -164,7 +164,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); } setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); } @@ -188,7 +188,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); validateNoPartitionMove(is, externalViewsBefore.get(db), ev, _participants.get(0).getInstanceName(), true); } @@ -199,7 +199,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _replica); + validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE); } } @@ -217,7 +217,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); validateNoPartitionMove(is, 
externalViewsBefore.get(db), ev, _participants.get(0).getInstanceName(), true); } @@ -240,9 +240,9 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); if (db.equals(testDb)) { - validateMinActiveAndTopStateReplica(idealState, ev, _replica); + validateMinActiveAndTopStateReplica(idealState, ev, _replica, NUM_NODE); } else { - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); validateNoPartitionMove(is, externalViewsBefore.get(db), ev, _participants.get(0).getInstanceName(), true); } @@ -265,7 +265,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica); + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); validateNoPartitionMove(is, externalViewsBefore.get(db), ev, _participants.get(0).getInstanceName(), true); } @@ -281,7 +281,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState( CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _replica); + validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE); } enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java index fafd78c..d2f4013 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java @@ -24,8 +24,12 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.InstanceType; +import org.apache.helix.NotificationContext; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy; import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; @@ -33,9 +37,23 @@ import org.apache.helix.integration.common.ZkIntegrationTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.mock.participant.DummyProcess; +import org.apache.helix.mock.participant.MockMSModelFactory; +import org.apache.helix.mock.participant.MockMSStateModel; +import org.apache.helix.mock.participant.MockSchemataModelFactory; +import org.apache.helix.mock.participant.MockTransition; import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; +import org.apache.helix.model.Message; import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.participant.StateMachineEngine; 
+import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelFactory; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.Transition; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; @@ -146,6 +164,50 @@ public class TestMixedModeAutoRebalance extends ZkIntegrationTestBase { } } + @Test + public void testUserDefinedPreferenceListsInFullAutoWithErrors() throws Exception { + String db = "Test-DB-1"; + createResourceWithDelayedRebalance(CLUSTER_NAME, db, + BuiltInStateModelDefinitions.MasterSlave.name(), 5, _replica, _replica, 0, + CrushRebalanceStrategy.class.getName()); + + IdealState idealState = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists(); + + List<String> newNodes = new ArrayList<>(); + for (int i = NUM_NODE; i < NUM_NODE + _replica; i++) { + String instance = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance); + + // start dummy participants + MockParticipantManager participant = + new TestMockParticipantManager(ZK_ADDR, CLUSTER_NAME, instance); + participant.syncStart(); + _participants.add(participant); + newNodes.add(instance); + } + + List<String> userDefinedPartitions = new ArrayList<>(); + for (String partition : userDefinedPreferenceLists.keySet()) { + userDefinedPreferenceLists.put(partition, newNodes); + userDefinedPartitions.add(partition); + } + + ResourceConfig resourceConfig = + new ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build(); + _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig); + + //TODO: Trigger rebalancer, remove this once Helix controller is 
listening on resource config changes. + RebalanceScheduler.invokeRebalance(_dataAccessor, db); + + Thread.sleep(1000); + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE); + } + private void verifyUserDefinedPreferenceLists(String db, Map<String, List<String>> userDefinedPreferenceLists, List<String> userDefinedPartitions) throws InterruptedException { @@ -190,4 +252,33 @@ public class TestMixedModeAutoRebalance extends ZkIntegrationTestBase { } System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); } + + public static class TestMockParticipantManager extends MockParticipantManager { + public TestMockParticipantManager(String zkAddr, String clusterName, String instanceName) { + super(zkAddr, clusterName, instanceName); + _msModelFactory = new MockDelayMSStateModelFactory(); + } + } + + public static class MockDelayMSStateModelFactory extends MockMSModelFactory { + @Override + public MockDelayMSStateModel createNewStateModel(String resourceName, + String partitionKey) { + MockDelayMSStateModel model = new MockDelayMSStateModel(null); + return model; + } + } + + // mock delay master-slave state model + @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" }) + public static class MockDelayMSStateModel extends MockMSStateModel { + public MockDelayMSStateModel(MockTransition transition) { + super(transition); + } + + @Transition(to = "*", from = "*") + public void generalTransitionHandle(Message message, NotificationContext context) { + throw new IllegalArgumentException("AAA"); + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json b/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json index 29c20b5..5f88600 100644 --- a/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json +++ b/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json @@ -99,7 +99,8 @@ "localhost_2": "MASTER", "localhost_4": "SLAVE", "localhost_0": "DROPPED", - "localhost_1": "SLAVE" + "localhost_1": "SLAVE", + "localhost_3": "ERROR" } } ] http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json ---------------------------------------------------------------------- diff --git a/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json b/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json index b2a6626..bf745bf 100644 --- a/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json +++ b/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json @@ -73,6 +73,7 @@ "localhost_3": "ONLINE" }, "bestPossibleStates": { + "localhost_2": "ERROR", "localhost_3": "ONLINE", "localhost_4": "ONLINE", "localhost_1": "ONLINE" @@ -94,8 +95,9 @@ "bestPossibleStates": { "localhost_3": "ONLINE", "localhost_4": "ONLINE", - "localhost_0": "DROPPED", - "localhost_1": "DROPPED" + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ERROR" } } ]