Fix P2P message logic in controller to avoid sending duplicated messages to 
participants.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/880f8851
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/880f8851
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/880f8851

Branch: refs/heads/master
Commit: 880f885121afecab4e186282fbf94a146a2cf04a
Parents: f1c5037
Author: Lei Xia <[email protected]>
Authored: Tue Apr 24 18:18:40 2018 -0700
Committer: Lei Xia <[email protected]>
Committed: Mon Sep 17 15:08:28 2018 -0700

----------------------------------------------------------------------
 .../common/caches/InstanceMessagesCache.java    | 146 +++++++--
 .../controller/stages/ClusterDataCache.java     |  12 +-
 .../stages/CurrentStateComputationStage.java    |  57 +++-
 .../controller/stages/CurrentStateOutput.java   |  58 ++--
 .../stages/MessageGenerationPhase.java          |  17 +-
 .../stages/MessageSelectionStage.java           |  31 +-
 .../controller/stages/TaskAssignmentStage.java  |   7 +-
 .../messaging/handling/HelixTaskExecutor.java   |  22 +-
 .../java/org/apache/helix/model/Message.java    |  12 +-
 .../helix/task/AbstractTaskDispatcher.java      |   2 +-
 .../helix/task/DeprecatedTaskRebalancer.java    |   2 +-
 .../FixedTargetTaskAssignmentCalculator.java    |   4 +-
 .../org/apache/helix/task/JobRebalancer.java    |   5 +-
 .../ZkHelixClusterVerifier.java                 |   2 +-
 .../org/apache/helix/common/ZkTestBase.java     |  34 ++
 .../TestCurrentStateComputationStage.java       |   2 +-
 .../stages/TestMsgSelectionStage.java           |   5 +-
 .../messaging/TestP2PMessageSemiAuto.java       |  68 ++--
 .../messaging/TestP2PNoDuplicatedMessage.java   | 315 +++++++++++++++++++
 .../PartitionMigration/TestExpandCluster.java   |   2 -
 .../handling/MockHelixTaskExecutor.java         | 111 +++++++
 .../TestP2PMessagesAvoidDuplicatedMessage.java  |  13 +-
 .../TestP2PStateTransitionMessages.java         | 217 ++++++++++++-
 23 files changed, 990 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index f8001da..13b77cc 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -22,7 +22,7 @@ package org.apache.helix.common.caches;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -43,9 +43,16 @@ import org.slf4j.LoggerFactory;
 public class InstanceMessagesCache {
   private static final Logger LOG = 
LoggerFactory.getLogger(InstanceMessagesCache.class.getName());
   private Map<String, Map<String, Message>> _messageMap;
+  private Map<String, Map<String, Message>> _relayMessageMap;
 
   // maintain a cache of participant messages across pipeline runs
+  // <instance -> {<MessageId, Message>}>
   private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
+
+  // maintain a set of valid pending P2P messages.
+  // <instance -> {<MessageId, Message>}>
+  private Map<String, Map<String, Message>> _relayMessageCache = 
Maps.newHashMap();
+
   private String _clusterName;
 
   public InstanceMessagesCache(String clusterName) {
@@ -54,17 +61,15 @@ public class InstanceMessagesCache {
 
   /**
    * This refreshes all pending messages in the cluster by re-fetching the 
data from zookeeper in an
-   * efficient way
-   * current state must be refreshed before refreshing relay messages because 
we need to use current
-   * state to validate all relay messages.
+   * efficient way. Current state must be refreshed before refreshing relay 
messages because we need
+   * to use current state to validate all relay messages.
    *
    * @param accessor
    * @param liveInstanceMap
    *
    * @return
    */
-  public boolean refresh(HelixDataAccessor accessor,
-      Map<String, LiveInstance> liveInstanceMap) {
+  public boolean refresh(HelixDataAccessor accessor, Map<String, LiveInstance> 
liveInstanceMap) {
     LOG.info("START: InstanceMessagesCache.refresh()");
     long startTime = System.currentTimeMillis();
 
@@ -124,13 +129,16 @@ public class InstanceMessagesCache {
           System.currentTimeMillis() - startTime) + " ms.");
     }
 
+    LOG.info("END: InstanceMessagesCache.refresh()");
+
     return true;
   }
 
   // update all valid relay messages attached to existing state transition 
messages into message map.
   public void updateRelayMessages(Map<String, LiveInstance> liveInstanceMap,
       Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
-    List<Message> relayMessages = new ArrayList<>();
+
+    // refresh _relayMessageCache
     for (String instance : _messageMap.keySet()) {
       Map<String, Message> instanceMessages = _messageMap.get(instance);
       Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
@@ -170,51 +178,131 @@ public class InstanceMessagesCache {
 
           for (Message relayMsg : message.getRelayMessages().values()) {
             relayMsg.setRelayTime(transitionCompleteTime);
-            relayMessages.add(relayMsg);
+            cacheRelayMessage(relayMsg);
           }
         }
       }
     }
 
-    for (Message message : relayMessages) {
-      String instance = message.getTgtName();
-      Map<String, Message> instanceMessages = _messageMap.get(instance);
-      if (instanceMessages == null) {
-        instanceMessages = new HashMap<>();
-        _messageMap.put(instance, instanceMessages);
+    Map<String, Map<String, Message>> relayMessageMap = new HashMap<>();
+    // refresh _relayMessageMap
+    for (String instance : _relayMessageCache.keySet()) {
+      Map<String, Message> messages = _relayMessageCache.get(instance);
+      Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
+          currentStateMap.get(instance);
+      if (instanceCurrentStateMap == null) {
+        continue;
+      }
+
+      Iterator<Map.Entry<String, Message>> iterator = 
messages.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Message message = iterator.next().getValue();
+        String sessionId = message.getTgtSessionId();
+        String resourceName = message.getResourceName();
+        String partitionName = message.getPartitionName();
+        String targetState = message.getToState();
+        String instanceSessionId = 
liveInstanceMap.get(instance).getSessionId();
+
+        if (_messageMap.get(instance).containsKey(message.getMsgId())) {
+          // relay message has already been sent to target host
+          // remove the message from relayMessageCache.
+          LOG.info(
+              "Relay message has already been sent to target host, remove 
relay message from the cache"
+                  + message.getId());
+          iterator.remove();
+          continue;
+        }
+
+        if (!instanceSessionId.equals(sessionId)) {
+          LOG.info(
+              "Instance SessionId does not match, remove relay message from 
the cache" + message
+                  .getId());
+          iterator.remove();
+          continue;
+        }
+
+        Map<String, CurrentState> sessionCurrentStateMap = 
instanceCurrentStateMap.get(sessionId);
+        if (sessionCurrentStateMap == null) {
+          LOG.info("No sessionCurrentStateMap found, ignore relay message from 
the cache" + message
+              .getId());
+          continue;
+        }
+        CurrentState currentState = sessionCurrentStateMap.get(resourceName);
+        if (currentState != null && 
targetState.equals(currentState.getState(partitionName))) {
+          LOG.info("CurrentState " + currentState
+              + " match the target state of the relay message, remove relay 
from cache." + message
+              .getId());
+          iterator.remove();
+          continue;
+        }
+
+        if (message.isExpired()) {
+          LOG.info("relay message " + message.getId() + " expired, remove it 
from cache."
+              + message.getId());
+          iterator.remove();
+          continue;
+        }
+
+        if (!relayMessageMap.containsKey(instance)) {
+          relayMessageMap.put(instance, Maps.<String, Message>newHashMap());
+        }
+        relayMessageMap.get(instance).put(message.getMsgId(), message);
       }
-      instanceMessages.put(message.getId(), message);
     }
+
+    _relayMessageMap = Collections.unmodifiableMap(relayMessageMap);
   }
 
   /**
-   * Provides a list of current outstanding transitions on a given instance.
+   * Provides a list of current outstanding pending state transition messages 
on a given instance.
    *
    * @param instanceName
    *
    * @return
    */
   public Map<String, Message> getMessages(String instanceName) {
-    Map<String, Message> map = _messageMap.get(instanceName);
-    if (map != null) {
-      return map;
-    } else {
-      return Collections.emptyMap();
+    if (_messageMap.containsKey(instanceName)) {
+      return _messageMap.get(instanceName);
     }
+    return Collections.emptyMap();
   }
 
-  public void cacheMessages(List<Message> messages) {
+  /**
+   * Provides a list of current outstanding pending relay (p2p) messages on a 
given instance.
+   *
+   * @param instanceName
+   *
+   * @return
+   */
+  public Map<String, Message> getRelayMessages(String instanceName) {
+    if (_relayMessageMap.containsKey(instanceName)) {
+      return _relayMessageMap.get(instanceName);
+    }
+    return Collections.emptyMap();
+  }
+
+  public void cacheMessages(Collection<Message> messages) {
     for (Message message : messages) {
       String instanceName = message.getTgtName();
-      Map<String, Message> instMsgMap;
-      if (_messageCache.containsKey(instanceName)) {
-        instMsgMap = _messageCache.get(instanceName);
-      } else {
-        instMsgMap = Maps.newHashMap();
-        _messageCache.put(instanceName, instMsgMap);
+      if (!_messageCache.containsKey(instanceName)) {
+        _messageCache.put(instanceName, Maps.<String, Message>newHashMap());
       }
-      instMsgMap.put(message.getId(), message);
+      _messageCache.get(instanceName).put(message.getId(), message);
+
+      if (message.hasRelayMessages()) {
+        for (Message relayMsg : message.getRelayMessages().values()) {
+          cacheRelayMessage(relayMsg);
+        }
+      }
+    }
+  }
+
+  protected void cacheRelayMessage(Message message) {
+    String instanceName = message.getTgtName();
+    if (!_relayMessageCache.containsKey(instanceName)) {
+      _relayMessageCache.put(instanceName, Maps.<String, Message>newHashMap());
     }
+    _relayMessageCache.get(instanceName).put(message.getId(), message);
   }
 
   @Override public String toString() {

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 3e6bd86..577b2c7 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -476,7 +477,16 @@ public class ClusterDataCache {
     return _instanceMessagesCache.getMessages(instanceName);
   }
 
-  public void cacheMessages(List<Message> messages) {
+  /**
+   * Provides a list of current outstanding pending relay messages on a given 
instance.
+   * @param instanceName
+   * @return
+   */
+  public Map<String, Message> getRelayMessages(String instanceName) {
+    return _instanceMessagesCache.getRelayMessages(instanceName);
+  }
+
+  public void cacheMessages(Collection<Message> messages) {
     _instanceMessagesCache.cacheMessages(messages);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index a56d194..9cc9506 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -66,10 +66,12 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
 
       // update pending messages
       Map<String, Message> messages = cache.getMessages(instanceName);
-      updatePendingMessages(instance, messages.values(), currentStateOutput, 
resourceMap);
+      Map<String, Message> relayMessages = 
cache.getRelayMessages(instanceName);
+      updatePendingMessages(instance, messages.values(), currentStateOutput, 
relayMessages.values(), resourceMap);
 
       // update current states.
-      Map<String, CurrentState> currentStateMap = 
cache.getCurrentState(instanceName, instanceSessionId);
+      Map<String, CurrentState> currentStateMap = 
cache.getCurrentState(instanceName,
+          instanceSessionId);
       updateCurrentStates(instance, currentStateMap.values(), 
currentStateOutput, resourceMap);
     }
 
@@ -84,7 +86,8 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
 
   // update all pending messages to CurrentStateOutput.
   private void updatePendingMessages(LiveInstance instance, 
Collection<Message> pendingMessages,
-      CurrentStateOutput currentStateOutput, Map<String, Resource> 
resourceMap) {
+      CurrentStateOutput currentStateOutput, Collection<Message> 
pendingRelayMessages,
+      Map<String, Resource> resourceMap) {
     String instanceName = instance.getInstanceName();
     String instanceSessionId = instance.getSessionId();
 
@@ -100,6 +103,9 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
       String resourceName = message.getResourceName();
       Resource resource = resourceMap.get(resourceName);
       if (resource == null) {
+        LogUtil.logInfo(LOG, _eventId, String.format(
+            "Ignore a pending relay message %s for a non-exist resource %s and 
partition %s",
+            message.getMsgId(), resourceName, message.getPartitionName()));
         continue;
       }
 
@@ -109,7 +115,9 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
         if (partition != null) {
           setMessageState(currentStateOutput, resourceName, partition, 
instanceName, message);
         } else {
-          // log
+          LogUtil.logInfo(LOG, _eventId, String
+              .format("Ignore a pending message %s for a non-exist resource %s 
and partition %s",
+                  message.getMsgId(), resourceName, 
message.getPartitionName()));
         }
       } else {
         List<String> partitionNames = message.getPartitionNames();
@@ -119,12 +127,47 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
             if (partition != null) {
               setMessageState(currentStateOutput, resourceName, partition, 
instanceName, message);
             } else {
-              // log
+              LogUtil.logInfo(LOG, _eventId, String.format(
+                  "Ignore a pending message %s for a non-exist resource %s and 
partition %s",
+                  message.getMsgId(), resourceName, 
message.getPartitionName()));
             }
           }
         }
       }
     }
+
+
+    // update all pending relay messages
+    for (Message message : pendingRelayMessages) {
+      if (!message.isRelayMessage()) {
+        LogUtil.logWarn(LOG, _eventId,
+            String.format("Not a relay message %s, ignored!", 
message.getMsgId()));
+        continue;
+      }
+      String resourceName = message.getResourceName();
+      Resource resource = resourceMap.get(resourceName);
+      if (resource == null) {
+        LogUtil.logInfo(LOG, _eventId, String.format(
+            "Ignore a pending relay message %s for a non-exist resource %s and 
partition %s",
+            message.getMsgId(), resourceName, message.getPartitionName()));
+        continue;
+      }
+
+      if (!message.getBatchMessageMode()) {
+        String partitionName = message.getPartitionName();
+        Partition partition = resource.getPartition(partitionName);
+        if (partition != null) {
+          currentStateOutput.setPendingRelayMessage(resourceName, partition, 
instanceName, message);
+        } else {
+          LogUtil.logInfo(LOG, _eventId, String.format(
+              "Ignore a pending relay message %s for a non-exist resource %s 
and partition %s",
+              message.getMsgId(), resourceName, message.getPartitionName()));
+        }
+      } else {
+        LogUtil.logWarn(LOG, _eventId, String
+            .format("A relay message %s should not be batched, ignored!", 
message.getMsgId()));
+      }
+    }
   }
 
   // update current states in CurrentStateOutput
@@ -169,9 +212,9 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
   private void setMessageState(CurrentStateOutput currentStateOutput, String 
resourceName,
       Partition partition, String instanceName, Message message) {
     if 
(MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())) {
-      currentStateOutput.setPendingState(resourceName, partition, 
instanceName, message);
+      currentStateOutput.setPendingMessage(resourceName, partition, 
instanceName, message);
     } else {
-      currentStateOutput.setCancellationState(resourceName, partition, 
instanceName, message);
+      currentStateOutput.setCancellationMessage(resourceName, partition, 
instanceName, message);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 4ebef97..b634703 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -37,8 +37,9 @@ import com.google.common.collect.Sets;
  */
 public class CurrentStateOutput {
   private final Map<String, Map<Partition, Map<String, String>>> 
_currentStateMap;
-  private final Map<String, Map<Partition, Map<String, Message>>> 
_pendingStateMap;
-  private final Map<String, Map<Partition, Map<String, Message>>> 
_cancellationStateMap;
+  private final Map<String, Map<Partition, Map<String, Message>>> 
_pendingMessageMap;
+  private final Map<String, Map<Partition, Map<String, Message>>> 
_cancellationMessageMap;
+  private final Map<String, Map<Partition, Map<String, Message>>> 
_pendingRelayMessageMap;
 
   // resourceName -> (Partition -> (instanceName -> endTime))
   // Note that startTime / endTime in CurrentState marks that of state 
transition
@@ -61,8 +62,9 @@ public class CurrentStateOutput {
 
   public CurrentStateOutput() {
     _currentStateMap = new HashMap<>();
-    _pendingStateMap = new HashMap<>();
-    _cancellationStateMap = new HashMap<>();
+    _pendingMessageMap = new HashMap<>();
+    _pendingRelayMessageMap = new HashMap<>();
+    _cancellationMessageMap = new HashMap<>();
     _currentStateEndTimeMap = new HashMap<>();
     _resourceStateModelMap = new HashMap<>();
     _curStateMetaMap = new HashMap<>();
@@ -140,9 +142,9 @@ public class CurrentStateOutput {
     _infoMap.get(resourceName).get(partition).put(instanceName, state);
   }
 
-  public void setPendingState(String resourceName, Partition partition, String 
instanceName,
+  public void setPendingMessage(String resourceName, Partition partition, 
String instanceName,
       Message message) {
-    setStateMessage(resourceName, partition, instanceName, message, 
_pendingStateMap);
+    setStateMessage(resourceName, partition, instanceName, message, 
_pendingMessageMap);
   }
 
   /**
@@ -152,9 +154,14 @@ public class CurrentStateOutput {
    * @param instanceName
    * @param message
    */
-  public void setCancellationState(String resourceName, Partition partition, 
String instanceName,
+  public void setCancellationMessage(String resourceName, Partition partition, 
String instanceName,
       Message message) {
-    setStateMessage(resourceName, partition, instanceName, message, 
_cancellationStateMap);
+    setStateMessage(resourceName, partition, instanceName, message, 
_cancellationMessageMap);
+  }
+
+  public void setPendingRelayMessage(String resourceName, Partition partition, 
String instanceName,
+      Message message) {
+    setStateMessage(resourceName, partition, instanceName, message, 
_pendingRelayMessageMap);
   }
 
   private void setStateMessage(String resourceName, Partition partition, 
String instanceName,
@@ -221,15 +228,24 @@ public class CurrentStateOutput {
   }
 
   /**
-   * given (resource, partition, instance), returns toState
+   * given (resource, partition, instance), returns pending message on this 
instance.
    * @param resourceName
    * @param partition
    * @param instanceName
    * @return pending message
    */
-  // TODO: this should return toState, not pending message, create a separate 
method
-  public Message getPendingState(String resourceName, Partition partition, 
String instanceName) {
-    return getStateMessage(resourceName, partition, instanceName, 
_pendingStateMap);
+  public Message getPendingMessage(String resourceName, Partition partition, 
String instanceName) {
+    return getStateMessage(resourceName, partition, instanceName, 
_pendingMessageMap);
+  }
+
+  public Map<String, Message> getPendingRelayMessageMap(String resourceName, 
Partition partition) {
+    if (_pendingRelayMessageMap.containsKey(resourceName)) {
+      Map<Partition, Map<String, Message>> map = 
_pendingRelayMessageMap.get(resourceName);
+      if (map.containsKey(partition)) {
+        return map.get(partition);
+      }
+    }
+    return Collections.emptyMap();
   }
 
   /**
@@ -239,9 +255,9 @@ public class CurrentStateOutput {
    * @param instanceName
    * @return
    */
-  public Message getCancellationState(String resourceName, Partition partition,
+  public Message getCancellationMessage(String resourceName, Partition 
partition,
       String instanceName) {
-    return getStateMessage(resourceName, partition, instanceName, 
_cancellationStateMap);
+    return getStateMessage(resourceName, partition, instanceName, 
_cancellationMessageMap);
   }
 
   private Message getStateMessage(String resourceName, Partition partition, 
String instanceName,
@@ -291,8 +307,8 @@ public class CurrentStateOutput {
    * @return pending target state map
    */
   public Map<String, String> getPendingStateMap(String resourceName, Partition 
partition) {
-    if (_pendingStateMap.containsKey(resourceName)) {
-      Map<Partition, Map<String, Message>> map = 
_pendingStateMap.get(resourceName);
+    if (_pendingMessageMap.containsKey(resourceName)) {
+      Map<Partition, Map<String, Message>> map = 
_pendingMessageMap.get(resourceName);
       if (map.containsKey(partition)) {
         Map<String, Message> pendingMsgMap = map.get(partition);
         Map<String, String> pendingStateMap = new HashMap<String, String>();
@@ -312,8 +328,8 @@ public class CurrentStateOutput {
    * @return pending messages map
    */
   public Map<String, Message> getPendingMessageMap(String resourceName, 
Partition partition) {
-    if (_pendingStateMap.containsKey(resourceName)) {
-      Map<Partition, Map<String, Message>> map = 
_pendingStateMap.get(resourceName);
+    if (_pendingMessageMap.containsKey(resourceName)) {
+      Map<Partition, Map<String, Message>> map = 
_pendingMessageMap.get(resourceName);
       if (map.containsKey(partition)) {
         return map.get(partition);
       }
@@ -328,7 +344,7 @@ public class CurrentStateOutput {
    */
   public Set<Partition> getCurrentStateMappedPartitions(String resourceId) {
     Map<Partition, Map<String, String>> currentStateMap = 
_currentStateMap.get(resourceId);
-    Map<Partition, Map<String, Message>> pendingStateMap = 
_pendingStateMap.get(resourceId);
+    Map<Partition, Map<String, Message>> pendingStateMap = 
_pendingMessageMap.get(resourceId);
     Set<Partition> partitionSet = Sets.newHashSet();
     if (currentStateMap != null) {
       partitionSet.addAll(currentStateMap.keySet());
@@ -346,7 +362,7 @@ public class CurrentStateOutput {
    * @return set of participants to partitions mapping
    */
   public Map<String, Integer> getPartitionCountWithPendingState(String 
resourceStateModel, String state) {
-    return getPartitionCountWithState(resourceStateModel, state, (Map) 
_pendingStateMap);
+    return getPartitionCountWithState(resourceStateModel, state, (Map) 
_pendingMessageMap);
   }
 
   /**
@@ -397,7 +413,7 @@ public class CurrentStateOutput {
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("current state= ").append(_currentStateMap);
-    sb.append(", pending state= ").append(_pendingStateMap);
+    sb.append(", pending state= ").append(_pendingMessageMap);
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index e21c607..3abc965 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -82,7 +82,7 @@ public class MessageGenerationPhase extends AbstractBaseStage 
{
     }
 
     Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    Map<String, String> sessionIdMap = new HashMap<String, String>();
+    Map<String, String> sessionIdMap = new HashMap<>();
 
     for (LiveInstance liveInstance : liveInstances.values()) {
       sessionIdMap.put(liveInstance.getInstanceName(), 
liveInstance.getSessionId());
@@ -101,7 +101,7 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
 
       for (Partition partition : resource.getPartitions()) {
 
-        Map<String, String> instanceStateMap = new HashMap<String, String>(
+        Map<String, String> instanceStateMap = new HashMap<>(
             intermediateStateOutput.getInstanceStateMap(resourceName, 
partition));
         Map<String, String> pendingStateMap =
             currentStateOutput.getPendingStateMap(resourceName, partition);
@@ -120,24 +120,27 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
         // we should generate message based on the desired-state priority
         // so keep generated messages in a temp map keyed by state
         // desired-state->list of generated-messages
-        Map<String, List<Message>> messageMap = new HashMap<String, 
List<Message>>();
+        Map<String, List<Message>> messageMap = new HashMap<>();
 
         for (String instanceName : instanceStateMap.keySet()) {
           String desiredState = instanceStateMap.get(instanceName);
 
-          String currentState = 
currentStateOutput.getCurrentState(resourceName, partition, instanceName);
+          String currentState = 
currentStateOutput.getCurrentState(resourceName, partition,
+              instanceName);
           if (currentState == null) {
             currentState = stateModelDef.getInitialState();
           }
 
-          Message pendingMessage = 
currentStateOutput.getPendingState(resourceName, partition, instanceName);
+          Message pendingMessage = 
currentStateOutput.getPendingMessage(resourceName, partition,
+              instanceName);
           boolean isCancellationEnabled = 
cache.getClusterConfig().isStateTransitionCancelEnabled();
-          Message cancellationMessage = 
currentStateOutput.getCancellationState(resourceName, partition, instanceName);
+          Message cancellationMessage = 
currentStateOutput.getCancellationMessage(resourceName,
+              partition, instanceName);
           String nextState = 
stateModelDef.getNextStateForTransition(currentState, desiredState);
 
           Message message = null;
 
-          if (shouldCleanUpPendingMessage(pendingMessage, currentState,
+          if (pendingMessage != null && 
shouldCleanUpPendingMessage(pendingMessage, currentState,
               currentStateOutput.getEndTime(resourceName, partition, 
instanceName))) {
             LogUtil.logInfo(logger, _eventId, String.format(
                 "Adding pending message %s on instance %s to clean up. Msg: 
%s->%s, current state of resource %s:%s is %s",

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index a061598..03838f4 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -20,11 +20,13 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.helix.controller.LogUtil;
@@ -60,6 +62,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
   public void process(ClusterEvent event) throws Exception {
     _eventId = event.getEventId();
     ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
+
     Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES.name());
     CurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.name());
@@ -86,6 +89,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
         List<Message> selectedMessages = 
selectMessages(cache.getLiveInstances(),
             currentStateOutput.getCurrentStateMap(resourceName, partition),
             currentStateOutput.getPendingMessageMap(resourceName, partition), 
messages,
+            currentStateOutput.getPendingRelayMessageMap(resourceName, 
partition).values(),
             stateConstraints, stateTransitionPriorities, stateModelDef,
             resource.isP2PMessageEnabled());
         output.addMessages(resourceName, partition, selectedMessages);
@@ -127,9 +131,9 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
    */
   List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
       Map<String, String> currentStates, Map<String, Message> pendingMessages,
-      List<Message> messages, Map<String, Bounds> stateConstraints,
-      final Map<String, Integer> stateTransitionPriorities, 
StateModelDefinition stateModelDef,
-      boolean p2pMessageEnabled) {
+      List<Message> messages, Collection<Message> pendingRelayMessages,
+      Map<String, Bounds> stateConstraints, final Map<String, Integer> 
stateTransitionPriorities,
+      StateModelDefinition stateModelDef, boolean p2pMessageEnabled) {
     if (messages == null || messages.isEmpty()) {
       return Collections.emptyList();
     }
@@ -188,6 +192,27 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
     for (List<Message> messageList : 
messagesGroupByStateTransitPriority.values()) {
       for (Message message : messageList) {
         String toState = message.getToState();
+        String fromState = message.getFromState();
+
+        if (toState.equals(stateModelDef.getTopState())) {
+          // Check whether any pending relay message matches this message.
+          // If so, rebuild the message to reuse the message id of the original relay message.
+          for (Message relayMsg : pendingRelayMessages) {
+            if (relayMsg.getToState().equals(toState) && 
relayMsg.getFromState()
+                .equals(fromState)) {
+              if (relayMsg.getTgtName().equals(message.getTgtName())) {
+                message = new Message(message, relayMsg.getMsgId());
+              } else {
+                // If a pending relay message was sent to a different host than the current message,
+                // we should not send the toState message now.
+                LOG.info(
+                    "There is pending relay message to a different host, not 
send message: {}, pending relay message: {}",
+                    message, relayMsg);
+                continue;
+              }
+            }
+          }
+        }
 
         if (stateConstraints.containsKey(toState)) {
           int newCnt = (stateCnts.containsKey(toState) ? 
stateCnts.get(toState) + 1 : 1);

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index c196a26..6036f34 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -160,6 +160,11 @@ public class TaskAssignmentStage extends AbstractBaseStage 
{
       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
     }
 
-    dataAccessor.createChildren(keys, new ArrayList<>(messages));
+    boolean[] results = dataAccessor.createChildren(keys, new 
ArrayList<>(messages));
+    for (int i = 0; i < results.length; i++) {
+      if (!results[i]) {
+        LogUtil.logWarn(logger, _eventId, "Failed to send message: " + 
keys.get(i));
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 9734cc8..925f127 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -488,8 +488,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
           return true;
         } else {
           _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
-              "fail to cancel task: " + taskId,
-              notificationContext.getManager());
+              "fail to cancel task: " + taskId, 
notificationContext.getManager());
         }
       } else {
         _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
@@ -789,14 +788,6 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
         continue;
       }
 
-      if (message.isExpired()) {
-        LOG.info(
-            "Dropping expired message. mid: " + message.getId() + ", from: " + 
message.getMsgSrc() + " relayed from: "
-                + message.getRelaySrcHost());
-        reportAndRemoveMessage(message, accessor, instanceName, 
ProcessedMessageState.DISCARDED);
-        continue;
-      }
-
       String tgtSessionId = message.getTgtSessionId();
       // sessionId mismatch normally means message comes from expired session, 
just remove it
       if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
@@ -843,6 +834,14 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
         continue;
       }
 
+      if (message.isExpired()) {
+        LOG.info(
+            "Dropping expired message. mid: " + message.getId() + ", from: " + 
message.getMsgSrc()
+                + " relayed from: " + message.getRelaySrcHost());
+        reportAndRemoveMessage(message, accessor, instanceName, 
ProcessedMessageState.DISCARDED);
+        continue;
+      }
+
       // State Transition Cancellation
       if 
(message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) 
{
         boolean success = cancelNotStartedStateTransition(message, 
stateTransitionHandlers, accessor, instanceName);
@@ -874,6 +873,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
                 duplicatedMessage.getToState(), message.getFromState(), 
message.getToState()));
           } else if 
(message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
               && isStateTransitionInProgress(messageTarget)) {
+
             // If there is another state transition for same partition is 
going on,
             // discard the message. Controller will resend if this is a valid 
message
             throw new HelixException(String
@@ -1095,7 +1095,7 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
         && 
stateTranstionMessage.getToState().equalsIgnoreCase(cancellationMessage.getToState());
   }
 
-  private String getMessageTarget(String resourceName, String partitionName) {
+  String getMessageTarget(String resourceName, String partitionName) {
     return String.format("%s_%s", resourceName, partitionName);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java 
b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 8195092..51d03cb 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -178,6 +178,16 @@ public class Message extends HelixProperty {
   }
 
   /**
+   * Instantiate a message with a new id
+   * @param message message to be copied
+   * @param id unique message identifier
+   */
+  public Message(Message message, String id) {
+    super(new ZNRecord(message.getRecord(), id));
+    setMsgId(id);
+  }
+
+  /**
    * Set a subtype of the message
    * @param subType name of the subtype
    */
@@ -820,7 +830,7 @@ public class Message extends HelixProperty {
     // use relay time if this is a relay message
     if (isRelayMessage()) {
       long relayTime = getRelayTime();
-      return relayTime <= 0 || (relayTime + expiry < current);
+      return (relayTime > 0 && (relayTime + expiry < current));
     }
 
     return getCreateTimeStamp() + expiry < current;

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index eb67d59..e230fb5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -65,7 +65,7 @@ public abstract class AbstractTaskDispatcher {
 
         // Check for pending state transitions on this (partition, instance).
         Message pendingMessage =
-            currStateOutput.getPendingState(jobResource, new Partition(pName), 
instance);
+            currStateOutput.getPendingMessage(jobResource, new 
Partition(pName), instance);
         if (pendingMessage != null && 
!pendingMessage.getToState().equals(currState.name())) {
           processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, 
pId, pName, instance,
               pendingMessage, jobState, currState, paMap, assignedPartitions);

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 97e02fc..9903117 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -366,7 +366,7 @@ public abstract class DeprecatedTaskRebalancer implements 
Rebalancer, MappingCal
 
         // Check for pending state transitions on this (partition, instance).
         Message pendingMessage =
-            currStateOutput.getPendingState(jobResource, new Partition(pName), 
instance);
+            currStateOutput.getPendingMessage(jobResource, new 
Partition(pName), instance);
         if (pendingMessage != null) {
           // There is a pending state transition for this (partition, 
instance). Just copy forward
           // the state assignment from the previous ideal state.

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
 
b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 87b57c7..4389484 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -156,7 +156,7 @@ public class FixedTargetTaskAssignmentCalculator extends 
TaskAssignmentCalculato
       int pId = partitions.get(0);
       if (includeSet.contains(pId)) {
         for (String instance : instances) {
-          Message pendingMessage = 
currStateOutput.getPendingState(tgtIs.getResourceName(),
+          Message pendingMessage = 
currStateOutput.getPendingMessage(tgtIs.getResourceName(),
               new Partition(pName), instance);
           if (pendingMessage != null) {
             continue;
@@ -230,7 +230,7 @@ public class FixedTargetTaskAssignmentCalculator extends 
TaskAssignmentCalculato
           // If there is, we should wait until the pending message gets 
processed, so skip
           // assignment this time around
           Message pendingMessage =
-              
currStateOutput.getPendingState(targetIdealState.getResourceName(),
+              
currStateOutput.getPendingMessage(targetIdealState.getResourceName(),
                   new Partition(targetResourcePartitionName), instance);
           if (pendingMessage != null) {
             continue;

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index ddda41a..5b29c23 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -270,7 +270,7 @@ public class JobRebalancer extends TaskRebalancer {
           paMap.put(pId, new PartitionAssignment(instance, 
TaskPartitionState.TASK_ABORTED.name()));
         }
         Partition partition = new Partition(pName(jobResource, pId));
-        Message pendingMessage = currStateOutput.getPendingState(jobResource, 
partition, instance);
+        Message pendingMessage = 
currStateOutput.getPendingMessage(jobResource, partition, instance);
         // While job is failing, if the task is pending on INIT->RUNNING, set 
it back to INIT,
         // so that Helix will cancel the transition.
         if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT && 
pendingMessage != null) {
@@ -337,7 +337,8 @@ public class JobRebalancer extends TaskRebalancer {
       TaskPartitionState state = jobContext.getPartitionState(pId);
       Partition partition = new Partition(pName(jobResource, pId));
       String instance = jobContext.getAssignedParticipant(pId);
-      Message pendingMessage = currentStateOutput.getPendingState(jobResource, 
partition, instance);
+      Message pendingMessage = 
currentStateOutput.getPendingMessage(jobResource, partition,
+          instance);
       // If state is INIT but is pending INIT->RUNNING, it's not yet safe to 
say the job finished
       if (state == TaskPartitionState.RUNNING
           || (state == TaskPartitionState.INIT && pendingMessage != null)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index 9d8e63d..dbf9272 100644
--- 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit;
 public abstract class ZkHelixClusterVerifier
     implements IZkChildListener, IZkDataListener, HelixClusterVerifier {
   private static Logger LOG = 
LoggerFactory.getLogger(ZkHelixClusterVerifier.class);
-  protected static int DEFAULT_TIMEOUT = 30 * 1000;
+  protected static int DEFAULT_TIMEOUT = 300 * 1000;
   protected static int DEFAULT_PERIOD = 100;
 
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java 
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 189e95c..c69744e 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -42,6 +42,7 @@ import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.Stage;
@@ -65,6 +66,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.tools.ClusterSetup;
@@ -240,6 +242,38 @@ public class ZkTestBase {
     configAccessor.setInstanceConfig(clusterName, instanceName, 
instanceConfig);
   }
 
+  protected void enableP2PInCluster(String clusterName, ConfigAccessor 
configAccessor,
+      boolean enable) {
+    // enable p2p message in cluster.
+    if (enable) {
+      ClusterConfig clusterConfig = 
configAccessor.getClusterConfig(clusterName);
+      clusterConfig.enableP2PMessage(true);
+      configAccessor.setClusterConfig(clusterName, clusterConfig);
+    } else {
+      ClusterConfig clusterConfig = 
configAccessor.getClusterConfig(clusterName);
+      clusterConfig.getRecord().getSimpleFields()
+          .remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
+      configAccessor.setClusterConfig(clusterName, clusterConfig);
+    }
+  }
+
+  protected void enableP2PInResource(String clusterName, ConfigAccessor 
configAccessor,
+      String dbName, boolean enable) {
+    if (enable) {
+      ResourceConfig resourceConfig =
+          new 
ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build();
+      configAccessor.setResourceConfig(clusterName, dbName, resourceConfig);
+    } else {
+      // remove P2P Message in resource config
+      ResourceConfig resourceConfig = 
configAccessor.getResourceConfig(clusterName, dbName);
+      if (resourceConfig != null) {
+        resourceConfig.getRecord().getSimpleFields()
+            .remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
+        configAccessor.setResourceConfig(clusterName, dbName, resourceConfig);
+      }
+    }
+  }
+
   protected void setDelayTimeInCluster(ZkClient zkClient, String clusterName, 
long delay) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 4479cf6..06fbf24 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -78,7 +78,7 @@ public class TestCurrentStateComputationStage extends 
BaseStageTest {
     runStage(event, stage);
     CurrentStateOutput output2 = 
event.getAttribute(AttributeName.CURRENT_STATE.name());
     String pendingState =
-        output2.getPendingState("testResourceName", new 
Partition("testResourceName_1"),
+        output2.getPendingMessage("testResourceName", new 
Partition("testResourceName_1"),
             "localhost_3").getToState();
     AssertJUnit.assertEquals(pendingState, "SLAVE");
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
index 79e4e2c..45e1062 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -85,7 +86,7 @@ public class TestMsgSelectionStage {
 
     List<Message> selectedMsg =
         new MessageSelectionStage().selectMessages(liveInstances, 
currentStates, pendingMessages,
-            messages, stateConstraints, stateTransitionPriorities,
+            messages, Collections.<Message>emptyList(), stateConstraints, 
stateTransitionPriorities,
             
BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(), false);
 
     Assert.assertEquals(selectedMsg.size(), 1);
@@ -123,7 +124,7 @@ public class TestMsgSelectionStage {
 
     List<Message> selectedMsg =
         new MessageSelectionStage().selectMessages(liveInstances, 
currentStates, pendingMessages,
-            messages, stateConstraints, stateTransitionPriorities,
+            messages, Collections.<Message>emptyList(), stateConstraints, 
stateTransitionPriorities,
             
BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(), false);
 
     Assert.assertEquals(selectedMsg.size(), 0);

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
index 551ea78..3c95c2f 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.integration.DelayedTransitionBase;
@@ -33,11 +32,9 @@ import 
org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
-import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.MasterSlaveSMD;
-import org.apache.helix.model.ResourceConfig;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
@@ -55,7 +52,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
   static final String DB_NAME_1 = "TestDB_1";
   static final String DB_NAME_2 = "TestDB_2";
 
-  static final int PARTITION_NUMBER = 20;
+  static final int PARTITION_NUMBER = 200;
   static final int REPLICA_NUMBER = 3;
 
   List<MockParticipantManager> _participants = new ArrayList<>();
@@ -126,23 +123,25 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
prevMasterInstance, false);
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
-    verifyP2PMessage(DB_NAME_1,_instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
-    verifyP2PMessage(DB_NAME_2,_instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+    verifyP2PMessage(DB_NAME_1,_instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1);
+    verifyP2PMessage(DB_NAME_2,_instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1);
 
 
     //re-enable the old master
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
prevMasterInstance, true);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    verifyP2PMessage(DB_NAME_1, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
-    verifyP2PMessage(DB_NAME_2, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+    verifyP2PMessage(DB_NAME_1, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(),
+        _controller.getInstanceName(), 1);
+    verifyP2PMessage(DB_NAME_2, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(),
+        _controller.getInstanceName(), 1);
   }
 
   @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
   public void testP2PStateTransitionEnabledInCluster() {
-    enableP2PInCluster(true);
-    enableP2PInResource(DB_NAME_1,false);
-    enableP2PInResource(DB_NAME_2,false);
+    enableP2PInCluster(CLUSTER_NAME, _configAccessor, true);
+    enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_1,false);
+    enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_2,false);
 
     // disable the master instance
     String prevMasterInstance = _instances.get(0);
@@ -156,15 +155,17 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
prevMasterInstance, true);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    verifyP2PMessage(DB_NAME_1, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _instances.get(1));
-    verifyP2PMessage(DB_NAME_2, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _instances.get(1));
+    verifyP2PMessage(DB_NAME_1, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _instances.get(
+        1));
+    verifyP2PMessage(DB_NAME_2, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _instances.get(
+        1));
   }
 
   @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
   public void testP2PStateTransitionEnabledInResource() {
-    enableP2PInCluster(false);
-    enableP2PInResource(DB_NAME_1,true);
-    enableP2PInResource(DB_NAME_2,false);
+    enableP2PInCluster(CLUSTER_NAME, _configAccessor, false);
+    enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_1,true);
+    enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_2,false);
 
 
     // disable the master instance
@@ -173,7 +174,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     verifyP2PMessage(DB_NAME_1, _instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), prevMasterInstance);
-    verifyP2PMessage(DB_NAME_2, _instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+    verifyP2PMessage(DB_NAME_2, _instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1);
 
 
     //re-enable the old master
@@ -181,37 +182,14 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     verifyP2PMessage(DB_NAME_1, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _instances.get(1));
-    verifyP2PMessage(DB_NAME_2, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
-  }
-
-  private void enableP2PInCluster(boolean enable) {
-    // enable p2p message in cluster.
-    if (enable) {
-      ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(CLUSTER_NAME);
-      clusterConfig.enableP2PMessage(true);
-      _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
-    } else {
-      ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(CLUSTER_NAME);
-      
clusterConfig.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
-      _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
-    }
+    verifyP2PMessage(DB_NAME_2, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1);
   }
 
-  private void enableP2PInResource(String dbName, boolean enable) {
-    if (enable) {
-      ResourceConfig resourceConfig = new 
ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build();
-      _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig);
-    } else {
-      // remove P2P Message in resource config
-      ResourceConfig resourceConfig = 
_configAccessor.getResourceConfig(CLUSTER_NAME, dbName);
-      if (resourceConfig != null) {
-        
resourceConfig.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
-        _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, 
resourceConfig);
-      }
-    }
+  private void verifyP2PMessage(String dbName, String instance, String 
expectedState, String expectedTriggerHost) {
+    verifyP2PMessage(dbName, instance, expectedState, expectedTriggerHost, 
0.7);
   }
 
-  private void verifyP2PMessage(String dbName, String instance, String 
expectedState, String expectedTriggerHost) {
+  private void verifyP2PMessage(String dbName, String instance, String 
expectedState, String expectedTriggerHost, double expectedRatio) {
     ClusterDataCache dataCache = new ClusterDataCache(CLUSTER_NAME);
     dataCache.refresh(_accessor);
 
@@ -239,7 +217,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
     }
 
     double ratio = ((double) expectedHost) / ((double) total);
-    Assert.assertTrue(ratio >= 0.7, String
+    Assert.assertTrue(ratio >= expectedRatio, String
         .format("Only %d out of %d percent transitions to Master were 
triggered by expected host!",
             expectedHost, total));
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
new file mode 100644
index 0000000..b3ef3e5
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -0,0 +1,315 @@
+package org.apache.helix.integration.messaging;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.common.ZkTestBase;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.DelayedTransitionBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MockHelixTaskExecutor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestP2PNoDuplicatedMessage extends ZkTestBase {
+  private static Logger logger = 
LoggerFactory.getLogger(TestP2PNoDuplicatedMessage.class);
+
+  final String CLASS_NAME = getShortClassName();
+  final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  static final int PARTICIPANT_NUMBER = 6;
+  static final int PARTICIPANT_START_PORT = 12918;
+
+  static final int DB_COUNT = 2;
+
+  static final int PARTITION_NUMBER = 50;
+  static final int REPLICA_NUMBER = 3;
+
+  final String _controllerName = CONTROLLER_PREFIX + "_0";
+
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  List<String> _instances = new ArrayList<>();
+  ClusterControllerManager _controller;
+
+  ZkHelixClusterVerifier _clusterVerifier;
+  ConfigAccessor _configAccessor;
+  HelixDataAccessor _accessor;
+
+  /**
+   * Prepares the cluster shared by every test in this class: registers the participant
+   * instances, starts one {@link TestParticipantManager} per instance, enables delayed rebalance
+   * and persisted best-possible assignment, creates the MasterSlave test resources, starts the
+   * controller and finally waits for the cluster verifier to report convergence.
+   */
+  @BeforeClass
+  public void beforeClass()
+      throws InterruptedException {
+    System.out.println(
+        "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
+
+    // Create the storage cluster and register every instance before any participant starts.
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    for (int idx = 0; idx < PARTICIPANT_NUMBER; idx++) {
+      int port = PARTICIPANT_START_PORT + idx;
+      String instanceName = PARTICIPANT_PREFIX + "_" + port;
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
+      _instances.add(instanceName);
+    }
+
+    // Start one dummy participant per registered instance.
+    for (int idx = 0; idx < PARTICIPANT_NUMBER; idx++) {
+      MockParticipantManager mockParticipant =
+          new TestParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(idx));
+      mockParticipant.setTransition(new DelayedTransitionBase(100));
+      mockParticipant.syncStart();
+      _participants.add(mockParticipant);
+    }
+
+    enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    // Every test resource shares the same state model and rebalance strategy.
+    String stateModel = BuiltInStateModelDefinitions.MasterSlave.name();
+    String rebalanceStrategy = CrushEdRebalanceStrategy.class.getName();
+    for (int db = 0; db < DB_COUNT; db++) {
+      createResourceWithDelayedRebalance(CLUSTER_NAME, "TestDB_" + db, stateModel,
+          PARTITION_NUMBER, REPLICA_NUMBER, REPLICA_NUMBER - 1, 1000000L, rebalanceStrategy);
+    }
+
+    // Start the controller and wait until the cluster has converged.
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, _controllerName);
+    _controller.syncStart();
+
+    BestPossibleExternalViewVerifier.Builder verifierBuilder =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME);
+    _clusterVerifier = verifierBuilder.setZkClient(_gZkClient).build();
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+  }
+
+  /**
+   * Tears the test cluster down: stops the controller, stops every participant that is still
+   * connected, and deletes the cluster.
+   */
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      if (!participant.isConnected()) {
+        // Already disconnected; nothing to stop.
+        continue;
+      }
+      participant.syncStop();
+    }
+    deleteCluster(CLUSTER_NAME);
+
+    Date finishedAt = new Date(System.currentTimeMillis());
+    System.out.println("END " + CLASS_NAME + " at " + finishedAt);
+  }
+
+  /**
+   * With P2P disabled, disables and re-enables every instance in turn and checks after each
+   * step that all masters were triggered by the controller. At the end, asserts that the mock
+   * task executor did not record any duplicated state-transition message.
+   */
+  @Test
+  public void testP2PStateTransitionDisabled() throws InterruptedException {
+    enableP2PInCluster(CLUSTER_NAME, _configAccessor, false);
+
+    MockHelixTaskExecutor.resetStats();
+    // rolling upgrade the cluster: take each instance down, then bring it back up
+    for (String ins : _instances) {
+      for (boolean enabled : new boolean[] {false, true}) {
+        _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, enabled);
+        Assert.assertTrue(_clusterVerifier.verifyByPolling());
+        verifyP2PDisabled();
+      }
+    }
+
+    Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
+        "There are duplicated transition messages sent while participant is handling the state-transition!");
+    Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0,
+        "There are duplicated transition messages sent at same time!");
+  }
+
+  /**
+   * With P2P enabled, disables and re-enables every instance in turn and accumulates, through
+   * {@link #verifyP2PEnabled(long)}, how many of the transitions to MASTER that started after
+   * this test began were not triggered by the controller. Asserts that this share is above 0.7
+   * and that the mock task executor did not record any duplicated state-transition message.
+   */
+  @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
+  public void testP2PStateTransitionEnabled() throws InterruptedException {
+    enableP2PInCluster(CLUSTER_NAME, _configAccessor, true);
+    long startTime = System.currentTimeMillis();
+    MockHelixTaskExecutor.resetStats();
+    // The counters are static; reset them so a repeated run does not start from stale values.
+    total = 0;
+    p2pTrigged = 0;
+
+    // rolling upgrade the cluster: take each instance down, then bring it back up
+    for (String ins : _instances) {
+      for (boolean enabled : new boolean[] {false, true}) {
+        _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, enabled);
+        Assert.assertTrue(_clusterVerifier.verifyByPolling());
+        verifyP2PEnabled(startTime);
+      }
+    }
+
+    // Without this guard the ratio below would be 0/0 = NaN and fail with a misleading message.
+    Assert.assertTrue(total > 0,
+        "No transition to Master was observed after P2P was enabled, cannot compute the P2P ratio!");
+
+    double ratio = ((double) p2pTrigged) / ((double) total);
+    Assert.assertTrue(ratio > 0.7, String
+        .format("Only %d out of %d transitions to Master were triggered by P2P messages!",
+            p2pTrigged, total));
+
+    Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
+        "There are duplicated transition messages sent while participant is handling the state-transition!");
+    Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0,
+        "There are duplicated transition messages sent at same time!");
+  }
+
+  /**
+   * Reads a fresh snapshot of the cluster and asserts that every partition currently in MASTER
+   * state on a live instance reports the controller as its trigger host.
+   */
+  private void verifyP2PDisabled() {
+    ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME);
+    cache.refresh(_accessor);
+
+    for (LiveInstance liveInstance : cache.getLiveInstances().values()) {
+      Map<String, CurrentState> statesByResource =
+          cache.getCurrentState(liveInstance.getInstanceName(), liveInstance.getSessionId());
+      Assert.assertNotNull(statesByResource);
+
+      for (CurrentState resourceState : statesByResource.values()) {
+        for (String partition : resourceState.getPartitionStateMap().keySet()) {
+          String state = resourceState.getState(partition);
+          if (!state.equalsIgnoreCase("MASTER")) {
+            // Only masters are of interest here.
+            continue;
+          }
+          String triggerHost = resourceState.getTriggerHost(partition);
+          String failureMessage = state + " of " + partition + " on "
+              + liveInstance.getInstanceName() + " was triggered by " + triggerHost;
+          Assert.assertEquals(triggerHost, _controllerName, failureMessage);
+        }
+      }
+    }
+  }
+
+  static int total = 0;
+  static int p2pTrigged = 0;
+
+  /**
+   * Reads a fresh snapshot of the cluster and updates the static counters: {@code total} is
+   * incremented for every partition in MASTER state whose start time is later than
+   * {@code startTime}, and {@code p2pTrigged} for those of them whose trigger host is not the
+   * controller.
+   *
+   * @param startTime only masters whose recorded start time is greater than this are counted
+   */
+  private void verifyP2PEnabled(long startTime) {
+    ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME);
+    cache.refresh(_accessor);
+
+    for (LiveInstance liveInstance : cache.getLiveInstances().values()) {
+      Map<String, CurrentState> statesByResource =
+          cache.getCurrentState(liveInstance.getInstanceName(), liveInstance.getSessionId());
+      Assert.assertNotNull(statesByResource);
+
+      for (CurrentState resourceState : statesByResource.values()) {
+        for (String partition : resourceState.getPartitionStateMap().keySet()) {
+          String state = resourceState.getState(partition);
+          long transitionStart = resourceState.getStartTime(partition);
+          if (!state.equalsIgnoreCase("MASTER") || transitionStart <= startTime) {
+            // Not a master, or it became master before the measurement window opened.
+            continue;
+          }
+
+          String triggerHost = resourceState.getTriggerHost(partition);
+          boolean triggeredByController = triggerHost.equals(_controllerName);
+          if (!triggeredByController) {
+            p2pTrigged ++;
+          }
+          total ++;
+        }
+      }
+    }
+  }
+
+
+  /**
+   * A {@link MockParticipantManager} that hands out a {@link MockMessagingService} instead of
+   * the messaging service of its parent class.
+   */
+  static class TestParticipantManager extends MockParticipantManager {
+    private final DefaultMessagingService _messagingService;
+
+    public TestParticipantManager(String zkAddr, String clusterName, String instanceName) {
+      super(zkAddr, clusterName, instanceName);
+      this._messagingService = new MockMessagingService(this);
+    }
+
+    /**
+     * Returns the mock messaging service without checking the connection state, because
+     * callers may register message handler factories before the helix manager is connected.
+     */
+    @Override
+    public ClusterMessagingService getMessagingService() {
+      return this._messagingService;
+    }
+  }
+
+  /**
+   * A {@link DefaultMessagingService} that registers message handler factories on a
+   * {@link MockHelixTaskExecutor}. Factories registered while the manager is not connected are
+   * parked and registered later by {@link #onConnected()}.
+   */
+  static class MockMessagingService extends DefaultMessagingService {
+    private final HelixTaskExecutor _taskExecutor;
+    ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded =
+        new ConcurrentHashMap<>();
+    private final HelixManager _manager;
+
+    public MockMessagingService(HelixManager manager) {
+      super(manager);
+      _manager = manager;
+
+      InstanceType instanceType = manager.getInstanceType();
+      boolean isParticipant = instanceType == InstanceType.PARTICIPANT
+          || instanceType == InstanceType.CONTROLLER_PARTICIPANT;
+
+      ParticipantStatusMonitor statusMonitor =
+          new ParticipantStatusMonitor(isParticipant, manager.getInstanceName());
+      MessageQueueMonitor queueMonitor =
+          new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName());
+      _taskExecutor = new MockHelixTaskExecutor(statusMonitor, queueMonitor);
+    }
+
+    /** Single-type convenience overload; delegates to the list-based variant. */
+    @Override
+    public synchronized void registerMessageHandlerFactory(String type,
+        MessageHandlerFactory factory) {
+      List<String> singleType = Collections.singletonList(type);
+      registerMessageHandlerFactory(singleType, factory);
+    }
+
+    /**
+     * Registers {@code factory} for each of {@code types} right away when the manager is
+     * connected; otherwise remembers it until {@link #onConnected()} is called.
+     */
+    @Override
+    public synchronized void registerMessageHandlerFactory(List<String> types,
+        MessageHandlerFactory factory) {
+      boolean connected = _manager.isConnected();
+      for (String type : types) {
+        if (connected) {
+          registerMessageHandlerFactoryExtended(type, factory);
+        } else {
+          _messageHandlerFactoriestobeAdded.put(type, factory);
+        }
+      }
+    }
+
+    /** Registers every factory that was parked while disconnected, then clears the backlog. */
+    public synchronized void onConnected() {
+      for (Map.Entry<String, MessageHandlerFactory> pending
+          : _messageHandlerFactoriestobeAdded.entrySet()) {
+        registerMessageHandlerFactoryExtended(pending.getKey(), pending.getValue());
+      }
+      _messageHandlerFactoriestobeAdded.clear();
+    }
+
+    public HelixTaskExecutor getExecutor() {
+      return _taskExecutor;
+    }
+
+    /**
+     * Registers {@code factory} on the task executor with the default number of parallel
+     * tasks, then sends a no-op message through the parent messaging service.
+     */
+    void registerMessageHandlerFactoryExtended(String type, MessageHandlerFactory factory) {
+      _taskExecutor.registerMessageHandlerFactory(type, factory,
+          HelixTaskExecutor.DEFAULT_PARALLEL_TASKS);
+      super.sendNopMessage();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
index 4083988..83893c6 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
@@ -98,8 +98,6 @@ public class TestExpandCluster extends 
TestPartitionMigrationBase {
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     Assert.assertFalse(_migrationVerifier.hasLessReplica());
-    Assert.assertFalse(_migrationVerifier.hasMoreReplica());
-
     _migrationVerifier.stop();
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
new file mode 100644
index 0000000..207f74c
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
@@ -0,0 +1,111 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
+
+/**
+ * A {@link HelixTaskExecutor} for tests. Before delegating each message batch to the real
+ * executor it inspects the state-transition messages in the batch and updates three static
+ * counters that tests can assert on:
+ * <ul>
+ *   <li>{@link #duplicatedMessages}: more than one state-transition message for the same
+ *       resource/partition within a single batch;</li>
+ *   <li>{@link #extraStateTransition}: a NEW message whose target state equals the partition's
+ *       current state;</li>
+ *   <li>{@link #duplicatedMessagesInProgress}: a NEW message for a partition that already has
+ *       a task registered for a message with a different id.</li>
+ * </ul>
+ * The counters are plain static ints shared by every instance; call {@link #resetStats()}
+ * before each measurement.
+ */
+public class MockHelixTaskExecutor extends HelixTaskExecutor {
+  public static int duplicatedMessages = 0;
+  public static int extraStateTransition = 0;
+  public static int duplicatedMessagesInProgress = 0;
+  HelixManager manager;
+
+  public MockHelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor,
+      MessageQueueMonitor messageQueueMonitor) {
+    super(participantStatusMonitor, messageQueueMonitor);
+  }
+
+  /** Records statistics about the incoming batch, then lets the real executor handle it. */
+  @Override
+  public void onMessage(String instanceName, List<Message> messages,
+      NotificationContext changeContext) {
+    manager = changeContext.getManager();
+    checkDuplicatedMessages(messages);
+    super.onMessage(instanceName, messages, changeContext);
+  }
+
+  /**
+   * Inspects the state-transition messages in {@code messages} and updates the static
+   * counters. Messages of any other type are ignored.
+   *
+   * @param messages the batch delivered to {@link #onMessage}
+   */
+  void checkDuplicatedMessages(List<Message> messages) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey path =
+        keyBuilder.currentStates(manager.getInstanceName(), manager.getSessionId());
+    Map<String, CurrentState> currentStateMap = accessor.getChildValuesMap(path);
+
+    Set<String> seenPartitions = new HashSet<>();
+    for (Message message : messages) {
+      // Constant-first equals so that a message without a type is skipped instead of throwing.
+      if (!Message.MessageType.STATE_TRANSITION.name().equals(message.getMsgType())) {
+        continue;
+      }
+
+      String resource = message.getResourceName();
+      String partition = message.getPartitionName();
+
+      // Set.add returns false when the key was already present, i.e. a second message for the
+      // same partition in this batch.
+      if (!seenPartitions.add(resource + "-" + partition)) {
+        duplicatedMessages++;
+      }
+
+      boolean isNew = message.getMsgState() == Message.MessageState.NEW;
+
+      String toState = message.getToState();
+      CurrentState currentState = currentStateMap.get(resource);
+      String state = currentState == null ? null : currentState.getState(partition);
+      if (isNew && toState != null && toState.equals(state)) {
+        // The partition is already in the state this message asks for.
+        extraStateTransition++;
+      }
+
+      if (isNew && isOtherMessageInProgress(message, getMessageTarget(resource, partition))) {
+        duplicatedMessagesInProgress++;
+      }
+    }
+  }
+
+  /**
+   * Returns true when a task is registered for {@code messageTarget} and that task handles a
+   * message whose id differs from the id of {@code message}.
+   */
+  private boolean isOtherMessageInProgress(Message message, String messageTarget) {
+    // Read each map once: a task may complete and be removed concurrently, so a
+    // containsKey-then-get sequence could hand back null and throw.
+    String taskId = _messageTaskMap.get(messageTarget);
+    if (taskId == null) {
+      return false;
+    }
+    MessageTaskInfo messageTaskInfo = _taskMap.get(taskId);
+    if (messageTaskInfo == null) {
+      return false;
+    }
+    Message existingMsg = messageTaskInfo.getTask().getMessage();
+    // Message ids are Strings: compare by value. The previous reference comparison (!=)
+    // reported the very same message as a duplicate whenever it was re-read into a new object.
+    return !existingMsg.getMsgId().equals(message.getMsgId());
+  }
+
+  /** Resets all three static counters to zero. */
+  public static void resetStats() {
+    duplicatedMessages = 0;
+    extraStateTransition = 0;
+    duplicatedMessagesInProgress = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index d5e40be..bc1e554 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -32,6 +32,11 @@ import 
org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.IntermediateStateOutput;
 import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageSelectionStageOutput;
@@ -45,6 +50,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
@@ -156,7 +162,8 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends 
BaseStageTest {
     // Validate: Controller should not send S->M message to new master.
 
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, 
"SLAVE");
-    currentStateOutput.setPendingState(_db, _partition, initialMaster, 
toSlaveMessage);
+    currentStateOutput.setPendingMessage(_db, _partition, initialMaster, 
toSlaveMessage);
+    currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, 
relayMessage);
 
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
@@ -175,7 +182,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends 
BaseStageTest {
     currentStateOutput =
         populateCurrentStateFromBestPossible(_bestpossibleState);
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, 
"SLAVE");
-    currentStateOutput.setPendingState(_db, _partition, secondMaster, 
relayMessage);
+    currentStateOutput.setPendingMessage(_db, _partition, secondMaster, 
relayMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     _fullPipeline.handle(event);
@@ -221,7 +228,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends 
BaseStageTest {
     // The initial master has forwarded the p2p message to secondMaster and 
deleted original M->S message on initialMaster,
     // But the S->M state-transition has not completed yet in secondMaster.
     // Validate: Controller should not send S->M to thirdMaster.
-    currentStateOutput.setPendingState(_db, _partition, secondMaster, 
relayMessage);
+    currentStateOutput.setPendingMessage(_db, _partition, secondMaster, 
relayMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), 
_bestpossibleState);

Reply via email to