Repository: helix
Updated Branches:
  refs/heads/master f1c503712 -> 74145e8ad


http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index a88965b..c8048fa 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -21,6 +21,7 @@ package org.apache.helix.messaging.p2pMessage;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.pipeline.Pipeline;
@@ -54,8 +55,8 @@ public class TestP2PStateTransitionMessages extends 
BaseStageTest {
 
 
   private void preSetup() {
-    setupIdealState(3, new String[]{db}, numPartition, numReplica, 
IdealState.RebalanceMode.SEMI_AUTO,
-        BuiltInStateModelDefinitions.MasterSlave.name());
+    setupIdealState(3, new String[] { db }, numPartition, numReplica,
+        IdealState.RebalanceMode.SEMI_AUTO, 
BuiltInStateModelDefinitions.MasterSlave.name());
     setupStateModel();
     setupInstances(3);
     setupLiveInstances(3);
@@ -77,10 +78,194 @@ public class TestP2PStateTransitionMessages extends 
BaseStageTest {
     testP2PMessage(null, false);
   }
 
-  private void testP2PMessage(ClusterConfig clusterConfig, Boolean 
p2pMessageEnabled) throws Exception {
-    Map<String, Resource> resourceMap =
-        getResourceMap(new String[]{db}, numPartition, 
BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig,
-            null);
+  @Test
+  public void testAvoidDuplicatedMessageWithP2PEnabled() throws Exception {
+    preSetup();
+    ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
+    clusterConfig.enableP2PMessage(true);
+    setClusterConfig(clusterConfig);
+
+    Map<String, Resource> resourceMap = getResourceMap(new String[] { db }, 
numPartition,
+        BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig, null);
+
+    ClusterDataCache cache = new ClusterDataCache();
+    cache.setAsyncTasksThreadPool(Executors.newSingleThreadExecutor());
+    event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), 
resourceMap);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), new 
CurrentStateOutput());
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+
+    Pipeline pipeline = createPipeline();
+    pipeline.handle(event);
+
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+    CurrentStateOutput currentStateOutput =
+        populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    Partition p = new Partition(db + "_0");
+
+    String masterInstance = 
getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+        MasterSlaveSMD.States.MASTER.name());
+    Assert.assertNotNull(masterInstance);
+
+    admin.enableInstance(_clusterName, masterInstance, false);
+    cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    cache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
+
+    pipeline = createPipeline();
+    pipeline.handle(event);
+
+    bestPossibleStateOutput = 
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+    MessageSelectionStageOutput messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    List<Message> messages = messageOutput.getMessages(db, p);
+
+    Assert.assertEquals(messages.size(), 1);
+    Message toSlaveMessage = messages.get(0);
+    Assert.assertEquals(toSlaveMessage.getTgtName(), masterInstance);
+    Assert.assertEquals(toSlaveMessage.getFromState(), 
MasterSlaveSMD.States.MASTER.name());
+    Assert.assertEquals(toSlaveMessage.getToState(), 
MasterSlaveSMD.States.SLAVE.name());
+
+    // verify p2p message sent to the old master instance
+    Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 
1);
+    String newMasterInstance =
+        getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+            MasterSlaveSMD.States.MASTER.name());
+
+    Message relayMessage = toSlaveMessage.getRelayMessage(newMasterInstance);
+    Assert.assertNotNull(relayMessage);
+    Assert.assertEquals(relayMessage.getMsgSubType(), 
Message.MessageType.RELAYED_MESSAGE.name());
+    Assert.assertEquals(relayMessage.getTgtName(), newMasterInstance);
+    Assert.assertEquals(relayMessage.getRelaySrcHost(), masterInstance);
+    Assert.assertEquals(relayMessage.getFromState(), 
MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(relayMessage.getToState(), 
MasterSlaveSMD.States.MASTER.name());
+
+
+    // test the case where the old master has finished its state transition, but has not forwarded the p2p 
message yet.
+    currentStateOutput.setCurrentState(db, p, masterInstance, "SLAVE");
+    currentStateOutput.setPendingMessage(db, p, masterInstance, 
toSlaveMessage);
+    currentStateOutput.setPendingRelayMessage(db, p, masterInstance, 
relayMessage);
+
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    pipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(db, p);
+    Assert.assertEquals(messages.size(), 0);
+
+
+    currentStateOutput =
+        populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+    currentStateOutput.setCurrentState(db, p, masterInstance, "SLAVE");
+    currentStateOutput.setPendingMessage(db, p, newMasterInstance, 
relayMessage);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    pipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(db, p);
+    Assert.assertEquals(messages.size(), 1);
+
+    Message toOfflineMessage = messages.get(0);
+    Assert.assertEquals(toOfflineMessage.getTgtName(), masterInstance);
+    Assert.assertEquals(toOfflineMessage.getFromState(), 
MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(toOfflineMessage.getToState(), 
MasterSlaveSMD.States.OFFLINE.name());
+
+
+    // Now, the old master has finished its state transition, but has not forwarded the p2p 
message yet.
+    // Then the preference list changes, so the new master is now 
different from the previously calculated new master,
+    // but the controller should not send S->M to the newly calculated master.
+    currentStateOutput.setCurrentState(db, p, masterInstance, "OFFLINE");
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    String slaveInstance =
+        getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+            MasterSlaveSMD.States.SLAVE.name());
+
+    Map<String, String> instanceStateMap = 
bestPossibleStateOutput.getInstanceStateMap(db, p);
+    instanceStateMap.put(newMasterInstance, "SLAVE");
+    instanceStateMap.put(slaveInstance, "MASTER");
+    bestPossibleStateOutput.setState(db, p, instanceStateMap);
+
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
+
+    pipeline = new Pipeline("test");
+    pipeline.addStage(new IntermediateStateCalcStage());
+    pipeline.addStage(new MessageGenerationPhase());
+    pipeline.addStage(new MessageSelectionStage());
+    pipeline.addStage(new MessageThrottleStage());
+
+    pipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(db, p);
+    Assert.assertEquals(messages.size(), 0);
+
+
+    // Now, the old master has forwarded the p2p message to the previously 
calculated master,
+    // so the state transition still happens on the previously calculated master.
+    // The controller will not send S->M to the new master.
+    currentStateOutput.setPendingMessage(db, p, newMasterInstance, 
relayMessage);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
+    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), 
bestPossibleStateOutput);
+
+
+    pipeline = new Pipeline("test");
+    pipeline.addStage(new IntermediateStateCalcStage());
+    pipeline.addStage(new MessageGenerationPhase());
+    pipeline.addStage(new MessageSelectionStage());
+    pipeline.addStage(new MessageThrottleStage());
+
+    pipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(db, p);
+    Assert.assertEquals(messages.size(), 0);
+
+
+    // Now, the previously calculated master has completed the state transition and 
deleted the p2p message.
+    // The controller should first move this master back to SLAVE (MASTER->SLAVE).
+    currentStateOutput =
+        populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+    currentStateOutput.setCurrentState(db, p, newMasterInstance, "MASTER");
+    currentStateOutput.setCurrentState(db, p, slaveInstance, "SLAVE");
+
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    pipeline = new Pipeline("test");
+    pipeline.addStage(new MessageGenerationPhase());
+    pipeline.addStage(new MessageSelectionStage());
+    pipeline.addStage(new MessageThrottleStage());
+
+    pipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(db, p);
+    Assert.assertEquals(messages.size(), 1);
+
+    toSlaveMessage = messages.get(0);
+    Assert.assertEquals(toSlaveMessage.getTgtName(), newMasterInstance);
+    Assert.assertEquals(toSlaveMessage.getFromState(), 
MasterSlaveSMD.States.MASTER.name());
+    Assert.assertEquals(toSlaveMessage.getToState(), 
MasterSlaveSMD.States.SLAVE.name());
+  }
+
+  private void testP2PMessage(ClusterConfig clusterConfig, Boolean 
p2pMessageEnabled)
+      throws Exception {
+    Map<String, Resource> resourceMap = getResourceMap(new String[] { db }, 
numPartition,
+        BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig, null);
 
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), 
resourceMap);
@@ -90,26 +275,30 @@ public class TestP2PStateTransitionMessages extends 
BaseStageTest {
     Pipeline pipeline = createPipeline();
     pipeline.handle(event);
 
-    BestPossibleStateOutput bestPossibleStateOutput = 
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
 
-    CurrentStateOutput currentStateOutput = 
populateCurrentStateFromBestPossible(bestPossibleStateOutput);
+    CurrentStateOutput currentStateOutput =
+        populateCurrentStateFromBestPossible(bestPossibleStateOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     Partition p = new Partition(db + "_0");
 
-    String masterInstance =
-        getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, 
p), MasterSlaveSMD.States.MASTER.name());
+    String masterInstance = 
getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
+        MasterSlaveSMD.States.MASTER.name());
     Assert.assertNotNull(masterInstance);
 
     admin.enableInstance(_clusterName, masterInstance, false);
     ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
     cache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
 
