Repository: helix Updated Branches: refs/heads/master 8dc19afb9 -> 266b8bb1a
Add test to validate no duplicated state transition message sent when p2p message is enabled. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/266b8bb1 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/266b8bb1 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/266b8bb1 Branch: refs/heads/master Commit: 266b8bb1ae30c23c4310886c111f826e225205f4 Parents: 8dc19af Author: Lei Xia <[email protected]> Authored: Fri Apr 27 09:50:22 2018 -0700 Committer: Junkai Xue <[email protected]> Committed: Tue Jun 26 15:46:59 2018 -0700 ---------------------------------------------------------------------- .../common/caches/InstanceMessagesCache.java | 6 +- .../TestP2PMessagesAvoidDuplicatedMessage.java | 301 +++++++++++++++++++ 2 files changed, 302 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/266b8bb1/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java index 69094d3..1929776 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java @@ -171,11 +171,7 @@ public class InstanceMessagesCache { for (Message relayMsg : message.getRelayMessages().values()) { relayMsg.setRelayTime(transitionCompleteTime); - if (!relayMsg.isExpired()) { - relayMessages.add(relayMsg); - } else { - LOG.info("Relay message " + relayMsg.getId() + " already expired, ignore it!"); - } + relayMessages.add(relayMsg); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/266b8bb1/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java new file mode 100644 index 0000000..fb84117 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java @@ -0,0 +1,301 @@ +package org.apache.helix.messaging.p2pMessage; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import org.apache.helix.HelixConstants; +import org.apache.helix.controller.common.PartitionStateMap; +import org.apache.helix.controller.common.ResourcesStateMap; +import org.apache.helix.controller.pipeline.Pipeline; +import org.apache.helix.controller.stages.AttributeName; +import org.apache.helix.controller.stages.BaseStageTest; +import org.apache.helix.controller.stages.BestPossibleStateCalcStage; +import org.apache.helix.controller.stages.BestPossibleStateOutput; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.controller.stages.IntermediateStateCalcStage; +import org.apache.helix.controller.stages.IntermediateStateOutput; +import org.apache.helix.controller.stages.MessageGenerationPhase; +import org.apache.helix.controller.stages.MessageSelectionStage; +import org.apache.helix.controller.stages.MessageSelectionStageOutput; +import org.apache.helix.controller.stages.MessageThrottleStage; +import org.apache.helix.controller.stages.ReadClusterDataStage; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.MasterSlaveSMD; +import org.apache.helix.model.Message; +import org.apache.helix.model.Partition; +import org.apache.helix.model.Resource; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { + String _db = "testDB"; + int _numPartition = 1; + int _numReplica = 3; + + Partition _partition = new Partition(_db + "_0"); + + ClusterDataCache _dataCache; + Pipeline _fullPipeline; + Pipeline _messagePipeline; + + ResourcesStateMap _bestpossibleState; + + private void preSetup() throws Exception { + setupIdealState(3, new String[] { _db }, _numPartition, _numReplica, + IdealState.RebalanceMode.SEMI_AUTO, BuiltInStateModelDefinitions.MasterSlave.name()); + setupStateModel(); + setupInstances(3); + setupLiveInstances(3); + + ClusterConfig clusterConfig = new ClusterConfig(_clusterName); + clusterConfig.enableP2PMessage(true); + setClusterConfig(clusterConfig); + + Map<String, Resource> resourceMap = getResourceMap(new String[] { _db }, _numPartition, + BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig, null); + + _dataCache = new ClusterDataCache(); + _dataCache.setAsyncTasksThreadPool(Executors.newSingleThreadExecutor()); + + event.addAttribute(AttributeName.ClusterDataCache.name(), _dataCache); + event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); + event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap); + event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput()); + event.addAttribute(AttributeName.helixmanager.name(), manager); + + _fullPipeline = new Pipeline("FullPipeline"); + _fullPipeline.addStage(new ReadClusterDataStage()); + _fullPipeline.addStage(new BestPossibleStateCalcStage()); + _fullPipeline.addStage(new IntermediateStateCalcStage()); + _fullPipeline.addStage(new MessageGenerationPhase()); + _fullPipeline.addStage(new MessageSelectionStage()); + _fullPipeline.addStage(new MessageThrottleStage()); + + _messagePipeline = new Pipeline("MessagePipeline"); + _messagePipeline.addStage(new MessageGenerationPhase()); + _messagePipeline.addStage(new MessageSelectionStage()); + _messagePipeline.addStage(new MessageThrottleStage()); + + + _fullPipeline.handle(event); + _bestpossibleState = + event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); + } + + + @Test + public void testP2PAvoidDuplicatedMessage() throws Exception { + preSetup(); + + // Scenario 1: + // Disable old master ((initialMaster) instance, + // Validate: a M->S message should be sent to initialMaster with a P2P message attached for secondMaster. + String initialMaster = getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition), + MasterSlaveSMD.States.MASTER.name()); + Assert.assertNotNull(initialMaster); + + // disable existing master instance + admin.enableInstance(_clusterName, initialMaster, false); + _dataCache = event.getAttribute(AttributeName.ClusterDataCache.name()); + _dataCache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG); + + CurrentStateOutput currentStateOutput = + populateCurrentStateFromBestPossible(_bestpossibleState); + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); + + _fullPipeline.handle(event); + + _bestpossibleState = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); + + MessageSelectionStageOutput messageOutput = + event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); + List<Message> messages = messageOutput.getMessages(_db, _partition); + + Assert.assertEquals(messages.size(), 1); + Message toSlaveMessage = messages.get(0); + Assert.assertEquals(toSlaveMessage.getTgtName(), initialMaster); + Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name()); + Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name()); + + // verify p2p message are attached to the M->S message sent to the old master instance + Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1); + String secondMaster = + getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition), MasterSlaveSMD.States.MASTER.name()); + + Message relayMessage = toSlaveMessage.getRelayMessage(secondMaster); + Assert.assertNotNull(relayMessage); + Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name()); + Assert.assertEquals(relayMessage.getTgtName(), secondMaster); + Assert.assertEquals(relayMessage.getRelaySrcHost(), initialMaster); + Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name()); + Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name()); + + // Scenario 2: + // Old master (initialMaster) completes the M->S transition, + // but has not forward p2p message to new master (secondMaster) yet. + // Validate: Controller should not send S->M message to new master. + + currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE"); + currentStateOutput.setPendingState(_db, _partition, initialMaster, toSlaveMessage); + + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); + + _fullPipeline.handle(event); + + messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); + messages = messageOutput.getMessages(_db, _partition); + Assert.assertEquals(messages.size(), 0); + + + // Scenario 3: + // Old master (initialMaster) completes the M->S transition, + // and has already forwarded p2p message to new master (secondMaster) + // The original S->M message sent to old master has been removed. + // Validate: Controller should send S->O to old master, but not S->M message to new master. + currentStateOutput = + populateCurrentStateFromBestPossible(_bestpossibleState); + currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE"); + currentStateOutput.setPendingState(_db, _partition, secondMaster, relayMessage); + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); + + _fullPipeline.handle(event); + + messageOutput = + event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); + messages = messageOutput.getMessages(_db, _partition); + Assert.assertEquals(messages.size(), 1); + + Message toOfflineMessage = messages.get(0); + Assert.assertEquals(toOfflineMessage.getTgtName(), initialMaster); + Assert.assertEquals(toOfflineMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name()); + Assert.assertEquals(toOfflineMessage.getToState(), MasterSlaveSMD.States.OFFLINE.name()); + + + // Scenario 4: + // The old master (initialMaster) finish state transition, but has not forward p2p message yet. + // Then the preference list has changed, so now the new master (thirdMaster) is different from previously calculated new master (secondMaster) + // Validate: controller should not send S->M to thirdMaster. + currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE"); + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); + + String thirdMaster = + getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition), + MasterSlaveSMD.States.SLAVE.name()); + + Map<String, String> instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition); + instanceStateMap.put(secondMaster, "SLAVE"); + instanceStateMap.put(thirdMaster, "MASTER"); + _bestpossibleState.setState(_db, _partition, instanceStateMap); + + event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState); + + _messagePipeline.handle(event); + + messageOutput = + event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); + messages = messageOutput.getMessages(_db, _partition); + Assert.assertEquals(messages.size(), 0); + + + // Scenario 5: + // The initial master has forwarded the p2p message to secondMaster and deleted original M->S message on initialMaster, + // But the S->M state-transition has not completed yet in secondMaster. + // Validate: Controller should not send S->M to thirdMaster. + currentStateOutput.setPendingState(_db, _partition, secondMaster, relayMessage); + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); + + event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState); + + _messagePipeline.handle(event); + + messageOutput = + event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); + messages = messageOutput.getMessages(_db, _partition); + Assert.assertEquals(messages.size(), 0); + + + // Scenario 5: + // The thirdMaster completed the state transition and deleted the p2p message. + // Validate: Controller should M->S message to secondMaster. + currentStateOutput = + populateCurrentStateFromBestPossible(_bestpossibleState); + currentStateOutput.setCurrentState(_db, _partition, secondMaster, "MASTER"); + currentStateOutput.setCurrentState(_db, _partition, thirdMaster, "SLAVE"); + + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); + + _messagePipeline.handle(event); + + messageOutput = + event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); + messages = messageOutput.getMessages(_db, _partition); + Assert.assertEquals(messages.size(), 1); + + toSlaveMessage = messages.get(0); + Assert.assertEquals(toSlaveMessage.getTgtName(), secondMaster); + Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name()); + Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name()); + + // verify p2p message are attached to the M->S message sent to the secondMaster + Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1); + + relayMessage = toSlaveMessage.getRelayMessage(thirdMaster); + Assert.assertNotNull(relayMessage); + Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name()); + Assert.assertEquals(relayMessage.getTgtName(), thirdMaster); + Assert.assertEquals(relayMessage.getRelaySrcHost(), secondMaster); + Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name()); + Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name()); + } + + + private String getTopStateInstance(Map<String, String> instanceStateMap, String topState) { + String masterInstance = null; + for (Map.Entry<String, String> e : instanceStateMap.entrySet()) { + if (topState.equals(e.getValue())) { + masterInstance = e.getKey(); + } + } + + return masterInstance; + } + + private CurrentStateOutput populateCurrentStateFromBestPossible( + ResourcesStateMap bestPossibleStateOutput) { + CurrentStateOutput currentStateOutput = new CurrentStateOutput(); + for (String resource : bestPossibleStateOutput.getResourceStatesMap().keySet()) { + PartitionStateMap partitionStateMap = bestPossibleStateOutput.getPartitionStateMap(resource); + for (Partition p : partitionStateMap.partitionSet()) { + Map<String, String> stateMap = partitionStateMap.getPartitionMap(p); + + for (Map.Entry<String, String> e : stateMap.entrySet()) { + currentStateOutput.setCurrentState(resource, p, e.getKey(), e.getValue()); + } + } + } + return currentStateOutput; + } +}
