Fix the missing master replica when all replicas on the new instances turn to the ERROR state
while migrating existing replicas to entirely new instances in DelayedAutoRebalancer.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bf181daf
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bf181daf
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bf181daf

Branch: refs/heads/master
Commit: bf181daf8761d91fd4b8aeb03f8e99b9c8bb097a
Parents: fc04d53
Author: Lei Xia <l...@linkedin.com>
Authored: Mon Sep 11 11:00:59 2017 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Mon Nov 6 17:07:38 2017 -0800

----------------------------------------------------------------------
 .../rebalancer/AbstractRebalancer.java          |  23 ++--
 .../rebalancer/DelayedAutoRebalancer.java       |  51 +++++--
 .../stages/IntermediateStateCalcStage.java      | 134 +++++--------------
 .../stages/PersistAssignmentStage.java          |   3 +-
 .../helix/model/StateModelDefinition.java       |  37 ++++-
 .../rebalancer/TestAbstractRebalancer.java      |   3 +-
 .../rebalancer/TestAutoRebalanceStrategy.java   |   5 +-
 .../rebalancer/TestZeroReplicaAvoidance.java    |  12 +-
 .../common/ZkIntegrationTestBase.java           |  46 +++++++
 .../manager/MockParticipantManager.java         |  16 +--
 .../rebalancer/TestDelayedAutoRebalance.java    |  56 +-------
 ...elayedAutoRebalanceWithDisabledInstance.java |  26 ++--
 .../rebalancer/TestMixedModeAutoRebalance.java  |  91 +++++++++++++
 .../TestDelayedAutoRebalancer.MasterSlave.json  |   3 +-
 ...TestDelayedAutoRebalancer.OnlineOffline.json |   6 +-
 15 files changed, 303 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 94908ba..7f79f7f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -96,7 +96,7 @@ public abstract class AbstractRebalancer implements 
Rebalancer, MappingCalculato
           Collections.unmodifiableSet(cache.getLiveInstances().keySet()));
       Map<String, String> bestStateForPartition =
           
computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), 
stateModelDef, preferenceList,
-              currentStateMap, disabledInstancesForPartition, 
idealState.isEnabled());
+              currentStateMap, disabledInstancesForPartition, idealState);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
@@ -160,7 +160,7 @@ public abstract class AbstractRebalancer implements 
Rebalancer, MappingCalculato
   protected Map<String, String> 
computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       Map<String, String> currentStateMap, Set<String> 
disabledInstancesForPartition,
-      boolean isResourceEnabled) {
+      IdealState idealState) {
 
     if (currentStateMap == null) {
       currentStateMap = Collections.emptyMap();
@@ -173,7 +173,7 @@ public abstract class AbstractRebalancer implements 
Rebalancer, MappingCalculato
     }
 
     // (2) If resource disabled altogether, transit to initial-state (e.g. 
OFFLINE) if it's not in ERROR.
-    if (!isResourceEnabled) {
+    if (!idealState.isEnabled()) {
       return computeBestPossibleMapForDisabledResource(currentStateMap, 
stateModelDef);
     }
 
@@ -243,7 +243,7 @@ public abstract class AbstractRebalancer implements 
Rebalancer, MappingCalculato
   protected Map<String, String> computeBestPossibleMap(List<String> 
preferenceList, StateModelDefinition stateModelDef,
       Map<String, String> currentStateMap, Set<String> liveInstances, 
Set<String> disabledInstancesForPartition) {
 
-    Map<String, String> bestPossibleStateMap = new HashMap<String, String>();
+    Map<String, String> bestPossibleStateMap = new HashMap<>();
 
     // (1) Instances that have current state but not in preference list, drop, 
no matter it's disabled or not.
     for (String instance : currentStateMap.keySet()) {
@@ -276,8 +276,8 @@ public abstract class AbstractRebalancer implements 
Rebalancer, MappingCalculato
     // To achieve that, we sort the preferenceList based on CurrentState, by 
treating top-state and second-states with
     // same priority and rely on the fact that Collections.sort() is stable.
     List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
-    Set<String> assigned = new HashSet<String>();
-    Set<String> liveAndEnabled = new HashSet<String>(liveInstances);
+    Set<String> assigned = new HashSet<>();
+    Set<String> liveAndEnabled = new HashSet<>(liveInstances);
     liveAndEnabled.removeAll(disabledInstancesForPartition);
 
     for (String state : statesPriorityList) {
@@ -294,11 +294,14 @@ public abstract class AbstractRebalancer implements 
Rebalancer, MappingCalculato
         if (stateCount <= 0) {
           break;
         }
-        boolean inError = 
HelixDefinedState.ERROR.toString().equals(currentStateMap.get(instance));
-        if (!assigned.contains(instance) && liveAndEnabled.contains(instance) 
&& !inError) {
-          bestPossibleStateMap.put(instance, state);
+        if (!assigned.contains(instance) && liveAndEnabled.contains(instance)) 
{
+          if 
(HelixDefinedState.ERROR.toString().equals(currentStateMap.get(instance))) {
+            bestPossibleStateMap.put(instance, 
HelixDefinedState.ERROR.toString());
+          } else {
+            bestPossibleStateMap.put(instance, state);
+            stateCount--;
+          }
           assigned.add(instance);
-          stateCount--;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index bbbdda0..ea9678e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -385,7 +385,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
       List<String> preferenceList = getPreferenceList(partition, idealState, 
activeNodes);
       Map<String, String> bestStateForPartition =
           computeBestPossibleStateForPartition(liveNodes, stateModelDef, 
preferenceList, currentStateMap,
-              disabledInstancesForPartition, idealState.isEnabled());
+              disabledInstancesForPartition, idealState);
 
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
@@ -414,14 +414,14 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
    * @param currentStateMap
    *          : instance->state for each partition
    * @param disabledInstancesForPartition
-   * @param isResourceEnabled
+   * @param idealState
    * @return
    */
   @Override
   protected Map<String, String> 
computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       Map<String, String> currentStateMap, Set<String> 
disabledInstancesForPartition,
-      boolean isResourceEnabled) {
+      IdealState idealState) {
 
     if (currentStateMap == null) {
       currentStateMap = Collections.emptyMap();
@@ -434,7 +434,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
     }
 
     // (2) If resource disabled altogether, transit to initial-state (e.g. 
OFFLINE) if it's not in ERROR.
-    if (!isResourceEnabled) {
+    if (!idealState.isEnabled()) {
       return computeBestPossibleMapForDisabledResource(currentStateMap, 
stateModelDef);
     }
 
@@ -442,7 +442,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
     List<String> instancesToMove = new 
ArrayList<String>(currentStateMap.keySet());
     instancesToMove.removeAll(preferenceList);
 
-    Set<String> instancesToDrop = new HashSet<String>();
+    Set<String> instancesToDrop = new HashSet<>();
     Iterator<String> it = instancesToMove.iterator();
     while (it.hasNext()) {
       String instance = it.next();
@@ -471,13 +471,11 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
       bestPossibleStateMap.put(instance, HelixDefinedState.DROPPED.name());
     }
 
-    // The eventual goal is to have all instances in preferenceList all in the 
target states as specified in bestPossibleStateMap.
-    Map<String, String> targetInstanceMap = new HashMap<String, 
String>(bestPossibleStateMap);
+    // If the load-balance finishes (all replica are migrated to new 
instances),
+    // we should drop all partitions from previous assigned instances.
+    Map<String, String> targetInstanceMap = new HashMap<>(currentStateMap);
     targetInstanceMap.keySet().retainAll(preferenceList);
-
-    // Once currentStateMap contains all required target instances and states, 
the
-    // load-balance finishes, we should drop all partitions from previous 
assigned instances.
-    if (currentStateMap.entrySet().containsAll(targetInstanceMap.entrySet())) {
+    if (migrationCompleted(preferenceList, stateModelDef, targetInstanceMap, 
idealState)) {
       for (String instance : currentStateMap.keySet()) {
         if (!preferenceList.contains(instance)) {
           String state = currentStateMap.get(instance);
@@ -491,6 +489,37 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
     return bestPossibleStateMap;
   }
 
+  private boolean migrationCompleted(List<String> preferenceList,
+      StateModelDefinition stateModelDef, Map<String, String> currentStateMap,
+      IdealState idealState) {
+    if (preferenceList == null) {
+      preferenceList = Collections.emptyList();
+    }
+
+    int replica = idealState.getReplicaCount(preferenceList.size());
+    LinkedHashMap<String, Integer> bestPossileStateCountMap =
+        stateModelDef.getStateCountMap(preferenceList.size(), replica);
+    Map<String, Integer> currentStateCounts = 
StateModelDefinition.getStateCounts(currentStateMap);
+
+    for (String state : bestPossileStateCountMap.keySet()) {
+      if (state.equals(HelixDefinedState.DROPPED.name()) ||
+          state.equals(HelixDefinedState.ERROR.name()) ||
+          state.equals(stateModelDef.getInitialState())) {
+        continue;
+      }
+
+      Integer bestPossibleCount = bestPossileStateCountMap.get(state);
+      Integer currentCount = currentStateCounts.get(state);
+      bestPossibleCount = bestPossibleCount == null ? 0 : bestPossibleCount;
+      currentCount = currentCount == null ? 0 : currentCount;
+      if (currentCount < bestPossibleCount) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
 
   /**
    * Sorter for nodes that sorts according to the CurrentState of the 
partition, based on the state priority defined

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 1b4d3f3..6e2513d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -227,8 +227,8 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
 
     PartitionStateMap intermediatePartitionStateMap = new 
PartitionStateMap(resourceName);
 
-    Set<Partition> partitionsNeedRecovery = new HashSet<Partition>();
-    Set<Partition> partitionsNeedLoadbalance = new HashSet<Partition>();
+    Set<Partition> partitionsNeedRecovery = new HashSet<>();
+    Set<Partition> partitionsNeedLoadbalance = new HashSet<>();
     for (Partition partition : resource.getPartitions()) {
       Map<String, String> currentStateMap =
           currentStateOutput.getCurrentStateMap(resourceName, partition);
@@ -245,32 +245,30 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
         partitionsNeedLoadbalance.add(partition);
       } else {
         // no rebalance needed.
-        Map<String, String> intermediateMap = new HashMap<String, 
String>(bestPossibleMap);
+        Map<String, String> intermediateMap = new HashMap<>(bestPossibleMap);
         intermediatePartitionStateMap.setState(partition, intermediateMap);
       }
     }
 
-    if (logger.isDebugEnabled()) {
-      logger.debug(
-          "recovery balance needed for " + resource + " partitions: " + 
partitionsNeedRecovery);
-      logger.debug(
-          "load balance needed for " + resource + " partitions: " + 
partitionsNeedLoadbalance);
-    }
+    logger.info(
+        "recovery balance needed for " + resourceName + " partitions: " + 
partitionsNeedRecovery);
+    logger.info(
+        "load balance needed for " + resourceName + " partitions: " + 
partitionsNeedLoadbalance);
 
     chargePendingTransition(resource, currentStateOutput, throttleController,
         partitionsNeedRecovery, partitionsNeedLoadbalance);
 
     // perform recovery rebalance
-    int recoveryRebalanceThrottledCount =
+    Set<Partition> recoveryThrottledPartitions =
     recoveryRebalance(resource, bestPossiblePartitionStateMap, 
throttleController,
         intermediatePartitionStateMap, partitionsNeedRecovery, 
currentStateOutput,
         cache.getStateModelDef(resource.getStateModelDefRef()).getTopState());
 
-    int loadRebalanceThrottledCount = partitionsNeedLoadbalance.size();
+    Set<Partition> loadbalanceThrottledPartitions = partitionsNeedLoadbalance;
     if (partitionsNeedRecovery.isEmpty()) {
       // perform load balance only if no partition need recovery rebalance.
       // TODO: to set a minimal threshold for allowing load-rebalance.
-      loadRebalanceThrottledCount =
+      loadbalanceThrottledPartitions =
           loadRebalance(resource, currentStateOutput, 
bestPossiblePartitionStateMap,
               throttleController, intermediatePartitionStateMap, 
partitionsNeedLoadbalance,
               currentStateOutput.getCurrentStateMap(resourceName));
@@ -284,8 +282,15 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
 
     if (clusterStatusMonitor != null) {
       clusterStatusMonitor.updateRebalancerStats(resourceName, 
partitionsNeedRecovery.size(),
-          partitionsNeedLoadbalance.size(), recoveryRebalanceThrottledCount,
-          loadRebalanceThrottledCount);
+          partitionsNeedLoadbalance.size(), recoveryThrottledPartitions.size(),
+          loadbalanceThrottledPartitions.size());
+    }
+
+    if (logger.isDebugEnabled()) {
+      logParitionMapState(resourceName, new 
HashSet<>(resource.getPartitions()),
+          partitionsNeedRecovery, recoveryThrottledPartitions, 
partitionsNeedLoadbalance,
+          loadbalanceThrottledPartitions, currentStateOutput, 
bestPossiblePartitionStateMap,
+          intermediatePartitionStateMap);
     }
 
     logger.debug("End processing resource:" + resourceName);
@@ -333,10 +338,10 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
   /**
    *  Perform any recovery balance if needed, fill 
intermediatePartitionStateMap
    *  if recover rebalance is needed.
-   *  return the number of partitions needs recoveryRebalance but get throttled
+   *  return the partitions needs recoveryRebalance but get throttled
    */
 
-  public int recoveryRebalance(Resource resource, PartitionStateMap 
bestPossiblePartitionStateMap,
+  public Set<Partition> recoveryRebalance(Resource resource, PartitionStateMap 
bestPossiblePartitionStateMap,
       StateTransitionThrottleController throttleController,
       PartitionStateMap intermediatePartitionStateMap, Set<Partition> 
partitionsNeedRecovery,
       CurrentStateOutput currentStateOutput, String topState) {
@@ -366,14 +371,14 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
           intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE);
     }
 
-    logger.debug(String
+    logger.info(String
         .format("needRecovery: %d, recoverybalanceThrottled: %d", 
partitionsNeedRecovery.size(),
             partitionRecoveryBalanceThrottled.size()));
-    return partitionRecoveryBalanceThrottled.size();
+    return partitionRecoveryBalanceThrottled;
   }
 
-  /* return the number of partitions needs loadRebalance but get throttled */
-  private int loadRebalance(Resource resource, CurrentStateOutput 
currentStateOutput,
+  /* return the partitions needs loadRebalance but get throttled */
+  private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput 
currentStateOutput,
       PartitionStateMap bestPossiblePartitionStateMap,
       StateTransitionThrottleController throttleController,
       PartitionStateMap intermediatePartitionStateMap, Set<Partition> 
partitionsNeedLoadbalance,
@@ -382,7 +387,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     Set<Partition> partitionsLoadbalanceThrottled = new HashSet<Partition>();
 
     List<Partition> partitionsNeedLoadRebalancePrioritized =
-        new ArrayList<Partition>(partitionsNeedLoadbalance);
+        new ArrayList<>(partitionsNeedLoadbalance);
 
     // TODO : Due to currently using JAVA 1.6, the original order of 
partitions list is not
     // determinable, sort the list by partition name and remove the code after 
bump to JAVA 1.8
@@ -408,11 +413,11 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
             partitionsLoadbalanceThrottled.size()));
 
     if (logger.isDebugEnabled()) {
-      logger.debug("recovery balance throttled for " + resource + " 
partitions: "
+      logger.debug("recovery balance throttled for " + resourceName + " 
partitions: "
           + partitionsLoadbalanceThrottled);
     }
 
-    return partitionsLoadbalanceThrottled.size();
+    return partitionsLoadbalanceThrottled;
   }
 
   private void throtteStateTransitions(StateTransitionThrottleController 
throttleController,
@@ -482,12 +487,12 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     }
 
     int replica = idealState.getReplicaCount(preferenceList.size());
-    Set<String> activeList = new HashSet<String>(preferenceList);
+    Set<String> activeList = new HashSet<>(preferenceList);
     activeList.retainAll(cache.getEnabledLiveInstances());
 
     LinkedHashMap<String, Integer> bestPossileStateCountMap =
-        getBestPossibleStateCountMap(stateModelDef, activeList.size(), 
replica);
-    Map<String, Integer> currentStateCounts = getStateCounts(currentStateMap);
+        stateModelDef.getStateCountMap(activeList.size(), replica);
+    Map<String, Integer> currentStateCounts = 
StateModelDefinition.getStateCounts(currentStateMap);
 
     for (String state : bestPossileStateCountMap.keySet()) {
       Integer bestPossibleCount = bestPossileStateCountMap.get(state);
@@ -506,66 +511,6 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     return RebalanceType.LOAD_BALANCE;
   }
 
-  private LinkedHashMap<String, Integer> getBestPossibleStateCountMap(
-      StateModelDefinition stateModelDef, int candidateNodeNum, int 
totalReplicas) {
-    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, 
Integer>();
-    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
-
-    int replicas = totalReplicas;
-    for (String state : statesPriorityList) {
-      String num = stateModelDef.getNumInstancesPerState(state);
-      if (candidateNodeNum <= 0) {
-        break;
-      }
-      if ("N".equals(num)) {
-        stateCountMap.put(state, candidateNodeNum);
-        replicas -= candidateNodeNum;
-        break;
-      } else if ("R".equals(num)) {
-        // wait until we get the counts for all other states
-        continue;
-      } else {
-        int stateCount = -1;
-        try {
-          stateCount = Integer.parseInt(num);
-        } catch (Exception e) {
-        }
-
-        if (stateCount > 0) {
-          int count = stateCount <= candidateNodeNum ? stateCount : 
candidateNodeNum;
-          candidateNodeNum -= count;
-          stateCountMap.put(state, count);
-          replicas -= count;
-        }
-      }
-    }
-
-    // get state count for R
-    for (String state : statesPriorityList) {
-      String num = stateModelDef.getNumInstancesPerState(state);
-      if ("R".equals(num)) {
-        if (candidateNodeNum > 0 && replicas > 0) {
-          stateCountMap.put(state, replicas < candidateNodeNum ? replicas : 
candidateNodeNum);
-        }
-        // should have at most one state using R
-        break;
-      }
-    }
-    return stateCountMap;
-  }
-
-  /* given instance->state map, return the state counts */
-  private Map<String, Integer> getStateCounts(Map<String, String> stateMap) {
-    Map<String, Integer> stateCounts = new HashMap<String, Integer>();
-    for (String state : stateMap.values()) {
-      if (!stateCounts.containsKey(state)) {
-        stateCounts.put(state, 0);
-      }
-      stateCounts.put(state, stateCounts.get(state) + 1);
-    }
-    return stateCounts;
-  }
-
   private void logParitionMapState(String resource, Set<Partition> 
allPartitions,
       Set<Partition> recoveryPartitions, Set<Partition> 
recoveryThrottledPartitions,
       Set<Partition> loadbalancePartitions, Set<Partition> 
loadbalanceThrottledPartitions,
@@ -579,23 +524,6 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
         + "\nPartitions get throttled on load-balance: " + 
loadbalanceThrottledPartitions);
 
     for (Partition partition : allPartitions) {
-      if (recoveryPartitions.contains(partition)) {
-        logger
-            .debug("recovery balance needed for " + resource + " " + 
partition.getPartitionName());
-        if (recoveryThrottledPartitions.contains(partition)) {
-          logger.debug("Recovery balance throttled on resource for " + 
resource + " " + partition
-              .getPartitionName());
-        }
-      } else if (loadbalancePartitions.contains(partition)) {
-        logger.debug("load balance needed for " + resource + " " + 
partition.getPartitionName());
-        if (loadbalanceThrottledPartitions.contains(partition)) {
-          logger.debug("Load balance throttled on resource for " + resource + 
" " + partition
-              .getPartitionName());
-        }
-      } else {
-        logger.debug("no balance needed for " + resource + " " + 
partition.getPartitionName());
-      }
-
       logger.debug(
           partition + ": Best possible map: " + 
bestPossibleStateMap.getPartitionMap(partition));
       logger.debug(partition + ": Current State: " + currentStateOutput

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index bd39a83..d969405 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -193,7 +193,8 @@ public class PersistAssignmentStage extends 
AbstractBaseStage {
       IdealState idealState, Map<Partition, Map<String, String>> assignments) {
     String stateModelDef = idealState.getStateModelDefRef();
     /** Only convert for MasterSlave resources */
-    if 
(!stateModelDef.equals(BuiltInStateModelDefinitions.MasterSlave.name())) {
+    if (!stateModelDef.equals(BuiltInStateModelDefinitions.MasterSlave.name()) 
|| idealState
+        .getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)) {
       return assignments;
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java 
b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index 17ff9de..01a3746 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -427,14 +427,19 @@ public class StateModelDefinition extends HelixProperty {
    * @return state count map: state->count
    */
   public LinkedHashMap<String, Integer> getStateCountMap(int candidateNodeNum, 
int totalReplicas) {
-    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, 
Integer>();
+    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<>();
     List<String> statesPriorityList = getStatesPriorityList();
 
     int replicas = totalReplicas;
     for (String state : statesPriorityList) {
       String num = getNumInstancesPerState(state);
+      if (candidateNodeNum <= 0) {
+        break;
+      }
       if ("N".equals(num)) {
         stateCountMap.put(state, candidateNodeNum);
+        replicas -= candidateNodeNum;
+        break;
       } else if ("R".equals(num)) {
         // wait until we get the counts for all other states
         continue;
@@ -443,13 +448,13 @@ public class StateModelDefinition extends HelixProperty {
         try {
           stateCount = Integer.parseInt(num);
         } catch (Exception e) {
-          // LOG.error("Invalid count for state: " + state + ", count: " + num 
+
-          // ", use -1 instead");
         }
 
         if (stateCount > 0) {
-          stateCountMap.put(state, stateCount);
-          replicas -= stateCount;
+          int count = stateCount <= candidateNodeNum ? stateCount : 
candidateNodeNum;
+          candidateNodeNum -= count;
+          stateCountMap.put(state, count);
+          replicas -= count;
         }
       }
     }
@@ -458,11 +463,31 @@ public class StateModelDefinition extends HelixProperty {
     for (String state : statesPriorityList) {
       String num = getNumInstancesPerState(state);
       if ("R".equals(num)) {
-        stateCountMap.put(state, replicas);
+        if (candidateNodeNum > 0 && replicas > 0) {
+          stateCountMap.put(state, replicas < candidateNodeNum ? replicas : 
candidateNodeNum);
+        }
         // should have at most one state using R
         break;
       }
     }
     return stateCountMap;
   }
+
+  /**
+   * Given instance->state map, return the state counts
+   *
+   * @param stateMap
+   *
+   * @return state->count map for the given state map.
+   */
+  public static Map<String, Integer> getStateCounts(Map<String, String> 
stateMap) {
+    Map<String, Integer> stateCounts = new HashMap<>();
+    for (String state : stateMap.values()) {
+      if (!stateCounts.containsKey(state)) {
+        stateCounts.put(state, 0);
+      }
+      stateCounts.put(state, stateCounts.get(state) + 1);
+    }
+    return stateCounts;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
index 2e10cd6..0ff4f3d 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.util.TestInputLoader;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
@@ -40,7 +41,7 @@ public class TestAbstractRebalancer {
     Map<String, String> bestPossibleMap = rebalancer
         .computeBestPossibleStateForPartition(new 
HashSet<String>(liveInstances),
             
BuiltInStateModelDefinitions.valueOf(stateModelName).getStateModelDefinition(),
-            preferenceList, currentStateMap, new 
HashSet<String>(disabledInstancesForPartition), true);
+            preferenceList, currentStateMap, new 
HashSet<String>(disabledInstancesForPartition), new IdealState("test"));
 
     Assert.assertTrue(bestPossibleMap.equals(expectedBestPossibleMap));
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index e39b343..eb40b6a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -41,6 +41,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
@@ -221,13 +222,15 @@ public class TestAutoRebalanceStrategy {
         accessor.setProperty(keyBuilder.liveInstance(node), liveInstance);
       }
       cache.refresh(accessor);
+
+      IdealState is = new IdealState("resource");
       for (String partition : _partitions) {
         List<String> preferenceList = listResult.get(partition);
         Map<String, String> currentStateMap = _currentMapping.get(partition);
         Set<String> disabled = Collections.emptySet();
         Map<String, String> assignment = new AutoRebalancer()
             
.computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), 
_stateModelDef, preferenceList,
-                currentStateMap, disabled, true);
+                currentStateMap, disabled, is);
         mapResult.put(partition, assignment);
       }
       return mapResult;

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
index fead6a1..130e174 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
@@ -11,6 +11,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.StateModelDefinition;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
@@ -53,14 +54,19 @@ public class TestZeroReplicaAvoidance extends BaseStageTest 
{
       liveInstances.add("localhost_" + i);
     }
 
+    IdealState is = new IdealState("test");
+    is.setReplicas("3");
+
     DelayedAutoRebalancer rebalancer = new DelayedAutoRebalancer();
     Map<String, String> bestPossibleMap = rebalancer
         .computeBestPossibleStateForPartition(liveInstances, stateModelDef, 
instancePreferenceList, currentStateMap,
-            Collections.<String>emptySet(), true);
+            Collections.<String>emptySet(), is);
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap,
-        "Differs, get " + bestPossibleMap + ": expected: " + 
expectedBestPossibleMap);
+        "Differs, get " + bestPossibleMap + "\nexpected: " + 
expectedBestPossibleMap
+            + "\ncurrentState: " + currentStateMap + "\npreferenceList: " + 
instancePreferenceList);
 
-    System.out.println("END TestBestPossibleStateCalcStage at " + new 
Date(System.currentTimeMillis()));
+    System.out.println(
+        "END TestBestPossibleStateCalcStage at " + new 
Date(System.currentTimeMillis()));
   }
 
   @DataProvider(name = "zeroReplicaInput")

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
index 1cf19cc..6417b9b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
@@ -19,6 +19,8 @@ package org.apache.helix.integration.common;
  * under the License.
  */
 
+import java.util.Map;
+import java.util.Set;
 import java.util.logging.Level;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.BaseDataAccessor;
@@ -32,15 +34,19 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ConfigScope;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.util.ZKClientPool;
 import org.apache.log4j.Logger;
+import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
@@ -176,4 +182,44 @@ public class ZkIntegrationTestBase {
 
     return idealState;
   }
+
+  /**
+   * Validate that every partition always has the minimal active replicas and a top state replica.
+   * Also make sure there are always some partitions with only the active replica count.
+   */
+  protected void validateMinActiveAndTopStateReplica(IdealState is, 
ExternalView ev,
+      int minActiveReplica, int numNodes) {
+    StateModelDefinition stateModelDef =
+        
BuiltInStateModelDefinitions.valueOf(is.getStateModelDefRef()).getStateModelDefinition();
+    String topState = stateModelDef.getStatesPriorityList().get(0);
+    int replica = Integer.valueOf(is.getReplicas());
+
+    Map<String, Integer> stateCount =
+        stateModelDef.getStateCountMap(numNodes, replica);
+    Set<String> activeStates = stateCount.keySet();
+
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentMap = 
ev.getRecord().getMapField(partition);
+      Assert.assertNotNull(assignmentMap,
+          is.getResourceName() + "'s best possible assignment is null for 
partition " + partition);
+      Assert.assertTrue(!assignmentMap.isEmpty(),
+          is.getResourceName() + "'s partition " + partition + " has no best 
possible map in IS.");
+
+      boolean hasTopState = false;
+      int activeReplica = 0;
+      for (String state : assignmentMap.values()) {
+        if (topState.equalsIgnoreCase(state)) {
+          hasTopState = true;
+        }
+        if (activeStates.contains(state)) {
+          activeReplica++;
+        }
+      }
+
+      Assert.assertTrue(hasTopState, String.format("%s missing %s replica", 
partition, topState));
+      Assert.assertTrue(activeReplica >= minActiveReplica, String
+          .format("%s has less active replica %d then required %d", partition, 
activeReplica,
+              minActiveReplica));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index d1cbe81..350ecd1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -33,6 +33,7 @@ import 
org.apache.helix.mock.participant.MockSchemataModelFactory;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.log4j.Logger;
 
 public class MockParticipantManager extends ZKHelixManager implements 
Runnable, ZkTestManager {
@@ -42,7 +43,11 @@ public class MockParticipantManager extends ZKHelixManager 
implements Runnable,
   private CountDownLatch _stopCountDown = new CountDownLatch(1);
   private CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1);
 
-  private final MockMSModelFactory _msModelFactory = new 
MockMSModelFactory(null);
+  protected MockMSModelFactory _msModelFactory = new MockMSModelFactory(null);
+  protected DummyLeaderStandbyStateModelFactory _lsModelFactory =
+      new DummyLeaderStandbyStateModelFactory(10);
+  protected DummyOnlineOfflineStateModelFactory _ofModelFactory =
+      new DummyOnlineOfflineStateModelFactory(10);
 
   public MockParticipantManager(String zkAddr, String clusterName, String 
instanceName) {
     super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
@@ -86,15 +91,10 @@ public class MockParticipantManager extends ZKHelixManager 
implements Runnable,
       StateMachineEngine stateMach = getStateMachineEngine();
       
stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.MasterSlave.name(),
           _msModelFactory);
-
-      DummyLeaderStandbyStateModelFactory lsModelFactory =
-          new DummyLeaderStandbyStateModelFactory(10);
-      DummyOnlineOfflineStateModelFactory ofModelFactory =
-          new DummyOnlineOfflineStateModelFactory(10);
       
stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.LeaderStandby.name(),
-          lsModelFactory);
+          _lsModelFactory);
       
stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(),
-          ofModelFactory);
+          _ofModelFactory);
 
       MockSchemataModelFactory schemataFactory = new 
MockSchemataModelFactory();
       stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", 
schemataFactory);

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java
index a5b8ef3..d926e01 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java
@@ -33,7 +33,6 @@ import 
org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterSetup;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
@@ -137,7 +136,7 @@ public class TestDelayedAutoRebalance extends 
ZkIntegrationTestBase {
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
     }
     setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
   }
@@ -160,7 +159,7 @@ public class TestDelayedAutoRebalance extends 
ZkIntegrationTestBase {
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _replica);
+      validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
     }
   }
 
@@ -187,9 +186,9 @@ public class TestDelayedAutoRebalance extends 
ZkIntegrationTestBase {
           
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
 
       if (db.equals(testDb)) {
-        validateMinActiveAndTopStateReplica(idealState, ev, _replica);
+        validateMinActiveAndTopStateReplica(idealState, ev, _replica, 
NUM_NODE);
       } else {
-        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, 
NUM_NODE);
         validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
             _participants.get(0).getInstanceName(), false);
       }
@@ -212,9 +211,8 @@ public class TestDelayedAutoRebalance extends 
ZkIntegrationTestBase {
     for (String db : _testDBs) {
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(
-          CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _replica);
+      IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
     }
 
     enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
@@ -312,46 +310,6 @@ public class TestDelayedAutoRebalance extends 
ZkIntegrationTestBase {
     }
   }
 
-  /**
-   * Validate there should be always minimal active replica and top state 
replica for each partition.
-   * Also make sure there is always some partitions with only active replica 
count.
-   */
-  protected void validateMinActiveAndTopStateReplica(IdealState is, 
ExternalView ev,
-      int minActiveReplica) {
-    StateModelDefinition stateModelDef =
-        
BuiltInStateModelDefinitions.valueOf(is.getStateModelDefRef()).getStateModelDefinition();
-    String topState = stateModelDef.getStatesPriorityList().get(0);
-    int replica = Integer.valueOf(is.getReplicas());
-
-    Map<String, Integer> stateCount =
-        stateModelDef.getStateCountMap(NUM_NODE, replica);
-    Set<String> activeStates = stateCount.keySet();
-
-    for (String partition : is.getPartitionSet()) {
-      Map<String, String> assignmentMap = 
ev.getRecord().getMapField(partition);
-      Assert.assertNotNull(assignmentMap,
-          is.getResourceName() + "'s best possible assignment is null for 
partition " + partition);
-      Assert.assertTrue(!assignmentMap.isEmpty(),
-          is.getResourceName() + "'s partition " + partition + " has no best 
possible map in IS.");
-
-      boolean hasTopState = false;
-      int activeReplica = 0;
-      for (String state : assignmentMap.values()) {
-        if (topState.equalsIgnoreCase(state)) {
-          hasTopState = true;
-        }
-        if (activeStates.contains(state)) {
-          activeReplica++;
-        }
-      }
-
-      Assert.assertTrue(hasTopState, String.format("%s missing %s replica", 
partition, topState));
-      Assert.assertTrue(activeReplica >= minActiveReplica, String
-          .format("%s has less active replica %d then required %d", partition, 
activeReplica,
-              minActiveReplica));
-    }
-  }
-
   private void validateDelayedMovements(Map<String, ExternalView> 
externalViewsBefore)
       throws InterruptedException {
     // bring down one node, no partition should be moved.
@@ -363,7 +321,7 @@ public class TestDelayedAutoRebalance extends 
ZkIntegrationTestBase {
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
       validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
           _participants.get(0).getInstanceName(), false);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
index d9a46d2..5df2563 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
@@ -63,7 +63,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
       validateNoPartitionMove(is, externalViewsBefore.get(db), ev, instance, 
true);
     }
   }
@@ -86,7 +86,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
       validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
           _participants.get(0).getInstanceName(), true);
     }
@@ -113,7 +113,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
       validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
           _participants.get(0).getInstanceName(), true);
     }
