This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit f49986e24c27c49d8c877457753e32dcf8b540f4
Author: Junkai Xue <[email protected]>
AuthorDate: Tue May 25 10:19:08 2021 -0700

    [Replica Level Throttle] Make Pipeline in a correct order and fixes tests 
(#1750)
    
    * Make Pipeline in a correct order and fixes tests
    
    1. Make pipeline running in a correct order to process computation
    2. Add "DROPPED" case in the mapping to reflect the real case.
    3. Fixes test cases.
---
 .../helix/controller/GenericHelixController.java   |   4 +-
 .../stages/IntermediateStateCalcStage.java         |  36 +-
 .../resource/ResourceMessageGenerationPhase.java   |   7 +-
 .../helix/monitoring/mbeans/ResourceMonitor.java   |  58 ++--
 .../helix/controller/stages/BaseStageTest.java     |   9 +
 .../stages/TestIntermediateStateCalcStage.java     |  60 ++--
 .../controller/stages/TestRebalancePipeline.java   |  14 +-
 .../controller/stages/TestRecoveryLoadBalance.java | 225 -------------
 .../stages/TestStateTransitionPriority.java        |  95 ++++--
 .../TestNoThrottleDisabledPartitions.java          |  38 ++-
 .../controller/TestRedundantDroppedMessage.java    |   7 +-
 .../messaging/p2pMessage/TestP2PMessages.java      |   2 +-
 .../TestP2PMessagesAvoidDuplicatedMessage.java     |  11 +-
 .../p2pMessage/TestP2PStateTransitionMessages.java |   6 +-
 .../TestP2PWithStateCancellationMessage.java       |  11 +-
 .../monitoring/mbeans/TestRebalancerMetrics.java   |  18 +-
 .../test/resources/TestPartitionLevelPriority.json |   2 +-
 .../TestRecoveryLoadBalance.MasterSlave.json       | 367 ---------------------
 .../TestRecoveryLoadBalance.OnlineOffline.json     | 206 ------------
 19 files changed, 233 insertions(+), 943 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 4dad60f..4baac38 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -513,12 +513,14 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
       // rebalance pipeline
       Pipeline rebalancePipeline = new Pipeline(pipelineName);
       rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-      rebalancePipeline.addStage(new IntermediateStateCalcStage());
       // Need to add MaintenanceRecoveryStage here because 
MAX_PARTITIONS_PER_INSTANCE check could
       // only occur after IntermediateStateCalcStage calculation
       rebalancePipeline.addStage(new MaintenanceRecoveryStage());
       rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
       rebalancePipeline.addStage(new MessageSelectionStage());
+      // The IntermediateStateCalcStage should be applied after message 
selection
+      // Messages are throttled already removed by IntermediateStateCalcStage 
in MessageSelection output
+      rebalancePipeline.addStage(new IntermediateStateCalcStage());
       rebalancePipeline.addStage(new MessageThrottleStage());
       rebalancePipeline.addStage(new ResourceMessageDispatchStage());
       rebalancePipeline.addStage(new PersistAssignmentStage());
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index b91dca6..b06352d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -51,6 +51,7 @@ import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.monitoring.mbeans.ResourceMonitor;
+import org.apache.helix.participant.statemachine.StateModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -347,11 +348,10 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     Collections.sort(partitions, new 
PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
         currentStateOutput.getCurrentStateMap(resourceName), 
stateModelDef.getTopState()));
     for (Partition partition : partitions) {
-      List<Message> messagesToThrottle = new 
ArrayList<>(resourceMessageMap.get(partition));
-      if (messagesToThrottle == null || messagesToThrottle.isEmpty()) {
+      if (resourceMessageMap.get(partition) == null || 
resourceMessageMap.get(partition).isEmpty()) {
         continue;
       }
-
+      List<Message> messagesToThrottle = new 
ArrayList<>(resourceMessageMap.get(partition));
       Map<String, String> derivedCurrentStateMap = 
currentStateOutput.getCurrentStateMap(resourceName, partition)
           .entrySet()
           .stream()
@@ -468,6 +468,9 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
         StateTransitionThrottleConfig.RebalanceType rebalanceType =
             getRebalanceTypePerMessage(requiredStates, message, 
currentStateMap);
         String currentState = currentStateMap.get(message.getTgtName());
+        if (currentState == null) {
+          currentState = stateModelDefinition.getInitialState();
+        }
         if (!message.getToState().equals(currentState) && 
message.getFromState().equals(currentState)
             && !cache.getDisabledInstancesForPartition(resourceName, 
partition.getPartitionName())
             .contains(message.getTgtName())) {
@@ -833,7 +836,10 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       // Higher priority for the partition with fewer replicas with states 
matching with IdealState
       int idealStateMatched1 = getIdealStateMatched(p1);
       int idealStateMatched2 = getIdealStateMatched(p2);
-      return Integer.compare(idealStateMatched1, idealStateMatched2);
+      if (idealStateMatched1 != idealStateMatched2) {
+        return Integer.compare(idealStateMatched1, idealStateMatched2);
+      }
+      return p1.getPartitionName().compareTo(p2.getPartitionName());
     }
 
     private int getMissTopStateIndex(Partition partition) {
@@ -889,17 +895,23 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
   private void computeIntermediateMap(PartitionStateMap intermediateStateMap,
       Map<Partition, Map<String, Message>> pendingMessageMap, Map<Partition, 
List<Message>> resourceMessageMap) {
     for (Map.Entry<Partition, Map<String, Message>> entry : 
pendingMessageMap.entrySet()) {
-      entry.getValue()
-          .entrySet()
-          .stream()
-          .forEach(
-              e -> intermediateStateMap.setState(entry.getKey(), 
e.getValue().getTgtName(), e.getValue().getToState()));
+      entry.getValue().entrySet().stream().forEach(e -> {
+        if 
(!e.getValue().getToState().equals(HelixDefinedState.DROPPED.name())) {
+          intermediateStateMap.setState(entry.getKey(), 
e.getValue().getTgtName(), e.getValue().getToState());
+        } else {
+          
intermediateStateMap.getStateMap().get(entry.getKey()).remove(e.getValue().getTgtName());
+        }
+      });
     }
 
     for (Map.Entry<Partition, List<Message>> entry : 
resourceMessageMap.entrySet()) {
-      entry.getValue()
-          .stream()
-          .forEach(e -> intermediateStateMap.setState(entry.getKey(), 
e.getTgtName(), e.getToState()));
+      entry.getValue().stream().forEach(e -> {
+        if (!e.getToState().equals(HelixDefinedState.DROPPED.name())) {
+          intermediateStateMap.setState(entry.getKey(), e.getTgtName(), 
e.getToState());
+        } else {
+          
intermediateStateMap.getStateMap().get(entry.getKey()).remove(e.getTgtName());
+        }
+      });
     }
   }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
index 7c4d1b7..73b309e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages.resource;
  */
 
 import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.IntermediateStateOutput;
 import org.apache.helix.controller.stages.MessageGenerationPhase;
@@ -30,8 +31,8 @@ import 
org.apache.helix.controller.stages.MessageGenerationPhase;
 public class ResourceMessageGenerationPhase extends MessageGenerationPhase {
   @Override
   public void process(ClusterEvent event) throws Exception {
-    IntermediateStateOutput intermediateStateOutput =
-        event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
-    processEvent(event, intermediateStateOutput);
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+    processEvent(event, bestPossibleStateOutput);
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index b659f9c..51d3321 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -60,10 +60,10 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   private SimpleDynamicMetric<Long> _externalViewIdealStateDiff;
   private SimpleDynamicMetric<Long> _numLessMinActiveReplicaPartitions;
   private SimpleDynamicMetric<Long> _numLessReplicaPartitions;
-  private SimpleDynamicMetric<Long> _numPendingRecoveryRebalancePartitions;
-  private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions;
-  private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions;
-  private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions;
+  private SimpleDynamicMetric<Long> _numPendingRecoveryRebalanceReplicas;
+  private SimpleDynamicMetric<Long> _numPendingLoadRebalanceReplicas;
+  private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledReplicas;
+  private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledReplicas;
   private SimpleDynamicMetric<Long> _numPendingStateTransitions;
 
   // Counters
@@ -110,14 +110,16 @@ public class ResourceMonitor extends DynamicMBeanProvider 
{
     _dynamicCapacityMetricsMap = new ConcurrentHashMap<>();
 
     _externalViewIdealStateDiff = new 
SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L);
-    _numLoadRebalanceThrottledPartitions =
-        new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0L);
-    _numRecoveryRebalanceThrottledPartitions =
-        new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 
0L);
-    _numPendingLoadRebalancePartitions =
-        new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0L);
-    _numPendingRecoveryRebalancePartitions =
-        new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0L);
+    _numPendingRecoveryRebalanceReplicas =
+        new SimpleDynamicMetric("PendingRecoveryRebalanceReplicaGauge", 0L);
+    _numLoadRebalanceThrottledReplicas =
+        new SimpleDynamicMetric("LoadRebalanceThrottledReplicaGauge", 0L);
+    _numRecoveryRebalanceThrottledReplicas =
+        new SimpleDynamicMetric("RecoveryRebalanceThrottledReplicaGauge", 0L);
+    _numPendingLoadRebalanceReplicas =
+        new SimpleDynamicMetric("PendingLoadRebalanceReplicaGauge", 0L);
+    _numPendingRecoveryRebalanceReplicas =
+        new SimpleDynamicMetric("PendingRecoveryRebalanceReplicaGauge", 0L);
     _numLessReplicaPartitions = new 
SimpleDynamicMetric("MissingReplicaPartitionGauge", 0L);
     _numLessMinActiveReplicaPartitions =
         new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0L);
