Repository: helix Updated Branches: refs/heads/master c701a9456 -> b91d6eee4
Refactoring MessageGenerationOutput and MessageSelectionStageOutput. Since these two outputs share the same functionality and APIs, refactor them into one class, MessageOutput. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a4e0cdd2 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a4e0cdd2 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a4e0cdd2 Branch: refs/heads/master Commit: a4e0cdd28a3d70195a0f69021b77cde580dc7d50 Parents: c701a94 Author: Junkai Xue <[email protected]> Authored: Mon Aug 6 13:59:36 2018 -0700 Committer: Lei Xia <[email protected]> Committed: Fri Sep 21 14:08:40 2018 -0700 ---------------------------------------------------------------------- .../stages/MessageGenerationOutput.java | 65 ----------------- .../stages/MessageGenerationPhase.java | 2 +- .../helix/controller/stages/MessageOutput.java | 73 ++++++++++++++++++++ .../stages/MessageSelectionStage.java | 5 +- .../stages/MessageSelectionStageOutput.java | 59 ---------------- .../controller/stages/MessageThrottleStage.java | 4 +- .../stages/MessageThrottleStageOutput.java | 52 -------------- .../controller/stages/TaskAssignmentStage.java | 2 +- .../stages/TestMessageThrottleStage.java | 8 +-- .../stages/TestRebalancePipeline.java | 8 +-- .../TestP2PMessagesAvoidDuplicatedMessage.java | 4 +- .../TestP2PStateTransitionMessages.java | 6 +- 12 files changed, 92 insertions(+), 196 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/a4e0cdd2/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java deleted file mode 100644 index 359a959..0000000 
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java +++ /dev/null @@ -1,65 +0,0 @@ -package org.apache.helix.controller.stages; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.helix.model.Message; -import org.apache.helix.model.Partition; - -public class MessageGenerationOutput { - - private final Map<String, Map<Partition, List<Message>>> _messagesMap; - - public MessageGenerationOutput() { - _messagesMap = new HashMap<String, Map<Partition, List<Message>>>(); - - } - - public void addMessage(String resourceName, Partition partition, Message message) { - if (!_messagesMap.containsKey(resourceName)) { - _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>()); - } - if (!_messagesMap.get(resourceName).containsKey(partition)) { - _messagesMap.get(resourceName).put(partition, new ArrayList<Message>()); - - } - _messagesMap.get(resourceName).get(partition).add(message); - - } - - public List<Message> getMessages(String resourceName, Partition resource) { - Map<Partition, List<Message>> map = 
_messagesMap.get(resourceName); - if (map != null) { - return map.get(resource); - } - return Collections.emptyList(); - - } - - @Override - public String toString() { - return _messagesMap.toString(); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/a4e0cdd2/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index 3abc965..85059ca 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -87,7 +87,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { for (LiveInstance liveInstance : liveInstances.values()) { sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId()); } - MessageGenerationOutput output = new MessageGenerationOutput(); + MessageOutput output = new MessageOutput(); for (String resourceName : resourceMap.keySet()) { Resource resource = resourceMap.get(resourceName); http://git-wip-us.apache.org/repos/asf/helix/blob/a4e0cdd2/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java new file mode 100644 index 0000000..76b6e98 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java @@ -0,0 +1,73 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.helix.model.Message; +import org.apache.helix.model.Partition; + +public class MessageOutput { + + private final Map<String, Map<Partition, List<Message>>> _messagesMap; + + public MessageOutput() { + _messagesMap = new HashMap<>(); + + } + + public void addMessage(String resourceName, Partition partition, Message message) { + if (!_messagesMap.containsKey(resourceName)) { + _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>()); + } + if (!_messagesMap.get(resourceName).containsKey(partition)) { + _messagesMap.get(resourceName).put(partition, new ArrayList<Message>()); + + } + _messagesMap.get(resourceName).get(partition).add(message); + + } + + public void addMessages(String resourceName, Partition partition, + List<Message> messages) { + if (!_messagesMap.containsKey(resourceName)) { + _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>()); + } + _messagesMap.get(resourceName).put(partition, messages); + } + + public List<Message> getMessages(String resourceName, Partition resource) { + Map<Partition, List<Message>> map = _messagesMap.get(resourceName); + if (map != null && 
map.get(resource) != null) { + return map.get(resource); + } + return Collections.emptyList(); + + } + + @Override + public String toString() { + return _messagesMap.toString(); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/a4e0cdd2/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java index 03838f4..eeb36cd 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import org.apache.helix.controller.LogUtil; @@ -66,7 +65,7 @@ public class MessageSelectionStage extends AbstractBaseStage { Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name()); - MessageGenerationOutput messageGenOutput = + MessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.name()); if (cache == null || resourceMap == null || currentStateOutput == null || messageGenOutput == null) { @@ -74,7 +73,7 @@ public class MessageSelectionStage extends AbstractBaseStage { + ". 
Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL"); } - MessageSelectionStageOutput output = new MessageSelectionStageOutput(); + MessageOutput output = new MessageOutput(); for (String resourceName : resourceMap.keySet()) { Resource resource = resourceMap.get(resourceName); http://git-wip-us.apache.org/repos/asf/helix/blob/a4e0cdd2/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java deleted file mode 100644 index 7ea545c..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java +++ /dev/null @@ -1,59 +0,0 @@ -package org.apache.helix.controller.stages; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.helix.model.Message; -import org.apache.helix.model.Partition; - -public class MessageSelectionStageOutput { - private final Map<String, Map<Partition, List<Message>>> _messagesMap; - - public MessageSelectionStageOutput() { - _messagesMap = new HashMap<String, Map<Partition, List<Message>>>(); - } - - public void addMessages(String resourceName, Partition partition, List<Message> selectedMessages) { - if (!_messagesMap.containsKey(resourceName)) { - _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>()); - } - _messagesMap.get(resourceName).put(partition, selectedMessages); - - } - - public List<Message> getMessages(String resourceName, Partition partition) { - Map<Partition, List<Message>> map = _messagesMap.get(resourceName); - if (map != null) { - return map.get(partition); - } - return Collections.emptyList(); - - } - - @Override - public String toString() { - return _messagesMap.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/helix/blob/a4e0cdd2/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java index c504199..d68c3df 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java @@ -118,7 +118,7 @@ public class MessageThrottleStage extends AbstractBaseStage { public void process(ClusterEvent event) throws Exception { _eventId = event.getEventId(); ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); - MessageSelectionStageOutput msgSelectionOutput = + 
MessageOutput msgSelectionOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); @@ -127,7 +127,7 @@ public class MessageThrottleStage extends AbstractBaseStage { + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED"); } - MessageThrottleStageOutput output = new MessageThrottleStageOutput(); + MessageOutput output = new MessageOutput(); ClusterConstraints constraint = cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT); Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>(); http://git-wip-us.apache.org/repos/asf/helix/blob/a4e0cdd2/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java deleted file mode 100644 index 7415944..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.apache.helix.controller.stages; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.helix.model.Message; -import org.apache.helix.model.Partition; - -public class MessageThrottleStageOutput { - private final Map<String, Map<Partition, List<Message>>> _messagesMap; - - public MessageThrottleStageOutput() { - _messagesMap = new HashMap<String, Map<Partition, List<Message>>>(); - } - - public void addMessages(String resourceName, Partition partition, List<Message> selectedMessages) { - if (!_messagesMap.containsKey(resourceName)) { - _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>()); - } - _messagesMap.get(resourceName).put(partition, selectedMessages); - - } - - public List<Message> getMessages(String resourceName, Partition partition) { - Map<Partition, List<Message>> map = _messagesMap.get(resourceName); - if (map != null) { - return map.get(partition); - } - return Collections.emptyList(); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/a4e0cdd2/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java index 6d483a0..1378ed5 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java @@ -50,7 +50,7 @@ public class TaskAssignmentStage extends AbstractBaseStage { HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name()); - MessageThrottleStageOutput 
messageOutput = + MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.name()); ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances(); http://git-wip-us.apache.org/repos/asf/helix/blob/a4e0cdd2/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java index 6aed482..a4a4d54 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java @@ -101,7 +101,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase { } catch (Exception e) { // OK } - MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput(); + MessageOutput msgSelectOutput = new MessageOutput(); List<Message> selectMessages = new ArrayList<Message>(); Message msg = createMessage(MessageType.STATE_TRANSITION, "msgId-001", "OFFLINE", "SLAVE", "TestDB", @@ -113,7 +113,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase { runStage(event, throttleStage); - MessageThrottleStageOutput msgThrottleOutput = + MessageOutput msgThrottleOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.name()); Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0")).size(), 1); @@ -268,7 +268,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase { dataRefresh.addStage(new ReadClusterDataStage()); runPipeline(event, dataRefresh); runStage(event, new ResourceComputationStage()); - MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput(); + MessageOutput msgSelectOutput 
= new MessageOutput(); Message msg3 = createMessage(MessageType.STATE_TRANSITION, "msgId-003", "OFFLINE", "SLAVE", "TestDB", @@ -299,7 +299,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase { runStage(event, throttleStage); - MessageThrottleStageOutput msgThrottleOutput = + MessageOutput msgThrottleOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.name()); List<Message> throttleMessages = msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0")); http://git-wip-us.apache.org/repos/asf/helix/blob/a4e0cdd2/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java index 90363c1..1c2e06b 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java @@ -96,7 +96,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { runPipeline(event, dataRefresh); runPipeline(event, rebalancePipeline); - MessageSelectionStageOutput msgSelOutput = + MessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); List<Message> messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); @@ -281,7 +281,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { runPipeline(event, dataRefresh); runPipeline(event, rebalancePipeline); - MessageSelectionStageOutput msgSelOutput = + MessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); List<Message> messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); @@ -375,7 +375,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { runPipeline(event, dataRefresh); 
runPipeline(event, rebalancePipeline); - MessageSelectionStageOutput msgSelOutput = + MessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); List<Message> messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); @@ -455,7 +455,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { runPipeline(event, dataRefresh); runPipeline(event, rebalancePipeline); - MessageSelectionStageOutput msgSelOutput = + MessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); List<Message> messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); http://git-wip-us.apache.org/repos/asf/helix/blob/a4e0cdd2/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java index bc1e554..c773efe 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java @@ -39,7 +39,7 @@ import org.apache.helix.controller.stages.IntermediateStateCalcStage; import org.apache.helix.controller.stages.IntermediateStateOutput; import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageSelectionStage; -import org.apache.helix.controller.stages.MessageSelectionStageOutput; +import org.apache.helix.controller.stages.MessageOutput; import org.apache.helix.controller.stages.MessageThrottleStage; import org.apache.helix.controller.stages.ReadClusterDataStage; import org.apache.helix.model.BuiltInStateModelDefinitions; @@ -133,7 
+133,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { _bestpossibleState = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); - MessageSelectionStageOutput messageOutput = + MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); List<Message> messages = messageOutput.getMessages(_db, _partition); http://git-wip-us.apache.org/repos/asf/helix/blob/a4e0cdd2/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java index c8048fa..b843331 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java @@ -34,7 +34,7 @@ import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.stages.IntermediateStateCalcStage; import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageSelectionStage; -import org.apache.helix.controller.stages.MessageSelectionStageOutput; +import org.apache.helix.controller.stages.MessageOutput; import org.apache.helix.controller.stages.MessageThrottleStage; import org.apache.helix.controller.stages.ReadClusterDataStage; import org.apache.helix.model.BuiltInStateModelDefinitions; @@ -121,7 +121,7 @@ public class TestP2PStateTransitionMessages extends BaseStageTest { bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); - MessageSelectionStageOutput messageOutput = + MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); List<Message> messages = 
messageOutput.getMessages(db, p); @@ -297,7 +297,7 @@ public class TestP2PStateTransitionMessages extends BaseStageTest { bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); - MessageSelectionStageOutput messageOutput = + MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); List<Message> messages = messageOutput.getMessages(db, p);
