Repository: helix Updated Branches: refs/heads/master f8ee313ee -> 1c855ae85
Introduce the IntermediateStateCalcStage stage to the rebalance pipeline, which computes the intermediate states for all resources based on their best possible states and other constraints. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ad760ae9 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ad760ae9 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ad760ae9 Branch: refs/heads/master Commit: ad760ae921bb94a6f928d0c745c056177b742dc9 Parents: f8ee313 Author: Lei Xia <[email protected]> Authored: Mon Sep 25 14:26:46 2017 -0700 Committer: Junkai Xue <[email protected]> Committed: Mon Sep 25 14:26:46 2017 -0700 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 2 + .../controller/common/PartitionStateMap.java | 79 ++++++++++++ .../controller/common/ResourcesStateMap.java | 95 +++++++++++++++ .../helix/controller/stages/AttributeName.java | 1 + .../stages/BestPossibleStateOutput.java | 120 +++++-------------- .../stages/IntermediateStateCalcStage.java | 78 ++++++++++++ .../stages/IntermediateStateOutput.java | 28 +++++ .../stages/MessageGenerationPhase.java | 10 +- .../stages/TestRebalancePipeline.java | 3 + 9 files changed, 319 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 4cea33b..0cb23f9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -51,6 +51,7 @@ 
import org.apache.helix.controller.stages.ClusterEventBlockingQueue; import org.apache.helix.controller.stages.CompatibilityCheckStage; import org.apache.helix.controller.stages.CurrentStateComputationStage; import org.apache.helix.controller.stages.ExternalViewComputeStage; +import org.apache.helix.controller.stages.IntermediateStateCalcStage; import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.MessageThrottleStage; @@ -195,6 +196,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC rebalancePipeline.addStage(new ResourceValidationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); + rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); rebalancePipeline.addStage(new MessageThrottleStage()); http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/common/PartitionStateMap.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/common/PartitionStateMap.java b/helix-core/src/main/java/org/apache/helix/controller/common/PartitionStateMap.java new file mode 100644 index 0000000..57f3481 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/common/PartitionStateMap.java @@ -0,0 +1,79 @@ +package org.apache.helix.controller.common; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.model.Partition; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Hold the partition->{Instance, State} mapping for a resource. + */ +public class PartitionStateMap { + // Map of partition->instance->state + private Map<Partition, Map<String, String>> _stateMap; + private String _resourceName; + + public PartitionStateMap(String resourceName) { + _resourceName = resourceName; + _stateMap = new HashMap<Partition, Map<String, String>>(); + } + + public PartitionStateMap(String resourceName, + Map<Partition, Map<String, String>> partitionStateMap) { + _resourceName = resourceName; + _stateMap = partitionStateMap; + } + + public Set<Partition> partitionSet() { + return _stateMap.keySet(); + } + + public void setState(Partition partition, Map<String, String> stateMappingForPartition) { + _stateMap.put(partition, stateMappingForPartition); + } + + public void setState(Partition partition, String instance, String state) { + if (!_stateMap.containsKey(partition)) { + _stateMap.put(partition, new HashMap<String, String>()); + } + _stateMap.get(partition).put(instance, state); + } + + public Map<String, String> getPartitionMap(Partition partition) { + Map<String, String> map = _stateMap.get(partition); + return map != null ? 
map : Collections.<String, String>emptyMap(); + } + + public Map<Partition, Map<String, String>> getStateMap() { + return _stateMap; + } + + public String getResourceName() { + return _resourceName; + } + + @Override public String toString() { + return _stateMap.toString(); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java b/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java new file mode 100644 index 0000000..3559d7b --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java @@ -0,0 +1,95 @@ +package org.apache.helix.controller.common; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.model.Partition; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Hold the Resource -> partition -> instance -> state mapping for all resources. 
+ * This is the base class for BestPossibleStateOutput, IdealState + */ +public class ResourcesStateMap { + // Map of resource->PartitionStateMap + protected Map<String, PartitionStateMap> _resourceStateMap; + + public ResourcesStateMap() { + _resourceStateMap = new HashMap<String, PartitionStateMap>(); + } + + public Set<String> resourceSet() { + return _resourceStateMap.keySet(); + } + + public void setState(String resourceName, Partition partition, + Map<String, String> instanceStateMappingForPartition) { + if (!_resourceStateMap.containsKey(resourceName)) { + _resourceStateMap.put(resourceName, new PartitionStateMap(resourceName)); + } + PartitionStateMap partitionStateMap = _resourceStateMap.get(resourceName); + partitionStateMap.setState(partition, instanceStateMappingForPartition); + } + + public void setState(String resourceName, + Map<Partition, Map<String, String>> instanceStateMappingForResource) { + _resourceStateMap + .put(resourceName, new PartitionStateMap(resourceName, instanceStateMappingForResource)); + } + + public void setState(String resourceName, PartitionStateMap partitionStateMapForResource) { + _resourceStateMap.put(resourceName, partitionStateMapForResource); + } + + public void setState(String resourceName, Partition partition, String instance, String state) { + if (!_resourceStateMap.containsKey(resourceName)) { + _resourceStateMap.put(resourceName, new PartitionStateMap(resourceName)); + } + _resourceStateMap.get(resourceName).setState(partition, instance, state); + } + + public Map<String, String> getInstanceStateMap(String resourceName, Partition partition) { + PartitionStateMap stateMap = _resourceStateMap.get(resourceName); + if (stateMap != null) { + return stateMap.getPartitionMap(partition); + } + return Collections.emptyMap(); + } + + public PartitionStateMap getPartitionStateMap(String resourceName) { + PartitionStateMap stateMap = _resourceStateMap.get(resourceName); + if (stateMap != null) { + return stateMap; + } + return 
new PartitionStateMap(resourceName); + } + + public Map<String, PartitionStateMap> getResourceStatesMap() { + return _resourceStateMap; + } + + @Override + public String toString() { + return _resourceStateMap.toString(); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java index ae0278b..532cc2d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java @@ -23,6 +23,7 @@ public enum AttributeName { RESOURCES, BEST_POSSIBLE_STATE, CURRENT_STATE, + INTERMEDIATE_STATE, MESSAGES_ALL, MESSAGES_SELECTED, MESSAGES_THROTTLE, http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java index a3ad56d..947e4d0 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java @@ -21,106 +21,42 @@ package org.apache.helix.controller.stages; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Set; - +import org.apache.helix.controller.common.PartitionStateMap; +import org.apache.helix.controller.common.ResourcesStateMap; import org.apache.helix.model.Partition; -public class 
BestPossibleStateOutput { - // Map of resource->partition->instance->state - Map<String, Map<Partition, Map<String, String>>> _stateMap; - /* resource -> partition -> preference list */ - private Map<String, Map<String, List<String>>> _preferenceLists; - - public BestPossibleStateOutput() { - _stateMap = new HashMap<String, Map<Partition, Map<String, String>>>(); - } - - public Set<String> resourceSet() { - return _stateMap.keySet(); - } - - public void setState(String resourceName, Partition resource, - Map<String, String> bestInstanceStateMappingForResource) { - if (!_stateMap.containsKey(resourceName)) { - _stateMap.put(resourceName, new HashMap<Partition, Map<String, String>>()); - } - Map<Partition, Map<String, String>> map = _stateMap.get(resourceName); - map.put(resource, bestInstanceStateMappingForResource); - } - - public void setState(String resourceName, Partition partition, String instance, String state) { - if (!_stateMap.containsKey(resourceName)) { - _stateMap.put(resourceName, new HashMap<Partition, Map<String, String>>()); - } - if (!_stateMap.get(resourceName).containsKey(partition)) { - _stateMap.get(resourceName).put(partition, new HashMap<String, String>()); - } - _stateMap.get(resourceName).get(partition).put(instance, state); - } - - public Map<String, String> getInstanceStateMap(String resourceName, Partition partition) { - Map<Partition, Map<String, String>> map = _stateMap.get(resourceName); +/** + * Output for BestPossibleStateCalStage. + */ +public class BestPossibleStateOutput extends ResourcesStateMap { + /** + * Deprecated, use getResourceStatesMap instead. + * + * @param resourceName + * @return + */ + // TODO: remove this. 
+ @Deprecated public Map<Partition, Map<String, String>> getResourceMap(String resourceName) { + PartitionStateMap map = _resourceStateMap.get(resourceName); if (map != null) { - return map.get(partition); + return map.getStateMap(); } return Collections.emptyMap(); } - public Map<Partition, Map<String, String>> getResourceMap(String resourceName) { - Map<Partition, Map<String, String>> map = _stateMap.get(resourceName); - if (map != null) { - return map; + /** + * Deprecated, use getResourceStatesMap instead. + * + * @return + */ + // TODO: remove this. + @Deprecated public Map<String, Map<Partition, Map<String, String>>> getStateMap() { + Map<String, Map<Partition, Map<String, String>>> stateMap = + new HashMap<String, Map<Partition, Map<String, String>>>(); + for (Map.Entry<String, PartitionStateMap> e : _resourceStateMap.entrySet()) { + stateMap.put(e.getKey(), e.getValue().getStateMap()); } - return Collections.emptyMap(); - } - - public Map<String, Map<Partition, Map<String, String>>> getStateMap() { - return _stateMap; - } - - public Map<String, Map<String, List<String>>> getPreferenceLists() { - return _preferenceLists; - } - - public Map<String, List<String>> getPreferenceLists(String resource) { - if (_preferenceLists != null && _preferenceLists.containsKey(resource)) { - return _preferenceLists.get(resource); - } - - return null; - } - - public List<String> getPreferenceList(String resource, String partition) { - if (_preferenceLists != null && _preferenceLists.containsKey(resource) && _preferenceLists - .get(resource).containsKey(partition)) { - return _preferenceLists.get(resource).get(partition); - } - - return null; - } - - public void setPreferenceList(String resource, String partition, List<String> list) { - if (_preferenceLists == null) { - _preferenceLists = new HashMap<String, Map<String, List<String>>>(); - } - if (!_preferenceLists.containsKey(resource)) { - _preferenceLists.put(resource, new HashMap<String, List<String>>()); - } - 
_preferenceLists.get(resource).put(partition, list); - } - - public void setPreferenceLists(String resource, - Map<String, List<String>> resourcePreferenceLists) { - if (_preferenceLists == null) { - _preferenceLists = new HashMap<String, Map<String, List<String>>>(); - } - _preferenceLists.put(resource, resourcePreferenceLists); - } - - @Override - public String toString() { - return _stateMap.toString(); + return stateMap; } } http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java new file mode 100644 index 0000000..5a13c7a --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -0,0 +1,78 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.controller.pipeline.AbstractBaseStage; +import org.apache.helix.controller.pipeline.StageException; +import org.apache.helix.model.Resource; +import org.apache.log4j.Logger; + +import java.util.Map; + +/** + * For partition compute the Intermediate State (instance,state) pair based on + * the BestPossible State and Current State, with all constraints applied (such as state transition throttling). + */ +public class IntermediateStateCalcStage extends AbstractBaseStage { + private static final Logger logger = Logger.getLogger(IntermediateStateCalcStage.class.getName()); + + @Override + public void process(ClusterEvent event) throws Exception { + long startTime = System.currentTimeMillis(); + logger.info("START Intermediate.process()"); + + CurrentStateOutput currentStateOutput = + event.getAttribute(AttributeName.CURRENT_STATE.toString()); + + BestPossibleStateOutput bestPossibleStateOutput = + event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); + ClusterDataCache cache = event.getAttribute("ClusterDataCache"); + + if (currentStateOutput == null || bestPossibleStateOutput == null || resourceMap == null + || cache == null) { + throw new StageException("Missing attributes in event:" + event + + ". Requires CURRENT_STATE|BEST_POSSIBLE_STATE|RESOURCES|DataCache"); + } + + IntermediateStateOutput immediateStateOutput = + compute(event, resourceMap, currentStateOutput, bestPossibleStateOutput); + event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), immediateStateOutput); + + long endTime = System.currentTimeMillis(); + logger.info("END ImmediateStateCalcStage.process(). 
took: " + (endTime - startTime) + " ms"); + } + + private IntermediateStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap, + CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) { + // for each resource + // get the best possible state and current state + // try to bring immediate state close to best possible state until + // the possible pending state transition numbers reach the set throttle number. + IntermediateStateOutput output = new IntermediateStateOutput(); + + // TODO: add throttling logic here. + for (String resourceName : resourceMap.keySet()) { + logger.debug("Processing resource:" + resourceName); + output.setState(resourceName, bestPossibleStateOutput.getPartitionStateMap(resourceName)); + } + return output; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateOutput.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateOutput.java new file mode 100644 index 0000000..e2276cc --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateOutput.java @@ -0,0 +1,28 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.controller.common.ResourcesStateMap; + +/** + * Output for IntermediateStateCalStage. + */ +public class IntermediateStateOutput extends ResourcesStateMap { +} http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index c1aa2a4..2f4a331 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -54,12 +54,12 @@ public class MessageGenerationPhase extends AbstractBaseStage { Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.toString()); - BestPossibleStateOutput bestPossibleStateOutput = - event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString()); + IntermediateStateOutput intermediateStateOutput = + event.getAttribute(AttributeName.INTERMEDIATE_STATE.toString()); if (manager == null || cache == null || resourceMap == null || currentStateOutput == null - || bestPossibleStateOutput == null) { + || intermediateStateOutput == null) { throw new StageException("Missing attributes in event:" + event - + ". 
Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE"); + + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|INTERMEDIATE_STATE"); } Map<String, LiveInstance> liveInstances = cache.getLiveInstances(); @@ -78,7 +78,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { for (Partition partition : resource.getPartitions()) { Map<String, String> instanceStateMap = - bestPossibleStateOutput.getInstanceStateMap(resourceName, partition); + intermediateStateOutput.getInstanceStateMap(resourceName, partition); // we should generate message based on the desired-state priority // so keep generated messages in a temp map keyed by state http://git-wip-us.apache.org/repos/asf/helix/blob/ad760ae9/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java index 452a683..18abf75 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java @@ -82,6 +82,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); + rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); rebalancePipeline.addStage(new MessageThrottleStage()); @@ -233,6 +234,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new 
CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); + rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); rebalancePipeline.addStage(new MessageThrottleStage()); @@ -330,6 +332,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); + rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); rebalancePipeline.addStage(new MessageThrottleStage());