@@ -370,10 +372,10 @@ public class ResourceMonitor extends DynamicMBeanProvider 
{
   public void updateRebalancerStats(long numPendingRecoveryRebalancePartitions,
       long numPendingLoadRebalancePartitions, long 
numRecoveryRebalanceThrottledPartitions,
       long numLoadRebalanceThrottledPartitions) {
-    
_numPendingRecoveryRebalancePartitions.updateValue(numPendingRecoveryRebalancePartitions);
-    
_numPendingLoadRebalancePartitions.updateValue(numPendingLoadRebalancePartitions);
-    
_numRecoveryRebalanceThrottledPartitions.updateValue(numRecoveryRebalanceThrottledPartitions);
-    
_numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions);
+    
_numPendingRecoveryRebalanceReplicas.updateValue(numPendingRecoveryRebalancePartitions);
+    
_numPendingLoadRebalanceReplicas.updateValue(numPendingLoadRebalancePartitions);
+    
_numRecoveryRebalanceThrottledReplicas.updateValue(numRecoveryRebalanceThrottledPartitions);
+    
_numLoadRebalanceThrottledReplicas.updateValue(numLoadRebalanceThrottledPartitions);
   }
 
   /**
@@ -422,20 +424,20 @@ public class ResourceMonitor extends DynamicMBeanProvider 
{
     return _numLessReplicaPartitions.getValue();
   }
 
-  public long getPendingRecoveryRebalancePartitionGauge() {
-    return _numPendingRecoveryRebalancePartitions.getValue();
+  public long getNumPendingRecoveryRebalanceReplicas() {
+    return _numPendingRecoveryRebalanceReplicas.getValue();
   }
 
-  public long getPendingLoadRebalancePartitionGauge() {
-    return _numPendingLoadRebalancePartitions.getValue();
+  public long getNumPendingLoadRebalanceReplicas() {
+    return _numPendingLoadRebalanceReplicas.getValue();
   }
 
-  public long getRecoveryRebalanceThrottledPartitionGauge() {
-    return _numRecoveryRebalanceThrottledPartitions.getValue();
+  public long getNumRecoveryRebalanceThrottledReplicas() {
+    return _numRecoveryRebalanceThrottledReplicas.getValue();
   }
 
-  public long getLoadRebalanceThrottledPartitionGauge() {
-    return _numLoadRebalanceThrottledPartitions.getValue();
+  public long getNumLoadRebalanceThrottledReplicas() {
+    return _numLoadRebalanceThrottledReplicas.getValue();
   }
 
   public long getNumPendingStateTransitionGauge() {
@@ -461,10 +463,10 @@ public class ResourceMonitor extends DynamicMBeanProvider 
{
         _numNonTopStatePartitions,
         _numLessMinActiveReplicaPartitions,
         _numLessReplicaPartitions,
-        _numPendingRecoveryRebalancePartitions,
-        _numPendingLoadRebalancePartitions,
-        _numRecoveryRebalanceThrottledPartitions,
-        _numLoadRebalanceThrottledPartitions,
+        _numPendingRecoveryRebalanceReplicas,
+        _numPendingLoadRebalanceReplicas,
+        _numRecoveryRebalanceThrottledReplicas,
+        _numLoadRebalanceThrottledReplicas,
         _externalViewIdealStateDiff,
         _successfulTopStateHandoffDurationCounter,
         _successTopStateHandoffCounter,
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index cc6b4df..2d4aa09 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.Message;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
@@ -276,4 +277,12 @@ public class BaseStageTest {
 
     return resourceMap;
   }
+
+  protected Message generateMessage(String fromState, String toState, String 
tgtName) {
+    Message message = new Message(new ZNRecord(UUID.randomUUID().toString()));
+    message.setTgtName(tgtName);
+    message.setFromState(fromState);
+    message.setToState(toState);
+    return message;
+  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index 9584edb..7f17dea 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -51,15 +51,14 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
     }
 
     preSetup(resources, nReplica, nReplica);
-    event.addAttribute(AttributeName.RESOURCES.name(),
-        getResourceMap(resources, nPartition, "OnlineOffline"));
+    event.addAttribute(AttributeName.RESOURCES.name(), 
getResourceMap(resources, nPartition, "OnlineOffline"));
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
         getResourceMap(resources, nPartition, "OnlineOffline"));
 
     // Initialize bestpossible state and current state
     BestPossibleStateOutput bestPossibleStateOutput = new 
BestPossibleStateOutput();
     CurrentStateOutput currentStateOutput = new CurrentStateOutput();
-
+    MessageOutput messageSelectOutput = new MessageOutput();
     IntermediateStateOutput expectedResult = new IntermediateStateOutput();
 
     _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(1);
@@ -79,11 +78,11 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
             // Regular recovery balance
             currentStateOutput.setCurrentState(resource, partition, 
instanceName, "OFFLINE");
             // add blocked state transition messages
-            Message pendingMessage = new Message("customType", "001");
-            pendingMessage.setToState("ONLINE");
+            Message pendingMessage = generateMessage("OFFLINE", "ONLINE", 
instanceName);
             currentStateOutput.setPendingMessage(resource, partition, 
instanceName, pendingMessage);
 
             bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
+
             // should be recovered:
             expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
           } else if (resource.endsWith("1")) {
@@ -91,6 +90,8 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
             currentStateOutput.setCurrentState(resource, partition, 
instanceName, "ONLINE");
             currentStateOutput.setCurrentState(resource, partition, 
instanceName + "-1", "OFFLINE");
             bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
+            messageSelectOutput.addMessage(resource, partition,
+                generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
             // should be recovered:
             expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
           } else if (resource.endsWith("2")) {
@@ -110,14 +111,16 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
                 expectedResult.setState(resource, partition, instanceName, 
"ERROR");
               } else {
                 currentStateOutput.setCurrentState(resource, partition, 
instanceName, "OFFLINE");
+                messageSelectOutput.addMessage(resource, partition, 
generateMessage("OFFLINE", "ONLINE", instanceName));
                 // Recovery balance
                 expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
               }
             } else {
               currentStateOutput.setCurrentState(resource, partition, 
instanceName, "ONLINE");
-              currentStateOutput
-                  .setCurrentState(resource, partition, instanceName + "-1", 
"OFFLINE");
+              currentStateOutput.setCurrentState(resource, partition, 
instanceName + "-1", "OFFLINE");
               // load balance is throttled, so keep all current states
+              messageSelectOutput.addMessage(resource, partition,
+                  generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
               expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
               // The following must be removed because now downward state 
transitions are allowed
               // expectedResult.setState(resource, partition, instanceName + 
"-1", "OFFLINE");
@@ -129,16 +132,18 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
               // This partition requires recovery
               currentStateOutput.setCurrentState(resource, partition, 
instanceName, "OFFLINE");
               bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
+              messageSelectOutput.addMessage(resource, partition, 
generateMessage("OFFLINE", "ONLINE", instanceName));
               // After recovery, it should be back ONLINE
               expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
             } else {
               // Other partitions require dropping of replicas
               currentStateOutput.setCurrentState(resource, partition, 
instanceName, "ONLINE");
-              currentStateOutput
-                  .setCurrentState(resource, partition, instanceName + "-1", 
"OFFLINE");
+              currentStateOutput.setCurrentState(resource, partition, 
instanceName + "-1", "OFFLINE");
               // BestPossibleState dictates that we only need one ONLINE 
replica
               bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
               bestPossibleStateOutput.setState(resource, partition, 
instanceName + "-1", "DROPPED");
+              messageSelectOutput.addMessage(resource, partition,
+                  generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
               // So instanceName-1 will NOT be expected to show up in 
expectedResult
               expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
               expectedResult.setState(resource, partition, instanceName + 
"-1", "DROPPED");
@@ -150,6 +155,7 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
               // Set up a partition requiring recovery
               currentStateOutput.setCurrentState(resource, partition, 
instanceName, "OFFLINE");
               bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
+              messageSelectOutput.addMessage(resource, partition, 
generateMessage("OFFLINE", "ONLINE", instanceName));
               // After recovery, it should be back ONLINE
               expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
             } else {
@@ -157,6 +163,8 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
               bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
               // Check that load balance (bringing up a new node) did not take 
place
               bestPossibleStateOutput.setState(resource, partition, 
instanceName + "-1", "ONLINE");
+              messageSelectOutput.addMessage(resource, partition,
+                  generateMessage("OFFLINE", "ONLINE", instanceName + "-1"));
               expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
             }
           }
@@ -166,9 +174,9 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
     }
 
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), 
messageSelectOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
-    event.addAttribute(AttributeName.ControllerDataProvider.name(),
-        new ResourceControllerDataProvider());
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), new 
ResourceControllerDataProvider());
     runStage(event, new ReadClusterDataStage());
     runStage(event, new IntermediateStateCalcStage());
 
@@ -177,7 +185,8 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
     for (String resource : resources) {
       // Note Assert.assertEquals won't work. If "actual" is an empty map, it 
won't compare
       // anything.
-      Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap()
+      Assert.assertTrue(output.getPartitionStateMap(resource)
+          .getStateMap()
           
.equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
     }
   }
@@ -195,14 +204,13 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
     }
 
     preSetup(resources, nReplica, nReplica);
-    event.addAttribute(AttributeName.RESOURCES.name(),
-        getResourceMap(resources, nPartition,
-            "OnlineOffline"));
+    event.addAttribute(AttributeName.RESOURCES.name(), 
getResourceMap(resources, nPartition, "OnlineOffline"));
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
         getResourceMap(resources, nPartition, "OnlineOffline"));
 
     // Initialize best possible state and current state
     BestPossibleStateOutput bestPossibleStateOutput = new 
BestPossibleStateOutput();
+    MessageOutput messageSelectOutput = new MessageOutput();
     CurrentStateOutput currentStateOutput = new CurrentStateOutput();
     IntermediateStateOutput expectedResult = new IntermediateStateOutput();
 
@@ -225,6 +233,7 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
               // Set up a partition requiring recovery
               currentStateOutput.setCurrentState(resource, partition, 
instanceName, "OFFLINE");
               bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
+              messageSelectOutput.addMessage(resource, partition, 
generateMessage("OFFLINE", "ONLINE", instanceName));
               // After recovery, it should be back ONLINE
               expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
             } else {
@@ -236,6 +245,8 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
 
               // This partition to bring up a replica (load balance will 
happen)
               bestPossibleStateOutput.setState(resource, partition, 
instanceName + "-1", "ONLINE");
+              messageSelectOutput.addMessage(resource, partition,
+                  generateMessage("OFFLINE", "ONLINE", instanceName + "-1"));
               expectedResult.setState(resource, partition, instanceName + 
"-1", "ONLINE");
             }
           }
@@ -246,8 +257,8 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
 
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
-    event.addAttribute(AttributeName.ControllerDataProvider.name(),
-        new ResourceControllerDataProvider());
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), 
messageSelectOutput);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), new 
ResourceControllerDataProvider());
     runStage(event, new ReadClusterDataStage());
     runStage(event, new IntermediateStateCalcStage());
 
@@ -256,7 +267,8 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
     for (String resource : resources) {
       // Note Assert.assertEquals won't work. If "actual" is an empty map, it 
won't compare
       // anything.
-      Assert.assertEquals(output.getPartitionStateMap(resource).getStateMap(), 
expectedResult.getPartitionStateMap(resource).getStateMap());
+      Assert.assertEquals(output.getPartitionStateMap(resource).getStateMap(),
+          expectedResult.getPartitionStateMap(resource).getStateMap());
     }
   }
 
@@ -268,13 +280,11 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
 
     // Set up cluster configs
     _clusterConfig = 
accessor.getProperty(accessor.keyBuilder().clusterConfig());
-    _clusterConfig.setStateTransitionThrottleConfigs(ImmutableList
-        .of(new StateTransitionThrottleConfig(
-                StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
-                StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3),
-            new StateTransitionThrottleConfig(
-                StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
-                StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3)));
+    _clusterConfig.setStateTransitionThrottleConfigs(ImmutableList.of(
+        new 
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3),
+        new 
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3)));
     setClusterConfig(_clusterConfig);
   }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 66c1771..d262b7b 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -97,9 +97,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
+    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
     rebalancePipeline.addStage(new ResourceMessageDispatchStage());
 
@@ -133,9 +133,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     Pipeline messagePipeline = new Pipeline();
     messagePipeline.addStage(new BestPossibleStateCalcStage());
-    messagePipeline.addStage(new IntermediateStateCalcStage());
     messagePipeline.addStage(new ResourceMessageGenerationPhase());
     messagePipeline.addStage(new MessageSelectionStage());
+    messagePipeline.addStage(new IntermediateStateCalcStage());
     messagePipeline.addStage(new MessageThrottleStage());
     messagePipeline.addStage(new ResourceMessageDispatchStage());
 
@@ -157,7 +157,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     Thread.sleep(2 * MessageGenerationPhase.DEFAULT_OBSELETE_MSG_PURGE_DELAY);
     runPipeline(event, dataRefresh, false);
-    
+
     // Verify the stale message should be deleted
     Assert.assertTrue(TestHelper.verify(() -> {
       if (dataCache.getStaleMessages().size() != 0) {
@@ -334,9 +334,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
+    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
     rebalancePipeline.addStage(new ResourceMessageDispatchStage());
 
@@ -431,9 +431,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
+    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
     rebalancePipeline.addStage(new ResourceMessageDispatchStage());
 
@@ -510,9 +510,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
+    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
     rebalancePipeline.addStage(new ResourceMessageDispatchStage());
 
@@ -590,9 +590,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
+    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
     rebalancePipeline.addStage(new ResourceMessageDispatchStage());
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java
deleted file mode 100644
index 32d8663..0000000
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java
+++ /dev/null
@@ -1,225 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.helix.api.config.StateTransitionThrottleConfig;
-import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
-import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Partition;
-import org.testng.Assert;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-public class TestRecoveryLoadBalance extends BaseStageTest {
-
-  private final String INPUT = "inputs";
-  private final String MIN_ACTIVE_REPLICAS = "minActiveReplicas";
-  private final String LOAD_BALANCE_THROTTLE = "loadBalanceThrottle";
-  private final String CURRENT_STATE = "currentStates";
-  private final String BEST_POSSIBLE_STATE = "bestPossibleStates";
-  private final String EXPECTED_STATE = "expectedStates";
-  private final String ERROR_OR_RECOVERY_PARTITION_THRESHOLD =
-      "errorOrRecoveryPartitionThresholdForLoadBalance";
-  private final String STATE_MODEL = "statemodel";
-  private ClusterConfig _clusterConfig;
-
-  @Test(dataProvider = "recoveryLoadBalanceInput")
-  public void testRecoveryAndLoadBalance(String stateModelDef,
-      int errorOrRecoveryPartitionThresholdForLoadBalance,
-      Map<String, Map<String, Map<String, String>>> stateMapping, int 
minActiveReplicas, int loadBalanceThrottle) {
-    System.out.println("START TestRecoveryLoadBalance at " + new 
Date(System.currentTimeMillis()));
-
-    String resourcePrefix = "resource";
-    int nResource = 1;
-    int nPartition = stateMapping.size();
-    int nReplica = 3;
-
-    Set<String> resourceSet = new HashSet<>();
-    for (int i = 0; i < nResource; i++) {
-      resourceSet.add(resourcePrefix + "_" + i);
-    }
-
-    preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, 
resourceSet, nReplica,
-        nReplica, stateModelDef, minActiveReplicas);
-
-    _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(
-        errorOrRecoveryPartitionThresholdForLoadBalance);
-    if (loadBalanceThrottle >= 0) {
-      _clusterConfig.setStateTransitionThrottleConfigs(Arrays.asList(
-          new StateTransitionThrottleConfig(
-              StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
-              StateTransitionThrottleConfig.ThrottleScope.RESOURCE, 
loadBalanceThrottle)));
-    }
-    setClusterConfig(_clusterConfig);
-
-    event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(
-        resourceSet.toArray(new String[resourceSet.size()]), nPartition, 
stateModelDef));
-    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), 
getResourceMap(
-        resourceSet.toArray(new String[resourceSet.size()]), nPartition, 
stateModelDef));
-    event.addAttribute(AttributeName.ControllerDataProvider.name(),
-        new ResourceControllerDataProvider());
-
-    // Initialize bestpossible state and current state
-    BestPossibleStateOutput bestPossibleStateOutput = new 
BestPossibleStateOutput();
-    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
-    IntermediateStateOutput expectedResult = new IntermediateStateOutput();
-
-    for (String resource : resourceSet) {
-      IdealState is = 
accessor.getProperty(accessor.keyBuilder().idealStates(resource));
-      setSingleIdealState(is);
-
-      Map<String, List<String>> partitionMap = new HashMap<>();
-      for (int p = 0; p < nPartition; p++) {
-        Partition partition = new Partition(resource + "_" + p);
-
-        // Set input
-        for (int r = 0; r < 
stateMapping.get(partition.toString()).get(CURRENT_STATE).size(); r++) {
-          String instanceName = HOSTNAME_PREFIX + r;
-          currentStateOutput.setCurrentState(resource, partition, instanceName,
-              
stateMapping.get(partition.toString()).get(CURRENT_STATE).get(instanceName));
-        }
-        for (int r = 0; r < 
stateMapping.get(partition.toString()).get(BEST_POSSIBLE_STATE)
-            .size(); r++) {
-          String instanceName = HOSTNAME_PREFIX + r;
-          bestPossibleStateOutput.setState(resource, partition, instanceName,
-              
stateMapping.get(partition.toString()).get(BEST_POSSIBLE_STATE).get(instanceName));
-        }
-        for (int r = 0; r < 
stateMapping.get(partition.toString()).get(EXPECTED_STATE)
-            .size(); r++) {
-          String instanceName = HOSTNAME_PREFIX + r;
-          expectedResult.setState(resource, partition, instanceName,
-              
stateMapping.get(partition.toString()).get(EXPECTED_STATE).get(instanceName));
-        }
-
-        // Set partitionMap
-        for (int r = 0; r < nReplica; r++) {
-          String instanceName = HOSTNAME_PREFIX + r;
-          partitionMap.put(partition.getPartitionName(), 
Collections.singletonList(instanceName));
-        }
-      }
-      bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
-    }
-
-    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
-    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
-    runStage(event, new ReadClusterDataStage());
-    runStage(event, new IntermediateStateCalcStage());
-
-    IntermediateStateOutput output = 
event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
-
-    for (String resource : resourceSet) {
-      // For debugging purposes
-      // Object map1 = output.getPartitionStateMap(resource).getStateMap();
-      // Object map2 = 
expectedResult.getPartitionStateMap(resource).getStateMap();
-
-      // Note Assert.assertEquals won't work. If "actual" is an empty map, it 
won't compare
-      // anything.
-      Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap()
-          
.equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
-    }
-
-    System.out.println("END TestRecoveryLoadBalance at " + new 
Date(System.currentTimeMillis()));
-  }
-
-  @DataProvider(name = "recoveryLoadBalanceInput")
-  public Object[][] rebalanceStrategies() {
-
-    try {
-      List<Object[]> data = new ArrayList<>();
-      // Add data
-      
data.addAll(loadTestInputs("TestRecoveryLoadBalance.OnlineOffline.json"));
-      data.addAll(loadTestInputs("TestRecoveryLoadBalance.MasterSlave.json"));
-
-      Object[][] ret = new Object[data.size()][];
-      for (int i = 0; i < data.size(); i++) {
-        ret[i] = data.get(i);
-      }
-      return ret;
-    } catch (Throwable e) {
-      return new Object[][] {
-          {}
-      };
-    }
-  }
-
-  public List<Object[]> loadTestInputs(String fileName) {
-    List<Object[]> ret = new ArrayList<>();
-    InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(fileName);
-    try {
-      ObjectReader mapReader = new ObjectMapper().reader(List.class);
-      List<Map<String, Object>> inputList = mapReader.readValue(inputStream);
-      for (Map<String, Object> inputMap : inputList) {
-        String stateModelName = (String) inputMap.get(STATE_MODEL);
-        int threshold = (int) 
inputMap.get(ERROR_OR_RECOVERY_PARTITION_THRESHOLD);
-        int minActiveReplicas = -1;
-        if (inputMap.get(MIN_ACTIVE_REPLICAS) != null) {
-          minActiveReplicas = 
Integer.parseInt(inputMap.get(MIN_ACTIVE_REPLICAS).toString());
-        }
-        int loadBalanceThrottle = -1;
-        if (inputMap.get(LOAD_BALANCE_THROTTLE) != null) {
-          loadBalanceThrottle = 
Integer.parseInt(inputMap.get(LOAD_BALANCE_THROTTLE).toString());
-        }
-        Map<String, Map<String, Map<String, String>>> stateMapping =
-            (Map<String, Map<String, Map<String, String>>>) 
inputMap.get(INPUT);
-        ret.add(new Object[] {
-            stateModelName, threshold, stateMapping, minActiveReplicas, 
loadBalanceThrottle
-        });
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-    return ret;
-  }
-
-  private void preSetup(StateTransitionThrottleConfig.RebalanceType 
rebalanceType,
-      Set<String> resourceSet, int numOfLiveInstances, int numOfReplicas, 
String stateModelName,
-      int minActiveReplica) {
-    setupIdealState(numOfLiveInstances, resourceSet.toArray(new 
String[resourceSet.size()]),
-        numOfLiveInstances, numOfReplicas, IdealState.RebalanceMode.FULL_AUTO, 
stateModelName,
-        DelayedAutoRebalancer.class.getName(), 
CrushEdRebalanceStrategy.class.getName(),
-        minActiveReplica);
-    setupStateModel();
-    setupLiveInstances(numOfLiveInstances);
-
-    // Set up cluster configs
-    _clusterConfig = 
accessor.getProperty(accessor.keyBuilder().clusterConfig());
-    StateTransitionThrottleConfig throttleConfig = new 
StateTransitionThrottleConfig(rebalanceType,
-        StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 
Integer.MAX_VALUE);
-    
_clusterConfig.setStateTransitionThrottleConfigs(Collections.singletonList(throttleConfig));
-    setClusterConfig(_clusterConfig);
-  }
-}
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
index be817ef..22c20f7 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
@@ -62,6 +62,7 @@ public class TestStateTransitionPriority extends 
BaseStageTest {
 
     // Initialize bestpossible state and current state
     BestPossibleStateOutput bestPossibleStateOutput = new 
BestPossibleStateOutput();
+    MessageOutput messageSelectOutput = new MessageOutput();
     CurrentStateOutput currentStateOutput = new CurrentStateOutput();
 
     for (String resource : resourceMap.keySet()) {
@@ -76,11 +77,13 @@ public class TestStateTransitionPriority extends 
BaseStageTest {
           Collections.singletonList(instanceName));
       bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
       bestPossibleStateOutput.setState(resource, partition, instanceName, 
"SLAVE");
+      messageSelectOutput.addMessage(resource, partition, 
generateMessage("OFFLINE", "SLAVE", instanceName));
       currentStateOutput.setCurrentState(resource, partition, instanceName, 
"OFFLINE");
     }
 
 
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), 
messageSelectOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
     runStage(event, new ReadClusterDataStage());
 
@@ -108,6 +111,7 @@ public class TestStateTransitionPriority extends 
BaseStageTest {
 
     // Initialize bestpossible state and current state
     BestPossibleStateOutput bestPossibleStateOutput = new 
BestPossibleStateOutput();
+    MessageOutput messageSelectOutput = new MessageOutput();
     CurrentStateOutput currentStateOutput = new CurrentStateOutput();
 
     for (String resource : resourceMap.keySet()) {
@@ -121,11 +125,13 @@ public class TestStateTransitionPriority extends 
BaseStageTest {
       String nextInstanceName = HOSTNAME_PREFIX + 
(Integer.parseInt(resource.split("_")[1]) + 1);
       partitionMap.put(partition.getPartitionName(), 
Collections.singletonList(nextInstanceName));
       bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
-      bestPossibleStateOutput.setState(resource, partition, nextInstanceName, 
"MASTER");
+      bestPossibleStateOutput.setState(resource, partition, instanceName, 
"MASTER");
+      bestPossibleStateOutput.setState(resource, partition, nextInstanceName, 
"SLAVE");
       currentStateOutput.setCurrentState(resource, partition, instanceName, 
"MASTER");
     }
 
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), 
messageSelectOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(),
         new ResourceControllerDataProvider());
@@ -134,8 +140,10 @@ public class TestStateTransitionPriority extends 
BaseStageTest {
     // Keep update the current state.
     List<String> resourcePriority = new ArrayList<String>();
     for (int i = 0; i < resourceMap.size(); i++) {
+      event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
+          generateMessageMapForResource(bestPossibleStateOutput, 
currentStateOutput, resourcePriority));
       runStage(event, new IntermediateStateCalcStage());
-      updateCurrentStatesForLoadBalance(resourcePriority, currentStateOutput);
+      updateCurrentStatesForLoadBalance(resourcePriority, currentStateOutput, 
bestPossibleStateOutput);
     }
 
     Assert.assertEquals(resourcePriority, expectedPriority);
@@ -164,7 +172,6 @@ public class TestStateTransitionPriority extends 
BaseStageTest {
     Resource resource = new Resource(resourceName);
     BestPossibleStateOutput bestPossibleStateOutput = new 
BestPossibleStateOutput();
     CurrentStateOutput currentStateOutput = new CurrentStateOutput();
-
     for (String partitionName : bestPossibleMap.keySet()) {
       Partition partition = new Partition(partitionName);
       bestPossibleStateOutput.setPreferenceList(resourceName, partitionName, 
preferenceList);
@@ -173,6 +180,7 @@ public class TestStateTransitionPriority extends 
BaseStageTest {
             bestPossibleMap.get(partitionName).get(instanceName));
         currentStateOutput.setCurrentState(resourceName, partition, 
instanceName,
             currentStateMap.get(partitionName).get(instanceName));
+
       }
       resource.addPartition(partitionName);
     }
@@ -183,6 +191,8 @@ public class TestStateTransitionPriority extends 
BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
         Collections.singletonMap(resourceName, resource));
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
+        generateMessageMapForPartition(bestPossibleMap, currentStateMap, 
Collections.emptyList(), resourceName));
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(),
         new ResourceControllerDataProvider());
@@ -191,9 +201,10 @@ public class TestStateTransitionPriority extends 
BaseStageTest {
     // Keep update the current state.
     List<String> partitionPriority = new ArrayList<String>();
     for (int i = 0; i < bestPossibleMap.size(); i++) {
+      event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
+          generateMessageMapForPartition(bestPossibleMap, currentStateMap, 
partitionPriority, resourceName));
       runStage(event, new IntermediateStateCalcStage());
-      updateCurrentStateForPartitionLevelPriority(partitionPriority, 
currentStateOutput,
-          resourceName, bestPossibleMap);
+      updateCurrentStateForPartitionLevelPriority(partitionPriority, 
currentStateOutput, resourceName, bestPossibleMap);
     }
 
     Assert.assertEquals(partitionPriority, expectedPriority);
@@ -287,25 +298,52 @@ public class TestStateTransitionPriority extends 
BaseStageTest {
     }
   }
 
-  private void updateCurrentStatesForLoadBalance(List<String> resourcePriority,
-      CurrentStateOutput currentStateOutput) {
-    IntermediateStateOutput output = 
event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
-    for (PartitionStateMap partitionStateMap : 
output.getResourceStatesMap().values()) {
-      String resourceName = partitionStateMap.getResourceName();
+  private void updateCurrentStatesForLoadBalance(List<String> 
resourcePriority, CurrentStateOutput currentStateOutput,
+      BestPossibleStateOutput bestPossibleStateOutput) {
+    MessageOutput output = 
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    for (String resourceName : 
bestPossibleStateOutput.getResourceStatesMap().keySet()) {
       Partition partition = new Partition(resourceName + "_0");
-      String oldInstance = HOSTNAME_PREFIX + resourceName.split("_")[1];
-      String expectedInstance =
-          HOSTNAME_PREFIX + (Integer.parseInt(resourceName.split("_")[1]) + 1);
-      if 
(partitionStateMap.getPartitionMap(partition).containsKey(expectedInstance)
-          && !resourcePriority.contains(resourceName)) {
-        currentStateOutput.getCurrentStateMap(resourceName, 
partition).remove(oldInstance);
-        updateCurrentOutput(resourcePriority, currentStateOutput, 
resourceName, partition,
-            expectedInstance, "MASTER");
+      if (output.getResourceMessageMap(resourceName).get(partition) != null
+          && output.getResourceMessageMap(resourceName).get(partition).size() 
> 0) {
+        String nextInstanceName = HOSTNAME_PREFIX + 
(Integer.parseInt(resourceName.split("_")[1]) + 1);
+        currentStateOutput.setCurrentState(resourceName, partition, 
nextInstanceName, "SLAVE");
+        resourcePriority.add(resourceName);
         break;
       }
     }
   }
 
+  private MessageOutput generateMessageMapForResource(BestPossibleStateOutput 
bestPossibleStateOutput,
+      CurrentStateOutput currentStateOutput, List<String> resourcePrirority) {
+    MessageOutput messageSelectOutput = new MessageOutput();
+    for (String resource : 
bestPossibleStateOutput.getResourceStatesMap().keySet()) {
+      if (!resourcePrirority.contains(resource) && 
!bestPossibleStateOutput.getPartitionStateMap(resource)
+          .getStateMap()
+          .equals(currentStateOutput.getCurrentStateMap(resource))) {
+        messageSelectOutput.addMessage(resource, new Partition(resource + 
"_0"),
+            generateMessage("OFFLINE", "SLAVE", (HOSTNAME_PREFIX + 
(Integer.parseInt(resource.split("_")[1]) + 1))));
+      }
+    }
+    return messageSelectOutput;
+  }
+
+  private MessageOutput generateMessageMapForPartition(Map<String, Map<String, 
String>> bestPossibleMap,
+      Map<String, Map<String, String>> currentStateMap, List<String> 
partitionPriority, String resourceName) {
+    MessageOutput messageSelectOutput = new MessageOutput();
+    for (String partitionName : bestPossibleMap.keySet()) {
+      for (String instanceName : bestPossibleMap.get(partitionName).keySet()) {
+        if (!partitionPriority.contains(partitionName) && 
!bestPossibleMap.get(partitionName)
+            .get(instanceName)
+            .equals(currentStateMap.get(partitionName).get(instanceName))) {
+          messageSelectOutput.addMessage(resourceName, new 
Partition(partitionName),
+              
generateMessage(currentStateMap.get(partitionName).get(instanceName),
+                  bestPossibleMap.get(partitionName).get(instanceName), 
instanceName));
+        }
+      }
+    }
+    return messageSelectOutput;
+  }
+
   private void updateCurrentOutput(List<String> resourcePriority,
       CurrentStateOutput currentStateOutput, String resourceName, Partition 
partition,
       String instanceName, String state) {
@@ -317,19 +355,14 @@ public class TestStateTransitionPriority extends 
BaseStageTest {
   private void updateCurrentStateForPartitionLevelPriority(List<String> 
partitionPriority,
       CurrentStateOutput currentStateOutput, String resourceName,
       Map<String, Map<String, String>> bestPossibleMap) {
-    IntermediateStateOutput output = 
event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
-    PartitionStateMap partitionStateMap = 
output.getPartitionStateMap(resourceName);
-    for (Partition partition : partitionStateMap.getStateMap().keySet()) {
-      Map<String, String> instanceStateMap = 
bestPossibleMap.get(partition.getPartitionName());
-      if (partitionStateMap.getPartitionMap(partition).equals(instanceStateMap)
-          && !partitionPriority.contains(partition.getPartitionName())) {
-        partitionPriority.add(partition.getPartitionName());
-        for (String instanceName : instanceStateMap.keySet()) {
-          currentStateOutput.setCurrentState(resourceName, partition, 
instanceName,
-              instanceStateMap.get(instanceName));
-        }
-        break;
+    MessageOutput output = 
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    output.getResourceMessageMap(resourceName).entrySet().stream().filter(e -> 
e.getValue().size() > 0).forEach(e -> {
+        partitionPriority.add(e.getKey().toString());
+
+      for (String instanceName : 
bestPossibleMap.get(e.getKey().toString()).keySet()) {
+        currentStateOutput.setCurrentState(resourceName, e.getKey(), 
instanceName,
+            bestPossibleMap.get(e.getKey().toString()).get(instanceName));
       }
-    }
+    });
   }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
index df6bc02..98dd281 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
@@ -69,9 +69,9 @@ public class TestNoThrottleDisabledPartitions extends 
ZkTestBase {
     setupEnvironment(participantCount);
 
     // Set the throttling only for load balance
-    setThrottleConfigForLoadBalance();
+    setThrottleConfigForLoadBalance(1);
 
-    // Disable instance 0 so that it will cause a partition to do a recovery 
balance
+    // Disable instance 0 so that it will cause a partition to do a load 
balance
     PropertyKey key = 
_accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName());
     InstanceConfig instanceConfig = _accessor.getProperty(key);
     instanceConfig.setInstanceEnabled(false);
@@ -261,7 +261,7 @@ public class TestNoThrottleDisabledPartitions extends 
ZkTestBase {
   public void testNoThrottleOnDisabledPartition() throws Exception {
     int participantCount = 3;
     setupEnvironment(participantCount);
-    setThrottleConfig();
+    setThrottleConfig(3); // Convert partition to replica mapping should be 1 
-> 3
 
     // Disable a partition so that it will not be subject to throttling
     String partitionName = _resourceName + "0_0";
@@ -292,8 +292,9 @@ public class TestNoThrottleDisabledPartitions extends 
ZkTestBase {
     controller.syncStart();
     Thread.sleep(500L);
 
+    // The throttle quota will be consumed by first partition with all
     for (MockParticipantManager participantManager : _participants) {
-      Assert.assertTrue(verifyTwoMessages(participantManager));
+      Assert.assertTrue(verifySingleMessage(participantManager));
     }
 
     // clean up the cluster
@@ -339,11 +340,14 @@ public class TestNoThrottleDisabledPartitions extends 
ZkTestBase {
     controller.syncStop();
   }
 
+  private void setThrottleConfig() {
+    setThrottleConfig(1);
+  }
   /**
    * Set all throttle configs at 1 so that we could test by observing the 
number of ongoing
    * transitions.
    */
