This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch replica_level_throttle
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/replica_level_throttle by this
push:
new 9189840 Refactor/clean up code without logic change (#1760)
9189840 is described below
commit 91898404be8e4abb78a684cf7135f979d9c727ef
Author: Junkai Xue <[email protected]>
AuthorDate: Wed May 26 16:44:46 2021 -0700
Refactor/clean up code without logic change (#1760)
This commmit contains:
1. Remove unused functions and logic
2. Combine the Resource/Task message generation into one stage, since they
are both relying on best possible result.
---
.../helix/controller/GenericHelixController.java | 8 +-
.../stages/IntermediateStateCalcStage.java | 93 ----------------------
.../controller/stages/MessageGenerationPhase.java | 14 ++--
.../resource/ResourceMessageGenerationPhase.java | 38 ---------
.../stages/task/TaskMessageGenerationPhase.java | 38 ---------
.../stages/TestCancellationMessageGeneration.java | 14 ++--
.../controller/stages/TestRebalancePipeline.java | 13 ++-
.../controller/TestRedundantDroppedMessage.java | 4 +-
.../messaging/p2pMessage/TestP2PMessages.java | 4 +-
.../TestP2PMessagesAvoidDuplicatedMessage.java | 6 +-
.../p2pMessage/TestP2PStateTransitionMessages.java | 10 +--
.../TestP2PWithStateCancellationMessage.java | 5 +-
.../monitoring/mbeans/TestRebalancerMetrics.java | 6 +-
13 files changed, 42 insertions(+), 211 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 4baac38..b8fac92 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -19,7 +19,6 @@ package org.apache.helix.controller;
* under the License.
*/
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -82,6 +81,7 @@ import
org.apache.helix.controller.stages.CustomizedViewAggregationStage;
import org.apache.helix.controller.stages.ExternalViewComputeStage;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.PersistAssignmentStage;
@@ -92,9 +92,7 @@ import
org.apache.helix.controller.stages.TargetExteralViewCalcStage;
import org.apache.helix.controller.stages.TaskGarbageCollectionStage;
import org.apache.helix.controller.stages.TopStateHandoffReportStage;
import
org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
-import
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.controller.stages.task.TaskMessageDispatchStage;
-import org.apache.helix.controller.stages.task.TaskMessageGenerationPhase;
import org.apache.helix.controller.stages.task.TaskPersistDataStage;
import org.apache.helix.controller.stages.task.TaskSchedulingStage;
import org.apache.helix.model.ClusterConfig;
@@ -516,7 +514,7 @@ public class GenericHelixController implements
IdealStateChangeListener, LiveIns
// Need to add MaintenanceRecoveryStage here because
MAX_PARTITIONS_PER_INSTANCE check could
// only occur after IntermediateStateCalcStage calculation
rebalancePipeline.addStage(new MaintenanceRecoveryStage());
- rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
// The IntermediateStateCalcStage should be applied after message
selection
// Messages are throttled already removed by IntermediateStateCalcStage
in MessageSelection output
@@ -598,7 +596,7 @@ public class GenericHelixController implements
IdealStateChangeListener, LiveIns
rebalancePipeline.addStage(new TaskSchedulingStage());
rebalancePipeline.addStage(new TaskPersistDataStage());
rebalancePipeline.addStage(new TaskGarbageCollectionStage());
- rebalancePipeline.addStage(new TaskMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new TaskMessageDispatchStage());
// backward compatibility check
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index b06352d..36bc9cb 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -582,73 +582,6 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
}
/**
- * For a partition, given its preferenceList, bestPossibleState, and
currentState, determine which
- * type of rebalance is needed to model IdealState's states defined by the
state model definition.
- * @return RebalanceType needed to bring the replicas to idea states
- * RECOVERY_BALANCE - not all required states (replicas) are
available through all
- * replicas, or the partition is disabled
- * NONE - current state matches the ideal state
- * LOAD_BALANCE - although all replicas required exist, Helix needs
to optimize the
- * allocation
- */
- private RebalanceType getRebalanceType(ResourceControllerDataProvider cache,
- Map<String, String> bestPossibleMap, List<String> preferenceList,
- StateModelDefinition stateModelDef, Map<String, String> currentStateMap,
- IdealState idealState, String partitionName) {
- if (preferenceList == null) {
- preferenceList = Collections.emptyList();
- }
-
- // If there is a minimum active replica number specified in IS, we should
respect it.
- // TODO: We should implement the per replica level throttling with
generated message
- // Issue: https://github.com/apache/helix/issues/343
- int replica = idealState.getMinActiveReplicas() == -1
- ? idealState.getReplicaCount(preferenceList.size())
- : idealState.getMinActiveReplicas();
- Set<String> activeList = new HashSet<>(preferenceList);
- activeList.retainAll(cache.getEnabledLiveInstances());
-
- // For each state, check that this partition currently has the required
number of that state as
- // required by StateModelDefinition.
- LinkedHashMap<String, Integer> expectedStateCountMap =
- stateModelDef.getStateCountMap(activeList.size(), replica); //
StateModelDefinition's counts
- // Current counts without disabled partitions or disabled instances
- Map<String, String> currentStateMapWithoutDisabled = new
HashMap<>(currentStateMap);
- currentStateMapWithoutDisabled.keySet().removeAll(
- cache.getDisabledInstancesForPartition(idealState.getResourceName(),
partitionName));
- Map<String, Integer> currentStateCounts =
- StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
-
- // Go through each state and compare counts
- for (String state : expectedStateCountMap.keySet()) {
- Integer expectedCount = expectedStateCountMap.get(state);
- Integer currentCount = currentStateCounts.get(state);
- expectedCount = expectedCount == null ? 0 : expectedCount;
- currentCount = currentCount == null ? 0 : currentCount;
-
- // If counts do not match up, this partition requires recovery
- if (currentCount < expectedCount) {
- // Recovery is not needed in cases where this partition just started,
was dropped, or is in
- // error
- if (!state.equals(HelixDefinedState.DROPPED.name())
- && !state.equals(HelixDefinedState.ERROR.name())
- && !state.equals(stateModelDef.getInitialState())) {
- return RebalanceType.RECOVERY_BALANCE;
- }
- }
- }
- // No recovery needed, all expected replicas exist
- // Check if this partition is actually in the BestPossibleState
- if (currentStateMap.equals(bestPossibleMap)) {
- return RebalanceType.NONE; // No further action required
- } else {
- return RebalanceType.LOAD_BALANCE; // Required state counts are
satisfied, but in order to
- // achieve BestPossibleState, load balance may be required
- // to shift replicas around
- }
- }
-
- /**
* Determine the message rebalance type with message and current states.
* @param desiredStates Ideally how may states we needed for
guarantee the health of replica
* @param message The message to be determined what is the
rebalance type
@@ -914,30 +847,4 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
});
}
}
-
- /**
- * Handle a partition with a pending message so that the partition will not
be double-charged or double-assigned during recovery and load balance.
- * @param partition
- * @param partitionsNeedRecovery
- * @param partitionsNeedLoadbalance
- * @param rebalanceType
- */
- private void handlePendingStateTransitionsForThrottling(Partition partition,
- Set<Partition> partitionsNeedRecovery, Set<Partition>
partitionsNeedLoadbalance,
- RebalanceType rebalanceType, PartitionStateMap
bestPossiblePartitionStateMap,
- PartitionStateMap intermediatePartitionStateMap) {
- // Pass the best possible state directly into intermediatePartitionStateMap
- // This is safe to do so because we already have a pending transition for
this partition, implying that the assignment has been made in previous pipeline
- intermediatePartitionStateMap
- .setState(partition,
bestPossiblePartitionStateMap.getPartitionMap(partition));
- // Remove the partition's name from the set of partition (names) that need
to be charged and assigned to prevent double-processing
- switch (rebalanceType) {
- case RECOVERY_BALANCE:
- partitionsNeedRecovery.remove(partition);
- break;
- case LOAD_BALANCE:
- partitionsNeedLoadbalance.remove(partition);
- break;
- }
- }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 3112327..836e5df 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
/**
* Compares the currentState, pendingState with IdealState and generate
messages
*/
-public abstract class MessageGenerationPhase extends AbstractBaseStage {
+public class MessageGenerationPhase extends AbstractBaseStage {
private final static String NO_DESIRED_STATE = "NoDesiredState";
// If we see there is any invalid pending message leaving on host, i.e.
message
@@ -75,8 +75,10 @@ public abstract class MessageGenerationPhase extends
AbstractBaseStage {
private static Logger logger =
LoggerFactory.getLogger(MessageGenerationPhase.class);
- protected void processEvent(ClusterEvent event, ResourcesStateMap
resourcesStateMap)
- throws Exception {
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ BestPossibleStateOutput bestPossibleStateOutput =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
_eventId = event.getEventId();
HelixManager manager =
event.getAttribute(AttributeName.helixmanager.name());
BaseControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
@@ -86,9 +88,9 @@ public abstract class MessageGenerationPhase extends
AbstractBaseStage {
Map<String, Map<String, Message>> messagesToCleanUp = new HashMap<>();
if (manager == null || cache == null || resourceMap == null ||
currentStateOutput == null
- || resourcesStateMap == null) {
+ || bestPossibleStateOutput == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires
HelixManager|DataCache|RESOURCES|CURRENT_STATE|INTERMEDIATE_STATE");
+ + ". Requires
HelixManager|DataCache|RESOURCES|CURRENT_STATE|BESTPOSSIBLE_STATE");
}
Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
@@ -101,7 +103,7 @@ public abstract class MessageGenerationPhase extends
AbstractBaseStage {
for (Resource resource : resourceMap.values()) {
try {
- generateMessage(resource, cache, resourcesStateMap,
currentStateOutput, manager,
+ generateMessage(resource, cache, bestPossibleStateOutput,
currentStateOutput, manager,
sessionIdMap, event.getEventType(), output, messagesToCleanUp);
} catch (HelixException ex) {
LogUtil.logError(logger, _eventId,
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
deleted file mode 100644
index 73b309e..0000000
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.helix.controller.stages.resource;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.IntermediateStateOutput;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate
messages for regular resource
- */
-public class ResourceMessageGenerationPhase extends MessageGenerationPhase {
- @Override
- public void process(ClusterEvent event) throws Exception {
- BestPossibleStateOutput bestPossibleStateOutput =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
- processEvent(event, bestPossibleStateOutput);
- }
-}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java
deleted file mode 100644
index 5ef6c93..0000000
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.helix.controller.stages.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate
messages for task pipeline
- */
-
-public class TaskMessageGenerationPhase extends MessageGenerationPhase {
- @Override
- public void process(ClusterEvent event) throws Exception {
- BestPossibleStateOutput bestPossibleStateOutput =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
- processEvent(event, bestPossibleStateOutput);
- }
-}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
index b231e6d..2fcfc3e 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
@@ -100,15 +100,16 @@ public class TestCancellationMessageGeneration extends
MessageGenerationPhase {
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
resourceMap);
// set up resource state map
- ResourcesStateMap resourcesStateMap = new ResourcesStateMap();
+ BestPossibleStateOutput bestPossibleStateOutput = new
BestPossibleStateOutput();
PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE);
Map<Partition, Map<String, String>> stateMap =
partitionStateMap.getStateMap();
Map<String, String> instanceStateMap = new HashMap<>();
instanceStateMap.put(TEST_INSTANCE, HelixDefinedState.DROPPED.name());
stateMap.put(partition, instanceStateMap);
- resourcesStateMap.setState(TEST_RESOURCE, partition, instanceStateMap);
+ bestPossibleStateOutput.setState(TEST_RESOURCE, partition,
instanceStateMap);
- processEvent(event, resourcesStateMap);
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
+ process(event);
MessageOutput output =
event.getAttribute(AttributeName.MESSAGES_ALL.name());
Assert.assertEquals(output.getMessages(TEST_RESOURCE, partition).size(),
1);
}
@@ -194,16 +195,17 @@ public class TestCancellationMessageGeneration extends
MessageGenerationPhase {
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
resourceMap);
// set up resource state map
- ResourcesStateMap resourcesStateMap = new ResourcesStateMap();
+ BestPossibleStateOutput bestPossibleStateOutput = new
BestPossibleStateOutput();
PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE);
Map<Partition, Map<String, String>> stateMap =
partitionStateMap.getStateMap();
Map<String, String> instanceStateMap = new HashMap<>();
instanceStateMap.put(TEST_INSTANCE, currentState);
stateMap.put(partition, instanceStateMap);
- resourcesStateMap.setState(TEST_RESOURCE, partition, instanceStateMap);
+ bestPossibleStateOutput.setState(TEST_RESOURCE, partition,
instanceStateMap);
// Process the event
- processEvent(event, resourcesStateMap);
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
+ process(event);
MessageOutput output =
event.getAttribute(AttributeName.MESSAGES_ALL.name());
return output.getMessages(TEST_RESOURCE, partition);
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index d262b7b..8b1b7bc 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -38,7 +38,6 @@ import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.StageException;
import
org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
-import
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -97,7 +96,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
@@ -133,7 +132,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
Pipeline messagePipeline = new Pipeline();
messagePipeline.addStage(new BestPossibleStateCalcStage());
- messagePipeline.addStage(new ResourceMessageGenerationPhase());
+ messagePipeline.addStage(new MessageGenerationPhase());
messagePipeline.addStage(new MessageSelectionStage());
messagePipeline.addStage(new IntermediateStateCalcStage());
messagePipeline.addStage(new MessageThrottleStage());
@@ -334,7 +333,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
@@ -431,7 +430,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
@@ -510,7 +509,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
@@ -590,7 +589,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+ rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
index 9539b98..899a427 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
@@ -30,10 +30,10 @@ import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageOutput;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
-import
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.task.TaskSynchronizedTestBase;
@@ -77,7 +77,7 @@ public class TestRedundantDroppedMessage extends
TaskSynchronizedTestBase {
runStage(event, new CurrentStateComputationStage());
runStage(event, new BestPossibleStateCalcStage());
Assert.assertEquals(cache.getCachedIdealMapping().size(), 1);
- runStage(event, new ResourceMessageGenerationPhase());
+ runStage(event, new MessageGenerationPhase());
runStage(event, new MessageSelectionStage());
runStage(event, new IntermediateStateCalcStage());
diff --git
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
index 1f1d7bd..649ff7e 100644
---
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
+++
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
@@ -37,12 +37,12 @@ import org.apache.helix.controller.stages.BaseStageTest;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import
org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
-import
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
@@ -100,7 +100,7 @@ public class TestP2PMessages extends BaseStageTest {
_fullPipeline.addStage(new ResourceComputationStage());
_fullPipeline.addStage(new CurrentStateComputationStage());
_fullPipeline.addStage(new BestPossibleStateCalcStage());
- _fullPipeline.addStage(new ResourceMessageGenerationPhase());
+ _fullPipeline.addStage(new MessageGenerationPhase());
_fullPipeline.addStage(new MessageSelectionStage());
_fullPipeline.addStage(new IntermediateStateCalcStage());
_fullPipeline.addStage(new MessageThrottleStage());
diff --git
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index 1d164c5..40d5c97 100644
---
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -33,11 +33,11 @@ import org.apache.helix.controller.stages.BaseStageTest;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageOutput;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
-import
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
@@ -87,13 +87,13 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends
BaseStageTest {
_fullPipeline = new Pipeline("FullPipeline");
_fullPipeline.addStage(new ReadClusterDataStage());
_fullPipeline.addStage(new BestPossibleStateCalcStage());
- _fullPipeline.addStage(new ResourceMessageGenerationPhase());
+ _fullPipeline.addStage(new MessageGenerationPhase());
_fullPipeline.addStage(new MessageSelectionStage());
_fullPipeline.addStage(new IntermediateStateCalcStage());
_fullPipeline.addStage(new MessageThrottleStage());
_messagePipeline = new Pipeline("MessagePipeline");
- _messagePipeline.addStage(new ResourceMessageGenerationPhase());
+ _messagePipeline.addStage(new MessageGenerationPhase());
_messagePipeline.addStage(new MessageSelectionStage());
_messagePipeline.addStage(new IntermediateStateCalcStage());
_messagePipeline.addStage(new MessageThrottleStage());
diff --git
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index cec0d2b..307022f 100644
---
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -33,11 +33,11 @@ import
org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageOutput;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
-import
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
@@ -199,7 +199,7 @@ public class TestP2PStateTransitionMessages extends
BaseStageTest {
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
pipeline = new Pipeline("test");
- pipeline.addStage(new ResourceMessageGenerationPhase());
+ pipeline.addStage(new MessageGenerationPhase());
pipeline.addStage(new MessageSelectionStage());
pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new MessageThrottleStage());
@@ -223,7 +223,7 @@ public class TestP2PStateTransitionMessages extends
BaseStageTest {
pipeline = new Pipeline("test");
- pipeline.addStage(new ResourceMessageGenerationPhase());
+ pipeline.addStage(new MessageGenerationPhase());
pipeline.addStage(new MessageSelectionStage());
pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new MessageThrottleStage());
@@ -246,7 +246,7 @@ public class TestP2PStateTransitionMessages extends
BaseStageTest {
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
pipeline = new Pipeline("test");
- pipeline.addStage(new ResourceMessageGenerationPhase());
+ pipeline.addStage(new MessageGenerationPhase());
pipeline.addStage(new MessageSelectionStage());
pipeline.addStage(new MessageThrottleStage());
@@ -359,7 +359,7 @@ public class TestP2PStateTransitionMessages extends
BaseStageTest {
Pipeline pipeline = new Pipeline("test");
pipeline.addStage(new ReadClusterDataStage());
pipeline.addStage(new BestPossibleStateCalcStage());
- pipeline.addStage(new ResourceMessageGenerationPhase());
+ pipeline.addStage(new MessageGenerationPhase());
pipeline.addStage(new MessageSelectionStage());
pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new MessageThrottleStage());
diff --git
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
index f685bf4..ea2a4aa 100644
---
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
+++
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
@@ -35,9 +35,8 @@ import
org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.stages.IntermediateStateOutput;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageOutput;
-import
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.ClusterConfig;
@@ -61,7 +60,7 @@ public class TestP2PWithStateCancellationMessage extends
BaseStageTest {
@Test
public void testP2PWithStateCancellationMessage() {
ClusterEvent event = generateClusterEvent();
- runStage(event, new ResourceMessageGenerationPhase());
+ runStage(event, new MessageGenerationPhase());
MessageOutput messageOutput =
event.getAttribute(AttributeName.MESSAGES_ALL.name());
// No message should be sent for partition 0
Assert.assertEquals(messageOutput.getMessages(RESOURCE_NAME, new
Partition("0")).size(), 0);
diff --git
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
index ccc1431..57a3ad0 100644
---
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
+++
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
@@ -13,9 +13,9 @@ import
org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
-import
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
@@ -79,7 +79,7 @@ public class TestRebalancerMetrics extends BaseStageTest {
setupThrottleConfig(cache.getClusterConfig(),
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
maxPending);
runStage(event, new BestPossibleStateCalcStage());
- runStage(event, new ResourceMessageGenerationPhase());
+ runStage(event, new MessageGenerationPhase());
runStage(event, new MessageSelectionStage());
runStage(event, new IntermediateStateCalcStage());
@@ -140,7 +140,7 @@ public class TestRebalancerMetrics extends BaseStageTest {
setupThrottleConfig(cache.getClusterConfig(),
StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, maxPending);
runStage(event, new BestPossibleStateCalcStage());
- runStage(event, new ResourceMessageGenerationPhase());
+ runStage(event, new MessageGenerationPhase());
runStage(event, new MessageSelectionStage());
runStage(event, new IntermediateStateCalcStage());