This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 17c52da9422fbde427eb5b1309ddeeda389c092c
Author: Junkai Xue <[email protected]>
AuthorDate: Wed May 26 16:44:46 2021 -0700

    Refactor/clean up code without logic change (#1760)
    
    This commmit contains:
    1. Remove unused functions and logic
    2. Combine the Resource/Task message generation into one stage, since they 
are both relying on best possible result.
---
 .../helix/controller/GenericHelixController.java   |  8 +-
 .../stages/IntermediateStateCalcStage.java         | 93 ----------------------
 .../controller/stages/MessageGenerationPhase.java  | 14 ++--
 .../resource/ResourceMessageGenerationPhase.java   | 38 ---------
 .../stages/task/TaskMessageGenerationPhase.java    | 38 ---------
 .../stages/TestCancellationMessageGeneration.java  | 14 ++--
 .../controller/stages/TestRebalancePipeline.java   | 13 ++-
 .../controller/TestRedundantDroppedMessage.java    |  4 +-
 .../messaging/p2pMessage/TestP2PMessages.java      |  4 +-
 .../TestP2PMessagesAvoidDuplicatedMessage.java     |  6 +-
 .../p2pMessage/TestP2PStateTransitionMessages.java | 10 +--
 .../TestP2PWithStateCancellationMessage.java       |  5 +-
 .../monitoring/mbeans/TestRebalancerMetrics.java   |  6 +-
 13 files changed, 42 insertions(+), 211 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 4baac38..b8fac92 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -19,7 +19,6 @@ package org.apache.helix.controller;
  * under the License.
  */
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -82,6 +81,7 @@ import 
org.apache.helix.controller.stages.CustomizedViewAggregationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
 import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.PersistAssignmentStage;
@@ -92,9 +92,7 @@ import 
org.apache.helix.controller.stages.TargetExteralViewCalcStage;
 import org.apache.helix.controller.stages.TaskGarbageCollectionStage;
 import org.apache.helix.controller.stages.TopStateHandoffReportStage;
 import 
org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
-import 
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.controller.stages.task.TaskMessageDispatchStage;
-import org.apache.helix.controller.stages.task.TaskMessageGenerationPhase;
 import org.apache.helix.controller.stages.task.TaskPersistDataStage;
 import org.apache.helix.controller.stages.task.TaskSchedulingStage;
 import org.apache.helix.model.ClusterConfig;
@@ -516,7 +514,7 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
       // Need to add MaintenanceRecoveryStage here because 
MAX_PARTITIONS_PER_INSTANCE check could
       // only occur after IntermediateStateCalcStage calculation
       rebalancePipeline.addStage(new MaintenanceRecoveryStage());
-      rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+      rebalancePipeline.addStage(new MessageGenerationPhase());
       rebalancePipeline.addStage(new MessageSelectionStage());
       // The IntermediateStateCalcStage should be applied after message 
selection
       // Messages are throttled already removed by IntermediateStateCalcStage 
in MessageSelection output
@@ -598,7 +596,7 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
       rebalancePipeline.addStage(new TaskSchedulingStage());
       rebalancePipeline.addStage(new TaskPersistDataStage());
       rebalancePipeline.addStage(new TaskGarbageCollectionStage());
-      rebalancePipeline.addStage(new TaskMessageGenerationPhase());
+      rebalancePipeline.addStage(new MessageGenerationPhase());
       rebalancePipeline.addStage(new TaskMessageDispatchStage());
 
       // backward compatibility check
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index b06352d..36bc9cb 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -582,73 +582,6 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
   }
 
   /**
-   * For a partition, given its preferenceList, bestPossibleState, and 
currentState, determine which
-   * type of rebalance is needed to model IdealState's states defined by the 
state model definition.
-   * @return RebalanceType needed to bring the replicas to idea states
-   *         RECOVERY_BALANCE - not all required states (replicas) are 
available through all
-   *         replicas, or the partition is disabled
-   *         NONE - current state matches the ideal state
-   *         LOAD_BALANCE - although all replicas required exist, Helix needs 
to optimize the
-   *         allocation
-   */
-  private RebalanceType getRebalanceType(ResourceControllerDataProvider cache,
-      Map<String, String> bestPossibleMap, List<String> preferenceList,
-      StateModelDefinition stateModelDef, Map<String, String> currentStateMap,
-      IdealState idealState, String partitionName) {
-    if (preferenceList == null) {
-      preferenceList = Collections.emptyList();
-    }
-
-    // If there is a minimum active replica number specified in IS, we should 
respect it.
-    // TODO: We should implement the per replica level throttling with 
generated message
-    // Issue: https://github.com/apache/helix/issues/343
-    int replica = idealState.getMinActiveReplicas() == -1
-        ? idealState.getReplicaCount(preferenceList.size())
-        : idealState.getMinActiveReplicas();
-    Set<String> activeList = new HashSet<>(preferenceList);
-    activeList.retainAll(cache.getEnabledLiveInstances());
-
-    // For each state, check that this partition currently has the required 
number of that state as
-    // required by StateModelDefinition.
-    LinkedHashMap<String, Integer> expectedStateCountMap =
-        stateModelDef.getStateCountMap(activeList.size(), replica); // 
StateModelDefinition's counts
-    // Current counts without disabled partitions or disabled instances
-    Map<String, String> currentStateMapWithoutDisabled = new 
HashMap<>(currentStateMap);
-    currentStateMapWithoutDisabled.keySet().removeAll(
-        cache.getDisabledInstancesForPartition(idealState.getResourceName(), 
partitionName));
-    Map<String, Integer> currentStateCounts =
-        StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
-
-    // Go through each state and compare counts
-    for (String state : expectedStateCountMap.keySet()) {
-      Integer expectedCount = expectedStateCountMap.get(state);
-      Integer currentCount = currentStateCounts.get(state);
-      expectedCount = expectedCount == null ? 0 : expectedCount;
-      currentCount = currentCount == null ? 0 : currentCount;
-
-      // If counts do not match up, this partition requires recovery
-      if (currentCount < expectedCount) {
-        // Recovery is not needed in cases where this partition just started, 
was dropped, or is in
-        // error
-        if (!state.equals(HelixDefinedState.DROPPED.name())
-            && !state.equals(HelixDefinedState.ERROR.name())
-            && !state.equals(stateModelDef.getInitialState())) {
-          return RebalanceType.RECOVERY_BALANCE;
-        }
-      }
-    }
-    // No recovery needed, all expected replicas exist
-    // Check if this partition is actually in the BestPossibleState
-    if (currentStateMap.equals(bestPossibleMap)) {
-      return RebalanceType.NONE; // No further action required
-    } else {
-      return RebalanceType.LOAD_BALANCE; // Required state counts are 
satisfied, but in order to
-      // achieve BestPossibleState, load balance may be required
-      // to shift replicas around
-    }
-  }
-
-  /**
    * Determine the message rebalance type with message and current states.
    * @param desiredStates         Ideally how may states we needed for 
guarantee the health of replica
    * @param message               The message to be determined what is the 
rebalance type
@@ -914,30 +847,4 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       });
     }
   }
-
-  /**
-   * Handle a partition with a pending message so that the partition will not 
be double-charged or double-assigned during recovery and load balance.
-   * @param partition
-   * @param partitionsNeedRecovery
-   * @param partitionsNeedLoadbalance
-   * @param rebalanceType
-   */
-  private void handlePendingStateTransitionsForThrottling(Partition partition,
-      Set<Partition> partitionsNeedRecovery, Set<Partition> 
partitionsNeedLoadbalance,
-      RebalanceType rebalanceType, PartitionStateMap 
bestPossiblePartitionStateMap,
-      PartitionStateMap intermediatePartitionStateMap) {
-    // Pass the best possible state directly into intermediatePartitionStateMap
-    // This is safe to do so because we already have a pending transition for 
this partition, implying that the assignment has been made in previous pipeline
-    intermediatePartitionStateMap
-        .setState(partition, 
bestPossiblePartitionStateMap.getPartitionMap(partition));
-    // Remove the partition's name from the set of partition (names) that need 
to be charged and assigned to prevent double-processing
-    switch (rebalanceType) {
-    case RECOVERY_BALANCE:
-      partitionsNeedRecovery.remove(partition);
-      break;
-    case LOAD_BALANCE:
-      partitionsNeedLoadbalance.remove(partition);
-      break;
-    }
-  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 3112327..836e5df 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Compares the currentState, pendingState with IdealState and generate 
messages
  */
-public abstract class MessageGenerationPhase extends AbstractBaseStage {
+public class MessageGenerationPhase extends AbstractBaseStage {
   private final static String NO_DESIRED_STATE = "NoDesiredState";
 
   // If we see there is any invalid pending message leaving on host, i.e. 
message
@@ -75,8 +75,10 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
 
   private static Logger logger = 
LoggerFactory.getLogger(MessageGenerationPhase.class);
 
-  protected void processEvent(ClusterEvent event, ResourcesStateMap 
resourcesStateMap)
-      throws Exception {
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     _eventId = event.getEventId();
     HelixManager manager = 
event.getAttribute(AttributeName.helixmanager.name());
     BaseControllerDataProvider cache = 
event.getAttribute(AttributeName.ControllerDataProvider.name());
@@ -86,9 +88,9 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
 
     Map<String, Map<String, Message>> messagesToCleanUp = new HashMap<>();
     if (manager == null || cache == null || resourceMap == null || 
currentStateOutput == null
-        || resourcesStateMap == null) {
+        || bestPossibleStateOutput == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires 
HelixManager|DataCache|RESOURCES|CURRENT_STATE|INTERMEDIATE_STATE");
+          + ". Requires 
HelixManager|DataCache|RESOURCES|CURRENT_STATE|BESTPOSSIBLE_STATE");
     }
 
     Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
@@ -101,7 +103,7 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
 
     for (Resource resource : resourceMap.values()) {
       try {
-        generateMessage(resource, cache, resourcesStateMap, 
currentStateOutput, manager,
+        generateMessage(resource, cache, bestPossibleStateOutput, 
currentStateOutput, manager,
             sessionIdMap, event.getEventType(), output, messagesToCleanUp);
       } catch (HelixException ex) {
         LogUtil.logError(logger, _eventId,
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
deleted file mode 100644
index 73b309e..0000000
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.helix.controller.stages.resource;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.IntermediateStateOutput;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate 
messages for regular resource
- */
-public class ResourceMessageGenerationPhase extends MessageGenerationPhase {
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    BestPossibleStateOutput bestPossibleStateOutput =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
-    processEvent(event, bestPossibleStateOutput);
-  }
-}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java
deleted file mode 100644
index 5ef6c93..0000000
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.helix.controller.stages.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate 
messages for task pipeline
- */
-
-public class TaskMessageGenerationPhase extends MessageGenerationPhase {
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    BestPossibleStateOutput bestPossibleStateOutput =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
-    processEvent(event, bestPossibleStateOutput);
-  }
-}
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
index b231e6d..2fcfc3e 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
@@ -100,15 +100,16 @@ public class TestCancellationMessageGeneration extends 
MessageGenerationPhase {
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), 
resourceMap);
 
     // set up resource state map
-    ResourcesStateMap resourcesStateMap = new ResourcesStateMap();
+    BestPossibleStateOutput bestPossibleStateOutput = new 
BestPossibleStateOutput();
     PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE);
     Map<Partition, Map<String, String>> stateMap = 
partitionStateMap.getStateMap();
     Map<String, String> instanceStateMap = new HashMap<>();
     instanceStateMap.put(TEST_INSTANCE, HelixDefinedState.DROPPED.name());
     stateMap.put(partition, instanceStateMap);
-    resourcesStateMap.setState(TEST_RESOURCE, partition, instanceStateMap);
+    bestPossibleStateOutput.setState(TEST_RESOURCE, partition, 
instanceStateMap);
 
-    processEvent(event, resourcesStateMap);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
+    process(event);
     MessageOutput output = 
event.getAttribute(AttributeName.MESSAGES_ALL.name());
     Assert.assertEquals(output.getMessages(TEST_RESOURCE, partition).size(), 
1);
   }
@@ -194,16 +195,17 @@ public class TestCancellationMessageGeneration extends 
MessageGenerationPhase {
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), 
resourceMap);
 
     // set up resource state map
-    ResourcesStateMap resourcesStateMap = new ResourcesStateMap();
+    BestPossibleStateOutput bestPossibleStateOutput = new 
BestPossibleStateOutput();
     PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE);
     Map<Partition, Map<String, String>> stateMap = 
partitionStateMap.getStateMap();
     Map<String, String> instanceStateMap = new HashMap<>();
     instanceStateMap.put(TEST_INSTANCE, currentState);
     stateMap.put(partition, instanceStateMap);
-    resourcesStateMap.setState(TEST_RESOURCE, partition, instanceStateMap);
+    bestPossibleStateOutput.setState(TEST_RESOURCE, partition, 
instanceStateMap);
 
     // Process the event
-    processEvent(event, resourcesStateMap);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
+    process(event);
     MessageOutput output = 
event.getAttribute(AttributeName.MESSAGES_ALL.name());
 
     return output.getMessages(TEST_RESOURCE, partition);
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index d262b7b..8b1b7bc 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -38,7 +38,6 @@ import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.StageException;
 import 
org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
-import 
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -97,7 +96,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
@@ -133,7 +132,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     Pipeline messagePipeline = new Pipeline();
     messagePipeline.addStage(new BestPossibleStateCalcStage());
-    messagePipeline.addStage(new ResourceMessageGenerationPhase());
+    messagePipeline.addStage(new MessageGenerationPhase());
     messagePipeline.addStage(new MessageSelectionStage());
     messagePipeline.addStage(new IntermediateStateCalcStage());
     messagePipeline.addStage(new MessageThrottleStage());
@@ -334,7 +333,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
@@ -431,7 +430,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
@@ -510,7 +509,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
@@ -590,7 +589,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     rebalancePipeline.addStage(new ResourceComputationStage());
     rebalancePipeline.addStage(new CurrentStateComputationStage());
     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
     rebalancePipeline.addStage(new MessageSelectionStage());
     rebalancePipeline.addStage(new IntermediateStateCalcStage());
     rebalancePipeline.addStage(new MessageThrottleStage());
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
index 9539b98..899a427 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
@@ -30,10 +30,10 @@ import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageOutput;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
-import 
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.task.TaskSynchronizedTestBase;
@@ -77,7 +77,7 @@ public class TestRedundantDroppedMessage extends 
TaskSynchronizedTestBase {
     runStage(event, new CurrentStateComputationStage());
     runStage(event, new BestPossibleStateCalcStage());
     Assert.assertEquals(cache.getCachedIdealMapping().size(), 1);
-    runStage(event, new ResourceMessageGenerationPhase());
+    runStage(event, new MessageGenerationPhase());
     runStage(event, new MessageSelectionStage());
     runStage(event, new IntermediateStateCalcStage());
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
index 1f1d7bd..649ff7e 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
@@ -37,12 +37,12 @@ import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import 
org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
-import 
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
@@ -100,7 +100,7 @@ public class TestP2PMessages extends BaseStageTest {
     _fullPipeline.addStage(new ResourceComputationStage());
     _fullPipeline.addStage(new CurrentStateComputationStage());
     _fullPipeline.addStage(new BestPossibleStateCalcStage());
-    _fullPipeline.addStage(new ResourceMessageGenerationPhase());
+    _fullPipeline.addStage(new MessageGenerationPhase());
     _fullPipeline.addStage(new MessageSelectionStage());
     _fullPipeline.addStage(new IntermediateStateCalcStage());
     _fullPipeline.addStage(new MessageThrottleStage());
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index 1d164c5..40d5c97 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -33,11 +33,11 @@ import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageOutput;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
-import 
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -87,13 +87,13 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends 
BaseStageTest {
     _fullPipeline = new Pipeline("FullPipeline");
     _fullPipeline.addStage(new ReadClusterDataStage());
     _fullPipeline.addStage(new BestPossibleStateCalcStage());
-    _fullPipeline.addStage(new ResourceMessageGenerationPhase());
+    _fullPipeline.addStage(new MessageGenerationPhase());
     _fullPipeline.addStage(new MessageSelectionStage());
     _fullPipeline.addStage(new IntermediateStateCalcStage());
     _fullPipeline.addStage(new MessageThrottleStage());
 
     _messagePipeline = new Pipeline("MessagePipeline");
-    _messagePipeline.addStage(new ResourceMessageGenerationPhase());
+    _messagePipeline.addStage(new MessageGenerationPhase());
     _messagePipeline.addStage(new MessageSelectionStage());
     _messagePipeline.addStage(new IntermediateStateCalcStage());
     _messagePipeline.addStage(new MessageThrottleStage());
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index cec0d2b..307022f 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -33,11 +33,11 @@ import 
org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageOutput;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
-import 
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -199,7 +199,7 @@ public class TestP2PStateTransitionMessages extends 
BaseStageTest {
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
 
     pipeline = new Pipeline("test");
-    pipeline.addStage(new ResourceMessageGenerationPhase());
+    pipeline.addStage(new MessageGenerationPhase());
     pipeline.addStage(new MessageSelectionStage());
     pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new MessageThrottleStage());
@@ -223,7 +223,7 @@ public class TestP2PStateTransitionMessages extends 
BaseStageTest {
 
 
     pipeline = new Pipeline("test");
-    pipeline.addStage(new ResourceMessageGenerationPhase());
+    pipeline.addStage(new MessageGenerationPhase());
     pipeline.addStage(new MessageSelectionStage());
     pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new MessageThrottleStage());
@@ -246,7 +246,7 @@ public class TestP2PStateTransitionMessages extends 
BaseStageTest {
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     pipeline = new Pipeline("test");
-    pipeline.addStage(new ResourceMessageGenerationPhase());
+    pipeline.addStage(new MessageGenerationPhase());
     pipeline.addStage(new MessageSelectionStage());
     pipeline.addStage(new MessageThrottleStage());
 
@@ -359,7 +359,7 @@ public class TestP2PStateTransitionMessages extends 
BaseStageTest {
     Pipeline pipeline = new Pipeline("test");
     pipeline.addStage(new ReadClusterDataStage());
     pipeline.addStage(new BestPossibleStateCalcStage());
-    pipeline.addStage(new ResourceMessageGenerationPhase());
+    pipeline.addStage(new MessageGenerationPhase());
     pipeline.addStage(new MessageSelectionStage());
     pipeline.addStage(new IntermediateStateCalcStage());
     pipeline.addStage(new MessageThrottleStage());
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
index f685bf4..ea2a4aa 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
@@ -35,9 +35,8 @@ import 
org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.stages.IntermediateStateOutput;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageOutput;
-import 
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.ClusterConfig;
@@ -61,7 +60,7 @@ public class TestP2PWithStateCancellationMessage extends 
BaseStageTest {
   @Test
   public void testP2PWithStateCancellationMessage() {
     ClusterEvent event = generateClusterEvent();
-    runStage(event, new ResourceMessageGenerationPhase());
+    runStage(event, new MessageGenerationPhase());
     MessageOutput messageOutput = 
event.getAttribute(AttributeName.MESSAGES_ALL.name());
     // No message should be sent for partition 0
     Assert.assertEquals(messageOutput.getMessages(RESOURCE_NAME, new 
Partition("0")).size(), 0);
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
index ccc1431..57a3ad0 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
@@ -13,9 +13,9 @@ import 
org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
-import 
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -79,7 +79,7 @@ public class TestRebalancerMetrics extends BaseStageTest {
     setupThrottleConfig(cache.getClusterConfig(),
         StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, 
maxPending);
     runStage(event, new BestPossibleStateCalcStage());
-    runStage(event, new ResourceMessageGenerationPhase());
+    runStage(event, new MessageGenerationPhase());
     runStage(event, new MessageSelectionStage());
     runStage(event, new IntermediateStateCalcStage());
 
@@ -140,7 +140,7 @@ public class TestRebalancerMetrics extends BaseStageTest {
     setupThrottleConfig(cache.getClusterConfig(),
         StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, maxPending);
     runStage(event, new BestPossibleStateCalcStage());
-    runStage(event, new ResourceMessageGenerationPhase());
+    runStage(event, new MessageGenerationPhase());
     runStage(event, new MessageSelectionStage());
     runStage(event, new IntermediateStateCalcStage());
 

Reply via email to