-  private void setThrottleConfig() {
+  private void setThrottleConfig(int maxReplicas) {
     PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
 
     ClusterConfig clusterConfig = 
_accessor.getProperty(_accessor.keyBuilder().clusterConfig());
@@ -353,27 +357,31 @@ public class TestNoThrottleDisabledPartitions extends 
ZkTestBase {
     // Add throttling at cluster-level
     throttleConfigs.add(new StateTransitionThrottleConfig(
         StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
-        StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1));
+        StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas));
     throttleConfigs.add(
         new 
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
-            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1));
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas));
     throttleConfigs
         .add(new 
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY,
-            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1));
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas));
 
     // Add throttling at instance level
     throttleConfigs
         .add(new 
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY,
-            StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 1));
+            StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 
maxReplicas));
 
     clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
     _accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
   }
 
+  private void setThrottleConfigForLoadBalance() {
+    setThrottleConfigForLoadBalance(0);
+  }
+
   /**
    * Set throttle limits only for load balance so that none of them would 
happen.
    */
-  private void setThrottleConfigForLoadBalance() {
+  private void setThrottleConfigForLoadBalance(int maxReplicas) {
     PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
 
     ClusterConfig clusterConfig = 
_accessor.getProperty(_accessor.keyBuilder().clusterConfig());
@@ -383,12 +391,12 @@ public class TestNoThrottleDisabledPartitions extends 
ZkTestBase {
     // Add throttling at cluster-level
     throttleConfigs.add(
         new 
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
-            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 0));
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas));
 
     // Add throttling at instance level
     throttleConfigs.add(
         new 
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
-            StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 0));
+            StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 
maxReplicas));
 
     clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
     _accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
