Repository: helix
Updated Branches:
  refs/heads/master f8ee313ee -> 1c855ae85


Introduce IntermediateStateCalStage stage to rebalance pipeline which computes 
the intermediate states for all resources based on their best possible states 
and other constraints.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ad760ae9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ad760ae9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ad760ae9

Branch: refs/heads/master
Commit: ad760ae921bb94a6f928d0c745c056177b742dc9
Parents: f8ee313
Author: Lei Xia <[email protected]>
Authored: Mon Sep 25 14:26:46 2017 -0700
Committer: Junkai Xue <[email protected]>
Committed: Mon Sep 25 14:26:46 2017 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |   2 +
 .../controller/common/PartitionStateMap.java    |  79 ++++++++++++
 .../controller/common/ResourcesStateMap.java    |  95 +++++++++++++++
 .../helix/controller/stages/AttributeName.java  |   1 +
 .../stages/BestPossibleStateOutput.java         | 120 +++++--------------
 .../stages/IntermediateStateCalcStage.java      |  78 ++++++++++++
 .../stages/IntermediateStateOutput.java         |  28 +++++
 .../stages/MessageGenerationPhase.java          |  10 +-
 .../stages/TestRebalancePipeline.java           |   3 +
 9 files changed, 319 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 4cea33b..0cb23f9 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -51,6 +51,7 @@ import 
org.apache.helix.controller.stages.ClusterEventBlockingQueue;
 import org.apache.helix.controller.stages.CompatibilityCheckStage;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
+import org.apache.helix.controller.stages.IntermediateStateCalcStage;
 import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
@@ -195,6 +196,7 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
       rebalancePipeline.addStage(new ResourceValidationStage());
       rebalancePipeline.addStage(new CurrentStateComputationStage());
       rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+      rebalancePipeline.addStage(new IntermediateStateCalcStage());
       rebalancePipeline.addStage(new MessageGenerationPhase());
       rebalancePipeline.addStage(new MessageSelectionStage());
       rebalancePipeline.addStage(new MessageThrottleStage());

http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/common/PartitionStateMap.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/common/PartitionStateMap.java
 