+    pipeline = createPipeline();
     pipeline.handle(event);
 
     bestPossibleStateOutput = 
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
 
-    MessageSelectionStageOutput messageOutput = 
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    MessageSelectionStageOutput messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     List<Message> messages = messageOutput.getMessages(db, p);
 
     Assert.assertEquals(messages.size(), 1);
@@ -121,7 +310,8 @@ public class TestP2PStateTransitionMessages extends 
BaseStageTest {
     if (p2pMessageEnabled) {
       Assert.assertEquals(message.getRelayMessages().entrySet().size(), 1);
       String newMasterInstance =
-          getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, 
p), MasterSlaveSMD.States.MASTER.name());
+          getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, 
p),
+              MasterSlaveSMD.States.MASTER.name());
 
       Message relayMessage = message.getRelayMessage(newMasterInstance);
       Assert.assertNotNull(relayMessage);
@@ -146,7 +336,8 @@ public class TestP2PStateTransitionMessages extends 
BaseStageTest {
     return masterInstance;
   }
 
-  private CurrentStateOutput 
populateCurrentStateFromBestPossible(BestPossibleStateOutput 
bestPossibleStateOutput) {
+  private CurrentStateOutput populateCurrentStateFromBestPossible(
+      BestPossibleStateOutput bestPossibleStateOutput) {
     CurrentStateOutput currentStateOutput = new CurrentStateOutput();
     for (String resource : 
bestPossibleStateOutput.getResourceStatesMap().keySet()) {
       PartitionStateMap partitionStateMap = 
bestPossibleStateOutput.getPartitionStateMap(resource);

Reply via email to