This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 17c52da9422fbde427eb5b1309ddeeda389c092c Author: Junkai Xue <[email protected]> AuthorDate: Wed May 26 16:44:46 2021 -0700 Refactor/clean up code without logic change (#1760) This commit contains: 1. Remove unused functions and logic 2. Combine the Resource/Task message generation into one stage, since both rely on the best possible result. --- .../helix/controller/GenericHelixController.java | 8 +- .../stages/IntermediateStateCalcStage.java | 93 ---------------------- .../controller/stages/MessageGenerationPhase.java | 14 ++-- .../resource/ResourceMessageGenerationPhase.java | 38 --------- .../stages/task/TaskMessageGenerationPhase.java | 38 --------- .../stages/TestCancellationMessageGeneration.java | 14 ++-- .../controller/stages/TestRebalancePipeline.java | 13 ++- .../controller/TestRedundantDroppedMessage.java | 4 +- .../messaging/p2pMessage/TestP2PMessages.java | 4 +- .../TestP2PMessagesAvoidDuplicatedMessage.java | 6 +- .../p2pMessage/TestP2PStateTransitionMessages.java | 10 +-- .../TestP2PWithStateCancellationMessage.java | 5 +- .../monitoring/mbeans/TestRebalancerMetrics.java | 6 +- 13 files changed, 42 insertions(+), 211 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 4baac38..b8fac92 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -19,7 +19,6 @@ package org.apache.helix.controller; * under the License. 
*/ -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -82,6 +81,7 @@ import org.apache.helix.controller.stages.CustomizedViewAggregationStage; import org.apache.helix.controller.stages.ExternalViewComputeStage; import org.apache.helix.controller.stages.IntermediateStateCalcStage; import org.apache.helix.controller.stages.MaintenanceRecoveryStage; +import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.MessageThrottleStage; import org.apache.helix.controller.stages.PersistAssignmentStage; @@ -92,9 +92,7 @@ import org.apache.helix.controller.stages.TargetExteralViewCalcStage; import org.apache.helix.controller.stages.TaskGarbageCollectionStage; import org.apache.helix.controller.stages.TopStateHandoffReportStage; import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage; -import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase; import org.apache.helix.controller.stages.task.TaskMessageDispatchStage; -import org.apache.helix.controller.stages.task.TaskMessageGenerationPhase; import org.apache.helix.controller.stages.task.TaskPersistDataStage; import org.apache.helix.controller.stages.task.TaskSchedulingStage; import org.apache.helix.model.ClusterConfig; @@ -516,7 +514,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns // Need to add MaintenanceRecoveryStage here because MAX_PARTITIONS_PER_INSTANCE check could // only occur after IntermediateStateCalcStage calculation rebalancePipeline.addStage(new MaintenanceRecoveryStage()); - rebalancePipeline.addStage(new ResourceMessageGenerationPhase()); + rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); // The IntermediateStateCalcStage should be applied after message selection // Messages are throttled 
already removed by IntermediateStateCalcStage in MessageSelection output @@ -598,7 +596,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns rebalancePipeline.addStage(new TaskSchedulingStage()); rebalancePipeline.addStage(new TaskPersistDataStage()); rebalancePipeline.addStage(new TaskGarbageCollectionStage()); - rebalancePipeline.addStage(new TaskMessageGenerationPhase()); + rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new TaskMessageDispatchStage()); // backward compatibility check diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index b06352d..36bc9cb 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -582,73 +582,6 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } /** - * For a partition, given its preferenceList, bestPossibleState, and currentState, determine which - * type of rebalance is needed to model IdealState's states defined by the state model definition. 
- * @return RebalanceType needed to bring the replicas to idea states - * RECOVERY_BALANCE - not all required states (replicas) are available through all - * replicas, or the partition is disabled - * NONE - current state matches the ideal state - * LOAD_BALANCE - although all replicas required exist, Helix needs to optimize the - * allocation - */ - private RebalanceType getRebalanceType(ResourceControllerDataProvider cache, - Map<String, String> bestPossibleMap, List<String> preferenceList, - StateModelDefinition stateModelDef, Map<String, String> currentStateMap, - IdealState idealState, String partitionName) { - if (preferenceList == null) { - preferenceList = Collections.emptyList(); - } - - // If there is a minimum active replica number specified in IS, we should respect it. - // TODO: We should implement the per replica level throttling with generated message - // Issue: https://github.com/apache/helix/issues/343 - int replica = idealState.getMinActiveReplicas() == -1 - ? idealState.getReplicaCount(preferenceList.size()) - : idealState.getMinActiveReplicas(); - Set<String> activeList = new HashSet<>(preferenceList); - activeList.retainAll(cache.getEnabledLiveInstances()); - - // For each state, check that this partition currently has the required number of that state as - // required by StateModelDefinition. 
- LinkedHashMap<String, Integer> expectedStateCountMap = - stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts - // Current counts without disabled partitions or disabled instances - Map<String, String> currentStateMapWithoutDisabled = new HashMap<>(currentStateMap); - currentStateMapWithoutDisabled.keySet().removeAll( - cache.getDisabledInstancesForPartition(idealState.getResourceName(), partitionName)); - Map<String, Integer> currentStateCounts = - StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled); - - // Go through each state and compare counts - for (String state : expectedStateCountMap.keySet()) { - Integer expectedCount = expectedStateCountMap.get(state); - Integer currentCount = currentStateCounts.get(state); - expectedCount = expectedCount == null ? 0 : expectedCount; - currentCount = currentCount == null ? 0 : currentCount; - - // If counts do not match up, this partition requires recovery - if (currentCount < expectedCount) { - // Recovery is not needed in cases where this partition just started, was dropped, or is in - // error - if (!state.equals(HelixDefinedState.DROPPED.name()) - && !state.equals(HelixDefinedState.ERROR.name()) - && !state.equals(stateModelDef.getInitialState())) { - return RebalanceType.RECOVERY_BALANCE; - } - } - } - // No recovery needed, all expected replicas exist - // Check if this partition is actually in the BestPossibleState - if (currentStateMap.equals(bestPossibleMap)) { - return RebalanceType.NONE; // No further action required - } else { - return RebalanceType.LOAD_BALANCE; // Required state counts are satisfied, but in order to - // achieve BestPossibleState, load balance may be required - // to shift replicas around - } - } - - /** * Determine the message rebalance type with message and current states. 
* @param desiredStates Ideally how may states we needed for guarantee the health of replica * @param message The message to be determined what is the rebalance type @@ -914,30 +847,4 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { }); } } - - /** - * Handle a partition with a pending message so that the partition will not be double-charged or double-assigned during recovery and load balance. - * @param partition - * @param partitionsNeedRecovery - * @param partitionsNeedLoadbalance - * @param rebalanceType - */ - private void handlePendingStateTransitionsForThrottling(Partition partition, - Set<Partition> partitionsNeedRecovery, Set<Partition> partitionsNeedLoadbalance, - RebalanceType rebalanceType, PartitionStateMap bestPossiblePartitionStateMap, - PartitionStateMap intermediatePartitionStateMap) { - // Pass the best possible state directly into intermediatePartitionStateMap - // This is safe to do so because we already have a pending transition for this partition, implying that the assignment has been made in previous pipeline - intermediatePartitionStateMap - .setState(partition, bestPossiblePartitionStateMap.getPartitionMap(partition)); - // Remove the partition's name from the set of partition (names) that need to be charged and assigned to prevent double-processing - switch (rebalanceType) { - case RECOVERY_BALANCE: - partitionsNeedRecovery.remove(partition); - break; - case LOAD_BALANCE: - partitionsNeedLoadbalance.remove(partition); - break; - } - } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index 3112327..836e5df 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory; /** * Compares the currentState, 
pendingState with IdealState and generate messages */ -public abstract class MessageGenerationPhase extends AbstractBaseStage { +public class MessageGenerationPhase extends AbstractBaseStage { private final static String NO_DESIRED_STATE = "NoDesiredState"; // If we see there is any invalid pending message leaving on host, i.e. message @@ -75,8 +75,10 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class); - protected void processEvent(ClusterEvent event, ResourcesStateMap resourcesStateMap) - throws Exception { + @Override + public void process(ClusterEvent event) throws Exception { + BestPossibleStateOutput bestPossibleStateOutput = + event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); _eventId = event.getEventId(); HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); BaseControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); @@ -86,9 +88,9 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { Map<String, Map<String, Message>> messagesToCleanUp = new HashMap<>(); if (manager == null || cache == null || resourceMap == null || currentStateOutput == null - || resourcesStateMap == null) { + || bestPossibleStateOutput == null) { throw new StageException("Missing attributes in event:" + event - + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|INTERMEDIATE_STATE"); + + ". 
Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BESTPOSSIBLE_STATE"); } Map<String, LiveInstance> liveInstances = cache.getLiveInstances(); @@ -101,7 +103,7 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { for (Resource resource : resourceMap.values()) { try { - generateMessage(resource, cache, resourcesStateMap, currentStateOutput, manager, + generateMessage(resource, cache, bestPossibleStateOutput, currentStateOutput, manager, sessionIdMap, event.getEventType(), output, messagesToCleanUp); } catch (HelixException ex) { LogUtil.logError(logger, _eventId, diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java deleted file mode 100644 index 73b309e..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.apache.helix.controller.stages.resource; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -import org.apache.helix.controller.stages.AttributeName; -import org.apache.helix.controller.stages.BestPossibleStateOutput; -import org.apache.helix.controller.stages.ClusterEvent; -import org.apache.helix.controller.stages.IntermediateStateOutput; -import org.apache.helix.controller.stages.MessageGenerationPhase; - -/** - * Compares the currentState, pendingState with IdealState and generate messages for regular resource - */ -public class ResourceMessageGenerationPhase extends MessageGenerationPhase { - @Override - public void process(ClusterEvent event) throws Exception { - BestPossibleStateOutput bestPossibleStateOutput = - event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); - processEvent(event, bestPossibleStateOutput); - } -} diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java deleted file mode 100644 index 5ef6c93..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskMessageGenerationPhase.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.apache.helix.controller.stages.task; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.controller.stages.AttributeName; -import org.apache.helix.controller.stages.BestPossibleStateOutput; -import org.apache.helix.controller.stages.ClusterEvent; -import org.apache.helix.controller.stages.MessageGenerationPhase; - -/** - * Compares the currentState, pendingState with IdealState and generate messages for task pipeline - */ - -public class TaskMessageGenerationPhase extends MessageGenerationPhase { - @Override - public void process(ClusterEvent event) throws Exception { - BestPossibleStateOutput bestPossibleStateOutput = - event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); - processEvent(event, bestPossibleStateOutput); - } -} diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java index b231e6d..2fcfc3e 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java @@ -100,15 +100,16 @@ public class TestCancellationMessageGeneration extends MessageGenerationPhase { event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap); // set up resource state map - ResourcesStateMap resourcesStateMap = new ResourcesStateMap(); + BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE); Map<Partition, Map<String, String>> stateMap = partitionStateMap.getStateMap(); Map<String, String> instanceStateMap = new HashMap<>(); instanceStateMap.put(TEST_INSTANCE, HelixDefinedState.DROPPED.name()); stateMap.put(partition, instanceStateMap); - resourcesStateMap.setState(TEST_RESOURCE, partition, 
instanceStateMap); + bestPossibleStateOutput.setState(TEST_RESOURCE, partition, instanceStateMap); - processEvent(event, resourcesStateMap); + event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); + process(event); MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name()); Assert.assertEquals(output.getMessages(TEST_RESOURCE, partition).size(), 1); } @@ -194,16 +195,17 @@ public class TestCancellationMessageGeneration extends MessageGenerationPhase { event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap); // set up resource state map - ResourcesStateMap resourcesStateMap = new ResourcesStateMap(); + BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE); Map<Partition, Map<String, String>> stateMap = partitionStateMap.getStateMap(); Map<String, String> instanceStateMap = new HashMap<>(); instanceStateMap.put(TEST_INSTANCE, currentState); stateMap.put(partition, instanceStateMap); - resourcesStateMap.setState(TEST_RESOURCE, partition, instanceStateMap); + bestPossibleStateOutput.setState(TEST_RESOURCE, partition, instanceStateMap); // Process the event - processEvent(event, resourcesStateMap); + event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); + process(event); MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name()); return output.getMessages(TEST_RESOURCE, partition); diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java index d262b7b..8b1b7bc 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java @@ -38,7 +38,6 @@ import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage; -import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; @@ -97,7 +96,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); - rebalancePipeline.addStage(new ResourceMessageGenerationPhase()); + rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageThrottleStage()); @@ -133,7 +132,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { Pipeline messagePipeline = new Pipeline(); messagePipeline.addStage(new BestPossibleStateCalcStage()); - messagePipeline.addStage(new ResourceMessageGenerationPhase()); + messagePipeline.addStage(new MessageGenerationPhase()); messagePipeline.addStage(new MessageSelectionStage()); messagePipeline.addStage(new IntermediateStateCalcStage()); messagePipeline.addStage(new MessageThrottleStage()); @@ -334,7 +333,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); - rebalancePipeline.addStage(new ResourceMessageGenerationPhase()); + rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new 
MessageSelectionStage()); rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageThrottleStage()); @@ -431,7 +430,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); - rebalancePipeline.addStage(new ResourceMessageGenerationPhase()); + rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageThrottleStage()); @@ -510,7 +509,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); - rebalancePipeline.addStage(new ResourceMessageGenerationPhase()); + rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageThrottleStage()); @@ -590,7 +589,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); - rebalancePipeline.addStage(new ResourceMessageGenerationPhase()); + rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageThrottleStage()); diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java index 9539b98..899a427 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java @@ -30,10 +30,10 @@ import org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.controller.stages.ClusterEventType; import org.apache.helix.controller.stages.CurrentStateComputationStage; import org.apache.helix.controller.stages.IntermediateStateCalcStage; +import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageOutput; import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.ResourceComputationStage; -import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase; import org.apache.helix.model.IdealState; import org.apache.helix.model.Partition; import org.apache.helix.task.TaskSynchronizedTestBase; @@ -77,7 +77,7 @@ public class TestRedundantDroppedMessage extends TaskSynchronizedTestBase { runStage(event, new CurrentStateComputationStage()); runStage(event, new BestPossibleStateCalcStage()); Assert.assertEquals(cache.getCachedIdealMapping().size(), 1); - runStage(event, new ResourceMessageGenerationPhase()); + runStage(event, new MessageGenerationPhase()); runStage(event, new MessageSelectionStage()); runStage(event, new IntermediateStateCalcStage()); diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java index 1f1d7bd..649ff7e 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java @@ -37,12 +37,12 @@ import 
org.apache.helix.controller.stages.BaseStageTest; import org.apache.helix.controller.stages.BestPossibleStateCalcStage; import org.apache.helix.controller.stages.CurrentStateComputationStage; import org.apache.helix.controller.stages.IntermediateStateCalcStage; +import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.MessageThrottleStage; import org.apache.helix.controller.stages.ReadClusterDataStage; import org.apache.helix.controller.stages.ResourceComputationStage; import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage; -import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.CurrentState; @@ -100,7 +100,7 @@ public class TestP2PMessages extends BaseStageTest { _fullPipeline.addStage(new ResourceComputationStage()); _fullPipeline.addStage(new CurrentStateComputationStage()); _fullPipeline.addStage(new BestPossibleStateCalcStage()); - _fullPipeline.addStage(new ResourceMessageGenerationPhase()); + _fullPipeline.addStage(new MessageGenerationPhase()); _fullPipeline.addStage(new MessageSelectionStage()); _fullPipeline.addStage(new IntermediateStateCalcStage()); _fullPipeline.addStage(new MessageThrottleStage()); diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java index 1d164c5..40d5c97 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java @@ -33,11 +33,11 @@ import org.apache.helix.controller.stages.BaseStageTest; import 
org.apache.helix.controller.stages.BestPossibleStateCalcStage; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.stages.IntermediateStateCalcStage; +import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageOutput; import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.MessageThrottleStage; import org.apache.helix.controller.stages.ReadClusterDataStage; -import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; @@ -87,13 +87,13 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { _fullPipeline = new Pipeline("FullPipeline"); _fullPipeline.addStage(new ReadClusterDataStage()); _fullPipeline.addStage(new BestPossibleStateCalcStage()); - _fullPipeline.addStage(new ResourceMessageGenerationPhase()); + _fullPipeline.addStage(new MessageGenerationPhase()); _fullPipeline.addStage(new MessageSelectionStage()); _fullPipeline.addStage(new IntermediateStateCalcStage()); _fullPipeline.addStage(new MessageThrottleStage()); _messagePipeline = new Pipeline("MessagePipeline"); - _messagePipeline.addStage(new ResourceMessageGenerationPhase()); + _messagePipeline.addStage(new MessageGenerationPhase()); _messagePipeline.addStage(new MessageSelectionStage()); _messagePipeline.addStage(new IntermediateStateCalcStage()); _messagePipeline.addStage(new MessageThrottleStage()); diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java index cec0d2b..307022f 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java +++ 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java @@ -33,11 +33,11 @@ import org.apache.helix.controller.stages.BestPossibleStateCalcStage; import org.apache.helix.controller.stages.BestPossibleStateOutput; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.stages.IntermediateStateCalcStage; +import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageOutput; import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.MessageThrottleStage; import org.apache.helix.controller.stages.ReadClusterDataStage; -import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; @@ -199,7 +199,7 @@ public class TestP2PStateTransitionMessages extends BaseStageTest { event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); pipeline = new Pipeline("test"); - pipeline.addStage(new ResourceMessageGenerationPhase()); + pipeline.addStage(new MessageGenerationPhase()); pipeline.addStage(new MessageSelectionStage()); pipeline.addStage(new IntermediateStateCalcStage()); pipeline.addStage(new MessageThrottleStage()); @@ -223,7 +223,7 @@ public class TestP2PStateTransitionMessages extends BaseStageTest { pipeline = new Pipeline("test"); - pipeline.addStage(new ResourceMessageGenerationPhase()); + pipeline.addStage(new MessageGenerationPhase()); pipeline.addStage(new MessageSelectionStage()); pipeline.addStage(new IntermediateStateCalcStage()); pipeline.addStage(new MessageThrottleStage()); @@ -246,7 +246,7 @@ public class TestP2PStateTransitionMessages extends BaseStageTest { event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); pipeline = new Pipeline("test"); - 
pipeline.addStage(new ResourceMessageGenerationPhase()); + pipeline.addStage(new MessageGenerationPhase()); pipeline.addStage(new MessageSelectionStage()); pipeline.addStage(new MessageThrottleStage()); @@ -359,7 +359,7 @@ public class TestP2PStateTransitionMessages extends BaseStageTest { Pipeline pipeline = new Pipeline("test"); pipeline.addStage(new ReadClusterDataStage()); pipeline.addStage(new BestPossibleStateCalcStage()); - pipeline.addStage(new ResourceMessageGenerationPhase()); + pipeline.addStage(new MessageGenerationPhase()); pipeline.addStage(new MessageSelectionStage()); pipeline.addStage(new IntermediateStateCalcStage()); pipeline.addStage(new MessageThrottleStage()); diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java index f685bf4..ea2a4aa 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java @@ -35,9 +35,8 @@ import org.apache.helix.controller.stages.BestPossibleStateOutput; import org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.controller.stages.ClusterEventType; import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.controller.stages.IntermediateStateOutput; +import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageOutput; -import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.ClusterConfig; @@ -61,7 +60,7 @@ public class TestP2PWithStateCancellationMessage extends BaseStageTest { @Test public void 
testP2PWithStateCancellationMessage() { ClusterEvent event = generateClusterEvent(); - runStage(event, new ResourceMessageGenerationPhase()); + runStage(event, new MessageGenerationPhase()); MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_ALL.name()); // No message should be sent for partition 0 Assert.assertEquals(messageOutput.getMessages(RESOURCE_NAME, new Partition("0")).size(), 0); diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java index ccc1431..57a3ad0 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java @@ -13,9 +13,9 @@ import org.apache.helix.controller.stages.BestPossibleStateCalcStage; import org.apache.helix.controller.stages.BestPossibleStateOutput; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.stages.IntermediateStateCalcStage; +import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.ReadClusterDataStage; -import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; @@ -79,7 +79,7 @@ public class TestRebalancerMetrics extends BaseStageTest { setupThrottleConfig(cache.getClusterConfig(), StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, maxPending); runStage(event, new BestPossibleStateCalcStage()); - runStage(event, new ResourceMessageGenerationPhase()); + runStage(event, new MessageGenerationPhase()); runStage(event, new MessageSelectionStage()); runStage(event, new IntermediateStateCalcStage()); @@ -140,7 
+140,7 @@ public class TestRebalancerMetrics extends BaseStageTest { setupThrottleConfig(cache.getClusterConfig(), StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, maxPending); runStage(event, new BestPossibleStateCalcStage()); - runStage(event, new ResourceMessageGenerationPhase()); + runStage(event, new MessageGenerationPhase()); runStage(event, new MessageSelectionStage()); runStage(event, new IntermediateStateCalcStage());