b/helix-core/src/main/java/org/apache/helix/controller/common/PartitionStateMap.java
new file mode 100644
index 0000000..57f3481
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/common/PartitionStateMap.java
@@ -0,0 +1,79 @@
+package org.apache.helix.controller.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.model.Partition;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Hold the partition->{Instance, State} mapping for a resource.
+ */
+public class PartitionStateMap {
+  // Map of partition->instance->state
+  private Map<Partition, Map<String, String>> _stateMap;
+  private String _resourceName;
+
+  public PartitionStateMap(String resourceName) {
+    _resourceName = resourceName;
+    _stateMap = new HashMap<Partition, Map<String, String>>();
+  }
+
+  public PartitionStateMap(String resourceName,
+      Map<Partition, Map<String, String>> partitionStateMap) {
+    _resourceName = resourceName;
+    _stateMap = partitionStateMap;
+  }
+
+  public Set<Partition> partitionSet() {
+    return _stateMap.keySet();
+  }
+
+  public void setState(Partition partition, Map<String, String> 
stateMappingForPartition) {
+    _stateMap.put(partition, stateMappingForPartition);
+  }
+
+  public void setState(Partition partition, String instance, String state) {
+    if (!_stateMap.containsKey(partition)) {
+      _stateMap.put(partition, new HashMap<String, String>());
+    }
+    _stateMap.get(partition).put(instance, state);
+  }
+
+  public Map<String, String> getPartitionMap(Partition partition) {
+    Map<String, String> map = _stateMap.get(partition);
+    return map != null ? map : Collections.<String, String>emptyMap();
+  }
+
+  public Map<Partition, Map<String, String>> getStateMap() {
+    return _stateMap;
+  }
+
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  @Override public String toString() {
+    return _stateMap.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java
 
b/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java
new file mode 100644
index 0000000..3559d7b
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java
@@ -0,0 +1,95 @@
+package org.apache.helix.controller.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.model.Partition;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Hold the Resource -> partition -> instance -> state mapping for all 
resources.
+ * This is the base class for BestPossibleStateOutput, IdealState
+ */
+public class ResourcesStateMap {
+  // Map of resource->PartitionStateMap
+  protected Map<String, PartitionStateMap> _resourceStateMap;
+
+  public ResourcesStateMap() {
+    _resourceStateMap = new HashMap<String, PartitionStateMap>();
+  }
+
+  public Set<String> resourceSet() {
+    return _resourceStateMap.keySet();
+  }
+
+  public void setState(String resourceName, Partition partition,
+      Map<String, String> instanceStateMappingForPartition) {
+    if (!_resourceStateMap.containsKey(resourceName)) {
+      _resourceStateMap.put(resourceName, new PartitionStateMap(resourceName));
+    }
+    PartitionStateMap partitionStateMap = _resourceStateMap.get(resourceName);
+    partitionStateMap.setState(partition, instanceStateMappingForPartition);
+  }
+
+  public void setState(String resourceName,
+      Map<Partition, Map<String, String>> instanceStateMappingForResource) {
+    _resourceStateMap
+        .put(resourceName, new PartitionStateMap(resourceName, 
instanceStateMappingForResource));
+  }
+
+  public void setState(String resourceName, PartitionStateMap 
partitionStateMapForResource) {
+    _resourceStateMap.put(resourceName, partitionStateMapForResource);
+  }
+
+  public void setState(String resourceName, Partition partition, String 
instance, String state) {
+    if (!_resourceStateMap.containsKey(resourceName)) {
+      _resourceStateMap.put(resourceName, new PartitionStateMap(resourceName));
+    }
+    _resourceStateMap.get(resourceName).setState(partition, instance, state);
+  }
+
+  public Map<String, String> getInstanceStateMap(String resourceName, 
Partition partition) {
+    PartitionStateMap stateMap = _resourceStateMap.get(resourceName);
+    if (stateMap != null) {
+      return stateMap.getPartitionMap(partition);
+    }
+    return Collections.emptyMap();
+  }
+
+  public PartitionStateMap getPartitionStateMap(String resourceName) {
+    PartitionStateMap stateMap = _resourceStateMap.get(resourceName);
+    if (stateMap != null) {
+      return stateMap;
+    }
+    return new PartitionStateMap(resourceName);
+  }
+
+  public Map<String, PartitionStateMap> getResourceStatesMap() {
+    return _resourceStateMap;
+  }
+
+  @Override
+  public String toString() {
+    return _resourceStateMap.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index ae0278b..532cc2d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -23,6 +23,7 @@ public enum AttributeName {
   RESOURCES,
   BEST_POSSIBLE_STATE,
   CURRENT_STATE,
+  INTERMEDIATE_STATE,
   MESSAGES_ALL,
   MESSAGES_SELECTED,
   MESSAGES_THROTTLE,

http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
index a3ad56d..947e4d0 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -21,106 +21,42 @@ package org.apache.helix.controller.stages;
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-
+import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.common.ResourcesStateMap;
 import org.apache.helix.model.Partition;
 
-public class BestPossibleStateOutput {
-  // Map of resource->partition->instance->state
-  Map<String, Map<Partition, Map<String, String>>> _stateMap;
-  /* resource -> partition -> preference list */
-  private Map<String, Map<String, List<String>>> _preferenceLists;
-
-  public BestPossibleStateOutput() {
-    _stateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
-  }
-
-  public Set<String> resourceSet() {
-    return _stateMap.keySet();
-  }
-
-  public void setState(String resourceName, Partition resource,
-      Map<String, String> bestInstanceStateMappingForResource) {
-    if (!_stateMap.containsKey(resourceName)) {
-      _stateMap.put(resourceName, new HashMap<Partition, Map<String, 
String>>());
-    }
-    Map<Partition, Map<String, String>> map = _stateMap.get(resourceName);
-    map.put(resource, bestInstanceStateMappingForResource);
-  }
-
-  public void setState(String resourceName, Partition partition, String 
instance, String state) {
-    if (!_stateMap.containsKey(resourceName)) {
-      _stateMap.put(resourceName, new HashMap<Partition, Map<String, 
String>>());
-    }
-    if (!_stateMap.get(resourceName).containsKey(partition)) {
-      _stateMap.get(resourceName).put(partition, new HashMap<String, 
String>());
-    }
-    _stateMap.get(resourceName).get(partition).put(instance, state);
-  }
-
-  public Map<String, String> getInstanceStateMap(String resourceName, 
Partition partition) {
-    Map<Partition, Map<String, String>> map = _stateMap.get(resourceName);
+/**
+ * Output for BestPossibleStateCalStage.
+ */
+public class BestPossibleStateOutput extends ResourcesStateMap {
+  /**
+   * Deprecated, use getResourceStatesMap instead.
+   *
+   * @param resourceName
+   * @return
+   */
+  // TODO: remove this.
+  @Deprecated public Map<Partition, Map<String, String>> getResourceMap(String 
resourceName) {
+    PartitionStateMap map = _resourceStateMap.get(resourceName);
     if (map != null) {
-      return map.get(partition);
+      return map.getStateMap();
     }
     return Collections.emptyMap();
   }
 
-  public Map<Partition, Map<String, String>> getResourceMap(String 
resourceName) {
-    Map<Partition, Map<String, String>> map = _stateMap.get(resourceName);
-    if (map != null) {
-      return map;
+  /**
+   * Deprecated, use getResourceStatesMap instead.
+   *
+   * @return
+   */
+  // TODO: remove this.
+  @Deprecated public Map<String, Map<Partition, Map<String, String>>> 
getStateMap() {
+    Map<String, Map<Partition, Map<String, String>>> stateMap =
+        new HashMap<String, Map<Partition, Map<String, String>>>();
+    for (Map.Entry<String, PartitionStateMap> e : 
_resourceStateMap.entrySet()) {
+      stateMap.put(e.getKey(), e.getValue().getStateMap());
     }
-    return Collections.emptyMap();
-  }
-
-  public Map<String, Map<Partition, Map<String, String>>> getStateMap() {
-    return _stateMap;
-  }
-
-  public Map<String, Map<String, List<String>>> getPreferenceLists() {
-    return _preferenceLists;
-  }
-
-  public Map<String, List<String>> getPreferenceLists(String resource) {
-    if (_preferenceLists != null && _preferenceLists.containsKey(resource)) {
-      return _preferenceLists.get(resource);
-    }
-
-    return null;
-  }
-
-  public List<String> getPreferenceList(String resource, String partition) {
-    if (_preferenceLists != null && _preferenceLists.containsKey(resource) && 
_preferenceLists
-        .get(resource).containsKey(partition)) {
-      return _preferenceLists.get(resource).get(partition);
-    }
-
-    return null;
-  }
-
-  public void setPreferenceList(String resource, String partition, 
List<String> list) {
-    if (_preferenceLists == null) {
-      _preferenceLists = new HashMap<String, Map<String, List<String>>>();
-    }
-    if (!_preferenceLists.containsKey(resource)) {
-      _preferenceLists.put(resource, new HashMap<String, List<String>>());
-    }
-    _preferenceLists.get(resource).put(partition, list);
-  }
-
-  public void setPreferenceLists(String resource,
-      Map<String, List<String>> resourcePreferenceLists) {
-    if (_preferenceLists == null) {
-      _preferenceLists = new HashMap<String, Map<String, List<String>>>();
-    }
-    _preferenceLists.put(resource, resourcePreferenceLists);
-  }
-
-  @Override
-  public String toString() {
-    return _stateMap.toString();
+    return stateMap;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
new file mode 100644
index 0000000..5a13c7a
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -0,0 +1,78 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.Resource;
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+
+/**
+ * For partition compute the Intermediate State (instance,state) pair based on
+ * the BestPossible State and Current State, with all constraints applied 
(such as state transition throttling).
+ */
+public class IntermediateStateCalcStage extends AbstractBaseStage {
+  private static final Logger logger = 
Logger.getLogger(IntermediateStateCalcStage.class.getName());
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    long startTime = System.currentTimeMillis();
+    logger.info("START Intermediate.process()");
+
+    CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES.toString());
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+
+    if (currentStateOutput == null || bestPossibleStateOutput == null || 
resourceMap == null
+        || cache == null) {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires 
CURRENT_STATE|BEST_POSSIBLE_STATE|RESOURCES|DataCache");
+    }
+
+    IntermediateStateOutput immediateStateOutput =
+        compute(event, resourceMap, currentStateOutput, 
bestPossibleStateOutput);
+    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), 
immediateStateOutput);
+
+    long endTime = System.currentTimeMillis();
+    logger.info("END ImmediateStateCalcStage.process(). took: " + (endTime - 
startTime) + " ms");
+  }
+
+  private IntermediateStateOutput compute(ClusterEvent event, Map<String, 
Resource> resourceMap,
+      CurrentStateOutput currentStateOutput, BestPossibleStateOutput 
bestPossibleStateOutput) {
+    // for each resource
+    // get the best possible state and current state
+    // try to bring immediate state close to best possible state until
+    // the possible pending state transition numbers reach the set throttle 
number.
+    IntermediateStateOutput output = new IntermediateStateOutput();
+
+    // TODO: add throttling logic here.
+    for (String resourceName : resourceMap.keySet()) {
+      logger.debug("Processing resource:" + resourceName);
+      output.setState(resourceName, 
bestPossibleStateOutput.getPartitionStateMap(resourceName));
+    }
+    return output;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateOutput.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateOutput.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateOutput.java
new file mode 100644
index 0000000..e2276cc
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateOutput.java
@@ -0,0 +1,28 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.common.ResourcesStateMap;
+
+/**
+ * Output for IntermediateStateCalStage.
+ */
+public class IntermediateStateOutput extends ResourcesStateMap {
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index c1aa2a4..2f4a331 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -54,12 +54,12 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
     Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES.toString());
     CurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    BestPossibleStateOutput bestPossibleStateOutput =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    IntermediateStateOutput intermediateStateOutput =
+        event.getAttribute(AttributeName.INTERMEDIATE_STATE.toString());
     if (manager == null || cache == null || resourceMap == null || 
currentStateOutput == null
-        || bestPossibleStateOutput == null) {
+        || intermediateStateOutput == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires 
HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
+          + ". Requires 
HelixManager|DataCache|RESOURCES|CURRENT_STATE|INTERMEDIATE_STATE");
     }
 
     Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
@@ -78,7 +78,7 @@ public class MessageGenerationPhase extends AbstractBaseStage 
{
       for (Partition partition : resource.getPartitions()) {
 
         Map<String, String> instanceStateMap =
-            bestPossibleStateOutput.getInstanceStateMap(resourceName, 
partition);
+            intermediateStateOutput.getInstanceStateMap(resourceName, 
partition);
 
         // we should generate message based on the desired-state priority
         // so keep generated messages in a temp map keyed by state

http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 452a683..18abf75 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -82,6 +82,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
@@ -233,6 +234,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
@@ -330,6 +332,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new MessageThrottleStage());

Reply via email to