@@ -481,15 +489,15 @@ public class TestNoThrottleDisabledPartitions extends 
ZkTestBase {
   }
 
   /**
-   * Ensure that there are 2 messages for a given Participant.
+   * Ensure that there are 1 messages for a given Participant.
    * @param participant
    * @return
    */
-  private boolean verifyTwoMessages(final MockParticipantManager participant) {
+  private boolean verifySingleMessage(final MockParticipantManager 
participant) {
     PropertyKey key = 
_accessor.keyBuilder().messages(participant.getInstanceName());
     List<String> messageNames = _accessor.getChildNames(key);
     if (messageNames != null) {
-      return messageNames.size() == 2;
+      return messageNames.size() == 1;
     }
     return false;
   }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
index 3bd9e45..9539b98 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
@@ -31,6 +31,7 @@ import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
 import org.apache.helix.controller.stages.MessageOutput;
+import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import 
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.model.IdealState;
@@ -75,11 +76,13 @@ public class TestRedundantDroppedMessage extends 
TaskSynchronizedTestBase {
     runStage(event, new ResourceComputationStage());
     runStage(event, new CurrentStateComputationStage());
     runStage(event, new BestPossibleStateCalcStage());
-    runStage(event, new IntermediateStateCalcStage());
     Assert.assertEquals(cache.getCachedIdealMapping().size(), 1);
     runStage(event, new ResourceMessageGenerationPhase());
+    runStage(event, new MessageSelectionStage());
+    runStage(event, new IntermediateStateCalcStage());
+
 
-    MessageOutput messageOutput = 
event.getAttribute(AttributeName.MESSAGES_ALL.name());
+    MessageOutput messageOutput = 
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     Assert
         .assertEquals(messageOutput.getMessages(resourceName, new 
Partition(partitionName)).size(),
             1);
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
index 4f5f60c..1f1d7bd 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
@@ -100,9 +100,9 @@ public class TestP2PMessages extends BaseStageTest {
     _fullPipeline.addStage(new ResourceComputationStage());
     _fullPipeline.addStage(new CurrentStateComputationStage());
     _fullPipeline.addStage(new BestPossibleStateCalcStage());
-    _fullPipeline.addStage(new IntermediateStateCalcStage());
     _fullPipeline.addStage(new ResourceMessageGenerationPhase());
     _fullPipeline.addStage(new MessageSelectionStage());
+    _fullPipeline.addStage(new IntermediateStateCalcStage());
     _fullPipeline.addStage(new MessageThrottleStage());
     _fullPipeline.addStage(new ResourceMessageDispatchStage());
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index 56e1f5a..1d164c5 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -87,14 +87,15 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends 
BaseStageTest {
     _fullPipeline = new Pipeline("FullPipeline");
     _fullPipeline.addStage(new ReadClusterDataStage());
     _fullPipeline.addStage(new BestPossibleStateCalcStage());
-    _fullPipeline.addStage(new IntermediateStateCalcStage());
     _fullPipeline.addStage(new ResourceMessageGenerationPhase());
     _fullPipeline.addStage(new MessageSelectionStage());
+    _fullPipeline.addStage(new IntermediateStateCalcStage());
     _fullPipeline.addStage(new MessageThrottleStage());
 
     _messagePipeline = new Pipeline("MessagePipeline");
     _messagePipeline.addStage(new ResourceMessageGenerationPhase());
     _messagePipeline.addStage(new MessageSelectionStage());
+    _messagePipeline.addStage(new IntermediateStateCalcStage());
     _messagePipeline.addStage(new MessageThrottleStage());
 
 
@@ -126,7 +127,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends 
BaseStageTest {
 
     _fullPipeline.handle(event);
 
-    _bestpossibleState = 
event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+    _bestpossibleState = 
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
 
     MessageOutput messageOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
@@ -217,7 +218,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends 
BaseStageTest {
 
 
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
-    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), 
_bestpossibleState);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
_bestpossibleState);
 
     _messagePipeline.handle(event);
 
@@ -273,7 +274,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends 
BaseStageTest {
     instanceStateMap.put(thirdMaster, "MASTER");
     _bestpossibleState.setState(_db, _partition, instanceStateMap);
 
-    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), 
_bestpossibleState);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
_bestpossibleState);
 
     _messagePipeline.handle(event);
 
@@ -290,7 +291,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends 
BaseStageTest {
     currentStateOutput.setPendingMessage(_db, _partition, secondMaster, 
relayMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
-    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), 
_bestpossibleState);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
_bestpossibleState);
 
     _messagePipeline.handle(event);
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index bf53682..cec0d2b 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -199,9 +199,9 @@ public class TestP2PStateTransitionMessages extends 
BaseStageTest {
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
 
     pipeline = new Pipeline("test");
-    pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new ResourceMessageGenerationPhase());
     pipeline.addStage(new MessageSelectionStage());