@@ -127,7 +127,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
     }
     setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
   }
@@ -150,7 +150,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
       validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
           _participants.get(0).getInstanceName(), true);
     }
@@ -164,7 +164,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
     }
     setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
   }
@@ -188,7 +188,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
       validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
           _participants.get(0).getInstanceName(), true);
     }
@@ -199,7 +199,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _replica);
+      validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
     }
   }
 
@@ -217,7 +217,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
       validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
           _participants.get(0).getInstanceName(), true);
     }
@@ -240,9 +240,9 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
           
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
 
       if (db.equals(testDb)) {
-        validateMinActiveAndTopStateReplica(idealState, ev, _replica);
+        validateMinActiveAndTopStateReplica(idealState, ev, _replica, 
NUM_NODE);
       } else {
-        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, 
NUM_NODE);
         validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
             _participants.get(0).getInstanceName(), true);
       }
@@ -265,7 +265,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
       validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
           _participants.get(0).getInstanceName(), true);
     }
@@ -281,7 +281,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(
           CLUSTER_NAME, db);
-      validateMinActiveAndTopStateReplica(is, ev, _replica);
+      validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
     }
 
     enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
index fafd78c..d2f4013 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
@@ -24,8 +24,12 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import 
org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
@@ -33,9 +37,23 @@ import 
org.apache.helix.integration.common.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.mock.participant.DummyProcess;
+import org.apache.helix.mock.participant.MockMSModelFactory;
+import org.apache.helix.mock.participant.MockMSStateModel;
+import org.apache.helix.mock.participant.MockSchemataModelFactory;
+import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
 import org.apache.helix.tools.ClusterSetup;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