+    pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new MessageThrottleStage());
 
     pipeline.handle(event);
@@ -223,9 +223,9 @@ public class TestP2PStateTransitionMessages extends 
BaseStageTest {
 
 
     pipeline = new Pipeline("test");
-    pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new ResourceMessageGenerationPhase());
     pipeline.addStage(new MessageSelectionStage());
+    pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new MessageThrottleStage());
 
     pipeline.handle(event);
@@ -359,9 +359,9 @@ public class TestP2PStateTransitionMessages extends 
BaseStageTest {
     Pipeline pipeline = new Pipeline("test");
     pipeline.addStage(new ReadClusterDataStage());
     pipeline.addStage(new BestPossibleStateCalcStage());
-    pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new ResourceMessageGenerationPhase());
     pipeline.addStage(new MessageSelectionStage());
+    pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new MessageThrottleStage());
 
     return pipeline;
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
index a8191cb..f685bf4 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixManager;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.BaseStageTest;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -166,11 +167,11 @@ public class TestP2PWithStateCancellationMessage extends 
BaseStageTest {
     currentStateOutput.setCurrentState(RESOURCE_NAME, new Partition("1"), 
"localhost_2", "MASTER");
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
-    IntermediateStateOutput intermediateStateOutput = new 
IntermediateStateOutput();
-    intermediateStateOutput.setState(RESOURCE_NAME, new Partition("0"), 
"localhost_1", "SLAVE");
-    intermediateStateOutput.setState(RESOURCE_NAME, new Partition("0"), 
"localhost_2", "MASTER");
-    intermediateStateOutput.setState(RESOURCE_NAME, new Partition("1"), 
"localhost_2", "MASTER");
-    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), 
intermediateStateOutput);
+    BestPossibleStateOutput bestPossibleStateOutput = new 
BestPossibleStateOutput();
+    bestPossibleStateOutput.setState(RESOURCE_NAME, new Partition("0"), 
"localhost_1", "SLAVE");
+    bestPossibleStateOutput.setState(RESOURCE_NAME, new Partition("0"), 
"localhost_2", "MASTER");
+    bestPossibleStateOutput.setState(RESOURCE_NAME, new Partition("1"), 
"localhost_2", "MASTER");
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
 
     return event;
   }
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
index 43bc8b1..ccc1431 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
@@ -13,7 +13,9 @@ import 
org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
+import 
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -77,14 +79,17 @@ public class TestRebalancerMetrics extends BaseStageTest {
     setupThrottleConfig(cache.getClusterConfig(),
         StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, 
maxPending);
     runStage(event, new BestPossibleStateCalcStage());
+    runStage(event, new ResourceMessageGenerationPhase());
+    runStage(event, new MessageSelectionStage());
     runStage(event, new IntermediateStateCalcStage());
 
     ClusterStatusMonitor clusterStatusMonitor = 
event.getAttribute(AttributeName.clusterStatusMonitor.name());
     ResourceMonitor resourceMonitor = 
clusterStatusMonitor.getResourceMonitor(resource);
 
-    
Assert.assertEquals(resourceMonitor.getPendingRecoveryRebalancePartitionGauge(),
 numPartition);
-    
Assert.assertEquals(resourceMonitor.getRecoveryRebalanceThrottledPartitionGauge(),
-        numPartition - maxPending);
+    
Assert.assertEquals(resourceMonitor.getNumPendingRecoveryRebalanceReplicas(),
+        numPartition * numReplica - 
resourceMonitor.getNumPendingLoadRebalanceReplicas());
+    
Assert.assertEquals(resourceMonitor.getNumRecoveryRebalanceThrottledReplicas(),
+        numPartition * numReplica - 
resourceMonitor.getNumPendingLoadRebalanceReplicas() - maxPending);
 
     System.out
         .println("END testRecoveryRebalanceMetrics at " + new 
Date(System.currentTimeMillis()));
@@ -135,15 +140,16 @@ public class TestRebalancerMetrics extends BaseStageTest {
     setupThrottleConfig(cache.getClusterConfig(),
         StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, maxPending);
     runStage(event, new BestPossibleStateCalcStage());
+    runStage(event, new ResourceMessageGenerationPhase());
+    runStage(event, new MessageSelectionStage());
     runStage(event, new IntermediateStateCalcStage());
 
     ClusterStatusMonitor clusterStatusMonitor = 
event.getAttribute(AttributeName.clusterStatusMonitor.name());
     ResourceMonitor resourceMonitor = 
clusterStatusMonitor.getResourceMonitor(resource);
 
-    long numPendingLoadBalance = 
resourceMonitor.getPendingLoadRebalancePartitionGauge();
+    long numPendingLoadBalance = 
resourceMonitor.getNumPendingLoadRebalanceReplicas();
     Assert.assertTrue(numPendingLoadBalance > 0);
-    
Assert.assertEquals(resourceMonitor.getLoadRebalanceThrottledPartitionGauge(),
-        numPendingLoadBalance - maxPending);
+    
Assert.assertEquals(resourceMonitor.getNumLoadRebalanceThrottledReplicas(), 
numPendingLoadBalance - maxPending);
 
     System.out
         .println("END testLoadBalanceMetrics at " + new 
Date(System.currentTimeMillis()));
diff --git a/helix-core/src/test/resources/TestPartitionLevelPriority.json 
b/helix-core/src/test/resources/TestPartitionLevelPriority.json
index 125277d..5e42d2d 100644
--- a/helix-core/src/test/resources/TestPartitionLevelPriority.json
+++ b/helix-core/src/test/resources/TestPartitionLevelPriority.json
@@ -143,8 +143,8 @@
         "Partition_0",
         "Partition_3",
         "Partition_2",
-        "Partition_1",
         "Partition_4",
+        "Partition_1",
         "Partition_5"
       ]
     }
diff --git 
a/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json 
b/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json
deleted file mode 100644
index 077f129..0000000
--- a/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json
+++ /dev/null
@@ -1,367 +0,0 @@
-[
-  {
-    "statemodel": "MasterSlave",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE",
-          "localhost_3": "SLAVE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE",
-          "localhost_3": "DROPPED"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE",
-          "localhost_3": "DROPPED"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        }
-      }
-    }
-  },
-  {
-    "statemodel": "MasterSlave",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 1,
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "ERROR",
-          "localhost_2": "SLAVE",
-          "localhost_3": "SLAVE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "ERROR",
-          "localhost_2": "SLAVE",
-          "localhost_3": "DROPPED"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "ERROR",
-          "localhost_2": "SLAVE",
-          "localhost_3": "DROPPED"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        }
-      }
-    }
-  },
-  {
-    "statemodel": "MasterSlave",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "ERROR",
-          "localhost_2": "SLAVE",
-          "localhost_3": "SLAVE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "ERROR",
-          "localhost_2": "SLAVE",
-          "localhost_3": "DROPPED"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "ERROR",
-          "localhost_2": "SLAVE",
-          "localhost_3": "DROPPED"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        }
-      }
-    }
-  },
-  {
-    "statemodel": "MasterSlave",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "OFFLINE",
-          "localhost_2": "OFFLINE",
-          "localhost_3": "SLAVE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE",
-          "localhost_3": "DROPPED"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE",
-          "localhost_3": "DROPPED"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        }
-      }
-    }
-  },
-  {
-    "statemodel": "MasterSlave",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "SLAVE",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        }
-      }
-    }
-  },
-  {
-    "statemodel": "MasterSlave",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "SLAVE",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE",
-          "localhost_3": "OFFLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE",
-          "localhost_3": "SLAVE"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE",
-          "localhost_3": "OFFLINE"
-        }
-      }
-    }
-  },
-  {
-    "statemodel": "MasterSlave",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
-    "loadBalanceThrottle": "0",
-    "minActiveReplica": "2",
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "OFFLINE",
-          "localhost_3": "OFFLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE",
-          "localhost_3": "DROPPED"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "OFFLINE",
-          "localhost_3": "OFFLINE"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "SLAVE",
-          "localhost_1": "SLAVE",
-          "localhost_2": "OFFLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        }
-      }
-    }
-  },
-  {
-    "statemodel": "MasterSlave",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
-    "loadBalanceThrottle": "2",
-    "minActiveReplica": "2",
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE",
-          "localhost_3": "OFFLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "OFFLINE",
-          "localhost_3": "DROPPED"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "OFFLINE",
-          "localhost_3": "DROPPED"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "OFFLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE"
-        }
-      },
-      "resource_0_2": {
-        "currentStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "OFFLINE",
-          "localhost_3": "OFFLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "SLAVE",
-          "localhost_3": "SLAVE"
-        },
-        "expectedStates": {
-          "localhost_0": "MASTER",
-          "localhost_1": "SLAVE",
-          "localhost_2": "OFFLINE",
-          "localhost_3": "OFFLINE"
-        }
-      }
-    }
-  }
-]
diff --git 
a/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json 
b/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json
deleted file mode 100644
index 389d90e..0000000
--- a/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json
+++ /dev/null
@@ -1,206 +0,0 @@
-[
-  {
-    "statemodel": "OnlineOffline",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "OFFLINE",
-          "localhost_1": "OFFLINE",
-          "localhost_2": "OFFLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        },
-        "expectedStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE",
-          "localhost_3": "ONLINE"
-        },
-        "expectedStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        }
-      }
-    }
-  },
-  {
-    "statemodel": "OnlineOffline",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "OFFLINE",
-          "localhost_1": "OFFLINE",
-          "localhost_2": "OFFLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        },
-        "expectedStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE",
-          "localhost_3": "ONLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE",
-          "localhost_3": "DROPPED"
-        },
-        "expectedStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE",
-          "localhost_3": "DROPPED"
-        }
-      }
-    }
-  },
-  {
-    "statemodel": "OnlineOffline",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 100,
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "OFFLINE",
-          "localhost_1": "OFFLINE",
-          "localhost_2": "OFFLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        },
-        "expectedStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE",
-          "localhost_3": "ONLINE"
-        },
-        "expectedStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE",
-          "localhost_3": "ONLINE"
-        }
-      }
-    }
-  },
-  {
-    "statemodel": "OnlineOffline",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "OFFLINE",
-          "localhost_1": "OFFLINE",
-          "localhost_2": "OFFLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        },
-        "expectedStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "OFFLINE",
-          "localhost_2": "ONLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "DROPPED",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        },
-        "expectedStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "OFFLINE",
-          "localhost_2": "ONLINE"
-        }
-      }
-    }
-  },
-  {
-    "statemodel": "OnlineOffline",
-    "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
-    "inputs": {
-      "resource_0_0": {
-        "currentStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "DROPPED",
-          "localhost_1": "ONLINE",
-          "localhost_2": "OFFLINE"
-        },
-        "expectedStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE"
-        }
-      },
-      "resource_0_1": {
-        "currentStates": {
-          "localhost_0": "OFFLINE",
-          "localhost_1": "OFFLINE",
-          "localhost_2": "OFFLINE"
-        },
-        "bestPossibleStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        },
-        "expectedStates": {
-          "localhost_0": "ONLINE",
-          "localhost_1": "ONLINE",
-          "localhost_2": "ONLINE"
-        }
-      }
-    }
-  }
-]
\ No newline at end of file

Reply via email to