@@ -146,6 +164,50 @@ public class TestMixedModeAutoRebalance extends 
ZkIntegrationTestBase {
     }
   }
 
+  @Test
+  public void testUserDefinedPreferenceListsInFullAutoWithErrors() throws 
Exception {
+    String db = "Test-DB-1";
+    createResourceWithDelayedRebalance(CLUSTER_NAME, db,
+        BuiltInStateModelDefinitions.MasterSlave.name(), 5, _replica, 
_replica, 0,
+        CrushRebalanceStrategy.class.getName());
+
+    IdealState idealState =
+        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+    Map<String, List<String>> userDefinedPreferenceLists = 
idealState.getPreferenceLists();
+
+    List<String> newNodes = new ArrayList<>();
+    for (int i = NUM_NODE; i < NUM_NODE + _replica; i++) {
+      String instance = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+
+      // start dummy participants
+      MockParticipantManager participant =
+          new TestMockParticipantManager(ZK_ADDR, CLUSTER_NAME, instance);
+      participant.syncStart();
+      _participants.add(participant);
+      newNodes.add(instance);
+    }
+
+    List<String> userDefinedPartitions = new ArrayList<>();
+    for (String partition : userDefinedPreferenceLists.keySet()) {
+      userDefinedPreferenceLists.put(partition, newNodes);
+      userDefinedPartitions.add(partition);
+    }
+
+    ResourceConfig resourceConfig =
+        new 
ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build();
+    _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+
+    //TODO: Trigger rebalancer, remove this once Helix controller is listening 
on resource config changes.
+    RebalanceScheduler.invokeRebalance(_dataAccessor, db);
+
+    Thread.sleep(1000);
+    ExternalView ev =
+        
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
db);
+    IdealState is = 
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+    validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
+  }
+
   private void verifyUserDefinedPreferenceLists(String db,
       Map<String, List<String>> userDefinedPreferenceLists, List<String> 
userDefinedPartitions)
       throws InterruptedException {
@@ -190,4 +252,33 @@ public class TestMixedModeAutoRebalance extends 
ZkIntegrationTestBase {
     }
     System.out.println("END " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
   }
+
+  public static class TestMockParticipantManager extends 
MockParticipantManager {
+    public TestMockParticipantManager(String zkAddr, String clusterName, 
String instanceName) {
+      super(zkAddr, clusterName, instanceName);
+      _msModelFactory = new MockDelayMSStateModelFactory();
+    }
+  }
+
+  public static class MockDelayMSStateModelFactory extends MockMSModelFactory {
+    @Override
+    public MockDelayMSStateModel createNewStateModel(String resourceName,
+        String partitionKey) {
+      MockDelayMSStateModel model = new MockDelayMSStateModel(null);
+      return model;
+    }
+  }
+
+  // mock master-slave state model whose wildcard transition handler always throws
+  @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", 
"ERROR" })
+  public static class MockDelayMSStateModel extends MockMSStateModel {
+    public MockDelayMSStateModel(MockTransition transition) {
+      super(transition);
+    }
+
+    @Transition(to = "*", from = "*")
+    public void generalTransitionHandle(Message message, NotificationContext 
context) {
+      throw new IllegalArgumentException("AAA");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
----------------------------------------------------------------------
diff --git a/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json b/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
index 29c20b5..5f88600 100644
--- a/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
+++ b/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
@@ -99,7 +99,8 @@
         "localhost_2": "MASTER",
         "localhost_4": "SLAVE",
         "localhost_0": "DROPPED",
-        "localhost_1": "SLAVE"
+        "localhost_1": "SLAVE",
+        "localhost_3": "ERROR"
       }
     }
   ]

http://git-wip-us.apache.org/repos/asf/helix/blob/bf181daf/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json
----------------------------------------------------------------------
diff --git a/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json b/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json
index b2a6626..bf745bf 100644
--- a/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json
+++ b/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json
@@ -73,6 +73,7 @@
         "localhost_3": "ONLINE"
       },
       "bestPossibleStates": {
+        "localhost_2": "ERROR",
         "localhost_3": "ONLINE",
         "localhost_4": "ONLINE",
         "localhost_1": "ONLINE"
@@ -94,8 +95,9 @@
       "bestPossibleStates": {
         "localhost_3": "ONLINE",
         "localhost_4": "ONLINE",
-        "localhost_0": "DROPPED",
-        "localhost_1": "DROPPED"
+        "localhost_0": "ONLINE",
+        "localhost_1": "ONLINE",
+        "localhost_2": "ERROR"
       }
     }
   ]

Reply via email to