This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 19b6d7c1db3fdce261e9f035c87e155e06315b80
Author: Lei Xia <[email protected]>
AuthorDate: Wed Jan 30 10:48:29 2019 -0800

    Fix race condition in P2P message handling that can cause a long delay in
    mastership handoff.
---
 .../helix/common/caches/CurrentStateCache.java     |   4 +-
 .../helix/common/caches/InstanceMessagesCache.java | 348 +++++++++++++--------
 .../helix/controller/GenericHelixController.java   |  93 +++++-
 .../ResourceControllerDataProvider.java            |   5 +-
 .../WorkflowControllerDataProvider.java            |   4 +-
 .../helix/controller/stages/ClusterEvent.java      |   2 +-
 .../helix/controller/stages/ClusterEventType.java  |   1 +
 .../controller/stages/MessageGenerationPhase.java  |   8 +-
 .../BestPossibleExternalViewVerifier.java          |   1 +
 .../helix/integration/TestEnableCompression.java   |   2 +-
 .../messaging/TestP2PMessageSemiAuto.java          |   2 +-
 .../messaging/TestP2PSingleTopState.java           | 214 +++++++++++++
 .../helix/participant/MockZKHelixManager.java      |   5 +-
 13 files changed, 531 insertions(+), 158 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
index 01c8aa9..38ac745 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
@@ -103,10 +103,10 @@ public class CurrentStateCache extends 
AbstractDataCache<CurrentState> {
     long endTime = System.currentTimeMillis();
     LogUtil.logInfo(LOG, genEventInfo(),
         "END: CurrentStateCache.refresh() for cluster " + 
_controlContextProvider.getClusterName()
-            + ", took " + (endTime - startTime) + " ms");
+            + ", started at : " + startTime + ", took " + (endTime - 
startTime) + " ms");
     if (LOG.isDebugEnabled()) {
       LogUtil.logDebug(LOG, genEventInfo(),
-          String.format("Current State freshed : %s", 
_currentStateMap.toString()));
+          String.format("Current State refreshed : %s", 
_currentStateMap.toString()));
     }
     return true;
   }
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index 9fa9136..b57ffb9 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
@@ -54,22 +55,21 @@ public class InstanceMessagesCache {
   // <instance -> {<MessageId, Message>}>
   private Map<String, Map<String, Message>> _relayMessageCache = 
Maps.newHashMap();
 
+  // Map of a relay message to its original hosted message.
+  private Map<String, Message> _relayHostMessageCache = Maps.newHashMap();
 
-  // TODO: Temporary workaround to void participant receiving duplicated state 
transition messages when p2p is enable.
-  // should remove this once all clients are migrated to 0.8.2. -- Lei
-  private Map<String, Message> _committedRelayMessages = Maps.newHashMap();
+  public static final String RELAY_MESSAGE_LIFETIME = 
"helix.controller.messagecache.relaymessagelifetime";
 
-  public static final String COMMIT_MESSAGE_EXPIRY_CONFIG = 
"helix.controller.messagecache.commitmessageexpiry";
-
-  private static final int DEFAULT_COMMIT_RELAY_MESSAGE_EXPIRY = 20 * 1000; // 
20 seconds
-  private final int _commitMessageExpiry;
+  // If Helix missed all of other events to evict a relay message from the 
cache, it will delete the message anyway after this timeout.
+  private static final int DEFAULT_RELAY_MESSAGE_LIFETIME = 120 * 1000;  // in 
ms
+  private final int _relayMessageLifetime;
 
   private String _clusterName;
 
   public InstanceMessagesCache(String clusterName) {
     _clusterName = clusterName;
-    _commitMessageExpiry = HelixUtil
-        .getSystemPropertyAsInt(COMMIT_MESSAGE_EXPIRY_CONFIG, 
DEFAULT_COMMIT_RELAY_MESSAGE_EXPIRY);
+    _relayMessageLifetime = HelixUtil
+        .getSystemPropertyAsInt(RELAY_MESSAGE_LIFETIME, 
DEFAULT_RELAY_MESSAGE_LIFETIME);
   }
 
   /**
@@ -137,195 +137,265 @@ public class InstanceMessagesCache {
     _messageMap = Collections.unmodifiableMap(msgMap);
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Message purge took: " + purgeSum);
-      LOG.debug("# of Messages read from ZooKeeper " + newMessageKeys.size() + 
". took " + (
-          System.currentTimeMillis() - startTime) + " ms.");
-    }
+      LOG.debug("Message purge took: {} ", purgeSum);
 
-    LOG.info("END: InstanceMessagesCache.refresh()");
+    }
 
+    LOG.info(
+        "END: InstanceMessagesCache.refresh(), {} of Messages read from 
ZooKeeper. took {} ms. ",
+        newMessageKeys.size(), (System.currentTimeMillis() - startTime));
     return true;
   }
 
-  // update all valid relay messages attached to existing state transition 
messages into message map.
+  /**
+   * Refresh relay message cache by updating relay messages read from ZK, and 
remove all expired relay messages.
+   */
   public void updateRelayMessages(Map<String, LiveInstance> liveInstanceMap,
       Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
 
-    // refresh _relayMessageCache
+    // cache all relay messages read from ZK
     for (String instance : _messageMap.keySet()) {
       Map<String, Message> instanceMessages = _messageMap.get(instance);
-      Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
-          currentStateMap.get(instance);
-      if (instanceCurrentStateMap == null) {
-        continue;
-      }
-
       for (Message message : instanceMessages.values()) {
         if (message.hasRelayMessages()) {
-          String sessionId = message.getTgtSessionId();
-          String resourceName = message.getResourceName();
-          String partitionName = message.getPartitionName();
-          String targetState = message.getToState();
-          String instanceSessionId = 
liveInstanceMap.get(instance).getSessionId();
-
-          if (!instanceSessionId.equals(sessionId)) {
-            LOG.info("Instance SessionId does not match, ignore relay messages 
attached to message "
-                + message.getId());
-            continue;
-          }
-
-          Map<String, CurrentState> sessionCurrentStateMap = 
instanceCurrentStateMap.get(sessionId);
-          if (sessionCurrentStateMap == null) {
-            LOG.info("No sessionCurrentStateMap found, ignore relay messages 
attached to message "
-                + message.getId());
-            continue;
-          }
-          CurrentState currentState = sessionCurrentStateMap.get(resourceName);
-          if (currentState == null || 
!targetState.equals(currentState.getState(partitionName))) {
-            LOG.info("CurrentState " + currentState
-                + " do not match the target state of the message, ignore relay 
messages attached to message "
-                + message.getId());
-            continue;
-          }
-          long transitionCompleteTime = currentState.getEndTime(partitionName);
-
           for (Message relayMsg : message.getRelayMessages().values()) {
-            relayMsg.setRelayTime(transitionCompleteTime);
-            cacheRelayMessage(relayMsg);
+            cacheRelayMessage(relayMsg, message);
           }
         }
       }
     }
 
     Map<String, Map<String, Message>> relayMessageMap = new HashMap<>();
-    // refresh _relayMessageMap
+    long nextRebalanceTime = Long.MAX_VALUE;
+
+    // Iterate all relay message in the cache, remove invalid or expired ones.
     for (String instance : _relayMessageCache.keySet()) {
-      Map<String, Message> messages = _relayMessageCache.get(instance);
-      Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
-          currentStateMap.get(instance);
-      if (instanceCurrentStateMap == null) {
-        continue;
-      }
+      Map<String, Message> relayMessages = _relayMessageCache.get(instance);
+      Iterator<Map.Entry<String, Message>> iterator = 
relayMessages.entrySet().iterator();
 
-      Iterator<Map.Entry<String, Message>> iterator = 
messages.entrySet().iterator();
       while (iterator.hasNext()) {
-        Message message = iterator.next().getValue();
-        String sessionId = message.getTgtSessionId();
-        String resourceName = message.getResourceName();
-        String partitionName = message.getPartitionName();
-        String targetState = message.getToState();
-        String instanceSessionId = 
liveInstanceMap.get(instance).getSessionId();
+        Message relayMessage = iterator.next().getValue();
 
         Map<String, Message> instanceMsgMap = _messageMap.get(instance);
-
-        if (instanceMsgMap != null && 
instanceMsgMap.containsKey(message.getMsgId())) {
-          Message commitMessage = instanceMsgMap.get(message.getMsgId());
-
-          if (!commitMessage.isRelayMessage()) {
-            LOG.info(
-                "Controller already sent the message to the target host, 
remove relay message from the cache"
-                    + message.getId());
+        if (instanceMsgMap != null && 
instanceMsgMap.containsKey(relayMessage.getMsgId())) {
+          Message committedMessage = 
instanceMsgMap.get(relayMessage.getMsgId());
+          if (committedMessage.isRelayMessage()) {
+            LOG.info("Relay message committed, remove relay message {} from 
the cache.", relayMessage
+                .getId());
             iterator.remove();
-            _committedRelayMessages.remove(message.getMsgId());
+            _relayHostMessageCache.remove(relayMessage.getMsgId());
             continue;
           } else {
-            // relay message has already been sent to target host
-            // remember when the relay messages get relayed to the target host.
-            if (!_committedRelayMessages.containsKey(message.getMsgId())) {
-              message.setRelayTime(System.currentTimeMillis());
-              _committedRelayMessages.put(message.getMsgId(), message);
-              LOG.info("Put message into committed relay messages " + 
message.getId());
+            // controller already sent the same message to target host,
+            // To avoid potential race-condition, do not remove relay message 
immediately,
+            // just set the relay time as current time .
+            if (relayMessage.getRelayTime() < 0) {
+              relayMessage.setRelayTime(System.currentTimeMillis());
+              LOG.info(
+                  "Controller already sent the message {} to the target host, 
set message to be relayed at {}",
+                  relayMessage.getId(), relayMessage.getRelayTime());
             }
           }
         }
 
+        String sessionId = relayMessage.getTgtSessionId();
+        String instanceSessionId = 
liveInstanceMap.get(instance).getSessionId();
+
+        // Target host's session has been changed, remove relay message
         if (!instanceSessionId.equals(sessionId)) {
-          LOG.info(
-              "Instance SessionId does not match, remove relay message from 
the cache" + message
-                  .getId());
+          LOG.info("Instance SessionId does not match, remove relay message {} 
from the cache.", relayMessage.getId());
           iterator.remove();
+          _relayHostMessageCache.remove(relayMessage.getMsgId());
           continue;
         }
 
+        Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
+            currentStateMap.get(instance);
+        if (instanceCurrentStateMap == null) {
+          LOG.warn("CurrentStateMap null for " + instance);
+          continue;
+        }
         Map<String, CurrentState> sessionCurrentStateMap = 
instanceCurrentStateMap.get(sessionId);
         if (sessionCurrentStateMap == null) {
-          LOG.info("No sessionCurrentStateMap found, ignore relay message from 
the cache" + message
-              .getId());
+          LOG.warn("CurrentStateMap null for {}, session {}.", instance, 
sessionId);
           continue;
         }
 
+        String resourceName = relayMessage.getResourceName();
+        String partitionName = relayMessage.getPartitionName();
+        String targetState = relayMessage.getToState();
+        String fromState = relayMessage.getFromState();
         CurrentState currentState = sessionCurrentStateMap.get(resourceName);
-        if (currentState != null && 
targetState.equals(currentState.getState(partitionName))) {
-          LOG.info("CurrentState " + currentState
-              + " match the target state of the relay message, remove relay 
from cache." + message
-              .getId());
+
+        long currentTime = System.currentTimeMillis();
+        if (currentState == null) {
+          if (relayMessage.getRelayTime() < 0) {
+            relayMessage.setRelayTime(currentTime);
+            LOG.warn("CurrentState is null for {} on {}, set relay time {} for 
message {}",
+                resourceName, instance, relayMessage.getRelayTime(), 
relayMessage.getId());
+          }
+        }
+
+        // if partition state on the target host already changed, set it to be 
expired.
+        String partitionState = currentState.getState(partitionName);
+        if (targetState.equals(partitionState) || 
!fromState.equals(partitionState)) {
+          if (relayMessage.getRelayTime() < 0) {
+            relayMessage.setRelayTime(currentTime);
+            LOG.debug(
+                "CurrentState {} on target host has changed, set relay time {} 
for message {}.",
+                partitionState, relayMessage.getRelayTime(), 
relayMessage.getId());
+          }
+        }
+
+        // derive relay time from hosted message and relayed host.
+        setRelayTime(relayMessage, liveInstanceMap, currentStateMap);
+
+        if (relayMessage.isExpired()) {
+          LOG.info("relay message {} expired, remove it from cache. relay time 
{}.",
+              relayMessage.getId(), relayMessage.getRelayTime());
           iterator.remove();
+          _relayHostMessageCache.remove(relayMessage.getMsgId());
           continue;
         }
 
-        if (message.isExpired()) {
-          LOG.error("relay message has not been sent " + message.getId()
-              + " expired, remove it from cache. relay time: " + 
message.getRelayTime());
+        // If Helix missed all of other events to evict a relay message from 
the cache,
+        // it will delete the message anyway after a certain timeout.
+        // This is the latest resort to avoid a relay message stuck in the 
cache forever.
+        // This case should happen very rarely.
+        if (relayMessage.getRelayTime() < 0
+            && (relayMessage.getCreateTimeStamp() + _relayMessageLifetime) < 
System
+            .currentTimeMillis()) {
+          LOG.info(
+              "relay message {} has reached its lifetime, remove it from 
cache.", relayMessage.getId());
           iterator.remove();
+          _relayHostMessageCache.remove(relayMessage.getMsgId());
           continue;
         }
 
+        long expiryTime = relayMessage.getCreateTimeStamp() + 
_relayMessageLifetime;
+        if (relayMessage.getRelayTime() > 0) {
+          expiryTime = relayMessage.getRelayTime() + 
relayMessage.getExpiryPeriod();
+        }
+
+        if (expiryTime < nextRebalanceTime) {
+          nextRebalanceTime = expiryTime;
+        }
+
         if (!relayMessageMap.containsKey(instance)) {
           relayMessageMap.put(instance, Maps.<String, Message>newHashMap());
         }
-        relayMessageMap.get(instance).put(message.getMsgId(), message);
+        relayMessageMap.get(instance).put(relayMessage.getMsgId(), 
relayMessage);
       }
     }
 
+    if (nextRebalanceTime < Long.MAX_VALUE) {
+      scheduleFuturePipeline(nextRebalanceTime);
+    }
+
     _relayMessageMap = Collections.unmodifiableMap(relayMessageMap);
 
-    // TODO: this is a workaround, remove this once the participants are all 
in 0.8.2,
-    checkCommittedRelayMessages(currentStateMap);
+    long relayCount = 0;
+    // Add valid relay message to the instance message map.
+    for (String instance : _relayMessageMap.keySet()) {
+      Map<String, Message> relayMessages = _relayMessageMap.get(instance);
+      if (!_messageMap.containsKey(instance)) {
+        _messageMap.put(instance, Maps.<String, Message>newHashMap());
+      }
+      _messageMap.get(instance).putAll(relayMessages);
+      relayCount += relayMessages.size();
+    }
 
+    if (LOG.isDebugEnabled()) {
+      if (relayCount > 0) {
+        LOG.debug("Relay message cache size " + relayCount);
+      }
+    }
   }
 
-  // TODO: this is a workaround, once the participants are all in 0.8.2,
-  private void checkCommittedRelayMessages(Map<String, Map<String, Map<String, 
CurrentState>>> currentStateMap) {
-    Iterator<Map.Entry<String, Message>> it = 
_committedRelayMessages.entrySet().iterator();
-    while (it.hasNext()) {
-      Message message = it.next().getValue();
-
-      String resourceName = message.getResourceName();
-      String partitionName = message.getPartitionName();
-      String targetState = message.getToState();
-      String instance = message.getTgtName();
-      String sessionId = message.getTgtSessionId();
-
-      long committedTime = message.getRelayTime();
-      if (committedTime + _commitMessageExpiry < System.currentTimeMillis()) {
-        LOG.info("relay message " + message.getMsgId()
-            + " is expired after committed, remove it from committed message 
cache.");
-        it.remove();
-        continue;
-      }
+  // Schedule a future rebalance pipeline run.
+  private void scheduleFuturePipeline(long rebalanceTime) {
+    GenericHelixController controller = 
GenericHelixController.getController(_clusterName);
+    if (controller != null) {
+      controller.scheduleRebalance(rebalanceTime);
+    } else {
+      LOG.warn(
+          "Failed to schedule a future pipeline run for cluster {} at delay 
{}, helix controller is null.",
+          _clusterName, (rebalanceTime - System.currentTimeMillis()));
+    }
+  }
 
-      Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
-          currentStateMap.get(instance);
-      if (instanceCurrentStateMap == null || 
instanceCurrentStateMap.get(sessionId) == null) {
-        LOG.info(
-            "No sessionCurrentStateMap found, remove it from committed message 
cache." + message
-                .getId());
-        it.remove();
-        continue;
-      }
+  private void setRelayTime(Message relayMessage, Map<String, LiveInstance> 
liveInstanceMap,
+      Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
+
+    // relay time already set, avoid to reset it to a later time.
+    if (relayMessage.getRelayTime() > relayMessage.getCreateTimeStamp()) {
+      return;
+    }
+
+    Message hostedMessage = 
_relayHostMessageCache.get(relayMessage.getMsgId());
+    String sessionId = hostedMessage.getTgtSessionId();
+    String instance = hostedMessage.getTgtName();
+    String resourceName = hostedMessage.getResourceName();
+    String instanceSessionId = liveInstanceMap.get(instance).getSessionId();
+
+    long currentTime = System.currentTimeMillis();
+    long expiredTime = currentTime + relayMessage.getExpiryPeriod();
+
+    if (!instanceSessionId.equals(sessionId)) {
+      LOG.debug(
+          "Hosted Instance SessionId {} does not match sessionId {} in hosted 
message , set relay message {} to be expired at {}, hosted message ",
+          instanceSessionId, sessionId, relayMessage.getId(), expiredTime,
+          hostedMessage.getMsgId());
+      relayMessage.setRelayTime(currentTime);
+      return;
+    }
 
-      Map<String, CurrentState> sessionCurrentStateMap = 
instanceCurrentStateMap.get(sessionId);
-      CurrentState currentState = sessionCurrentStateMap.get(resourceName);
-      if (currentState != null && 
targetState.equals(currentState.getState(partitionName))) {
-        LOG.info("CurrentState " + currentState
-            + " match the target state of the relay message, remove it from 
committed message cache."
-            + message.getId());
-        it.remove();
-        continue;
+    Map<String, Map<String, CurrentState>> instanceCurrentStateMap = 
currentStateMap.get(instance);
+    if (instanceCurrentStateMap == null) {
+      LOG.debug(
+          "No instanceCurrentStateMap found for {} on {}, set relay messages 
{} to be expired at {}"
+              + resourceName, instance, relayMessage.getId(), expiredTime);
+      relayMessage.setRelayTime(currentTime);
+      return;
+    }
+
+    Map<String, CurrentState> sessionCurrentStateMap = 
instanceCurrentStateMap.get(sessionId);
+    if (sessionCurrentStateMap == null) {
+      LOG.debug("No sessionCurrentStateMap found, set relay messages {} to be 
expired at {}. ",
+          relayMessage.getId(), expiredTime);
+      relayMessage.setRelayTime(currentTime);
+      return;
+    }
+
+    String partitionName = hostedMessage.getPartitionName();
+    String targetState = hostedMessage.getToState();
+    String fromState = hostedMessage.getFromState();
+
+    CurrentState currentState = sessionCurrentStateMap.get(resourceName);
+    if (currentState == null) {
+      LOG.debug("No currentState found for {} on {}, set relay message {} to 
be expired at {} ",
+          resourceName, instance, relayMessage.getId(),
+          (currentTime + relayMessage.getExpiryPeriod()));
+      relayMessage.setRelayTime(currentTime);
+      return;
+    }
+
+    if (targetState.equals(currentState.getState(partitionName))) {
+      long completeTime = currentState.getEndTime(partitionName);
+      if (completeTime < relayMessage.getCreateTimeStamp()) {
+        completeTime = currentTime;
       }
+      relayMessage.setRelayTime(completeTime);
+      LOG.debug(
+          "Target state match the hosted message's target state, set relay 
message {} relay time at {}.",
+          relayMessage.getId(), completeTime);
+    }
 
-      Map<String, Message> cachedMap = _messageMap.get(message.getTgtName());
-      cachedMap.put(message.getId(), message);
+    if (!fromState.equals(currentState.getState(partitionName))) {
+      LOG.debug(
+          "Current state does not match hosted message's from state, set relay 
message {} relay time at {}.",
+          relayMessage.getId(), currentTime);
+      relayMessage.setRelayTime(currentTime);
     }
   }
 
@@ -367,20 +437,22 @@ public class InstanceMessagesCache {
 
       if (message.hasRelayMessages()) {
         for (Message relayMsg : message.getRelayMessages().values()) {
-          cacheRelayMessage(relayMsg);
+          cacheRelayMessage(relayMsg, message);
         }
       }
     }
   }
 
-  protected void cacheRelayMessage(Message message) {
-    String instanceName = message.getTgtName();
+  private void cacheRelayMessage(Message relayMessage, Message hostMessage) {
+    String instanceName = relayMessage.getTgtName();
     if (!_relayMessageCache.containsKey(instanceName)) {
       _relayMessageCache.put(instanceName, Maps.<String, Message>newHashMap());
     }
-    _relayMessageCache.get(instanceName).put(message.getId(), message);
+    _relayMessageCache.get(instanceName).put(relayMessage.getId(), 
relayMessage);
+    _relayHostMessageCache.put(relayMessage.getMsgId(), hostMessage);
 
-    LOG.info("Add message to relay cache " + message.getMsgId());
+    LOG.info("Add relay message to relay cache " + relayMessage.getMsgId() + 
", hosted message "
+        + hostMessage.getMsgId());
   }
 
   @Override public String toString() {
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index dda4552..a8f2872 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller;
 
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -29,10 +30,12 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.HelixDataAccessor;
@@ -156,9 +159,16 @@ public class GenericHelixController implements 
IdealStateChangeListener,
    * one resource group has the config to use the timer.
    */
   Timer _periodicalRebalanceTimer = null;
-
   long _timerPeriod = Long.MAX_VALUE;
 
+
+  /**
+   * The timer that triggers the on-demand rebalance pipeline.
+   */
+  Timer _onDemandRebalanceTimer = null;
+  AtomicReference<RebalanceTask> _nextRebalanceTask = new AtomicReference<>();
+
+
   /**
    * A cache maintained across pipelines
    */
@@ -174,6 +184,19 @@ public class GenericHelixController implements 
IdealStateChangeListener,
   private String _clusterName;
   private final Set<Pipeline.Type> _enabledPipelineTypes;
 
+  private HelixManager _helixManager;
+
+  /**
+   * TODO: We should get rid of this once we move to:
+   *  1) ZK callback should go to ClusterDataCache and trigger data cache 
refresh only
+   *  2) then ClusterDataCache.refresh triggers rebalance pipeline.
+   */
+  /* Map of cluster name -> GenericHelixController */
+  private static Map<String, GenericHelixController> HelixControllerFactory = 
new ConcurrentHashMap<>();
+  public static GenericHelixController getController(String clusterName) {
+    return HelixControllerFactory.get(clusterName);
+  }
+
   /**
    * Default constructor that creates a default pipeline registry. This is 
sufficient in most cases,
    * but if there is a some thing specific needed use another constructor 
where in you can pass a
@@ -198,16 +221,28 @@ public class GenericHelixController implements 
IdealStateChangeListener,
   class RebalanceTask extends TimerTask {
     final HelixManager _manager;
     final ClusterEventType _clusterEventType;
+    private long _nextRebalanceTime;
 
     public RebalanceTask(HelixManager manager, ClusterEventType 
clusterEventType) {
+      this(manager, clusterEventType, -1);
+
+    }
+
+    public RebalanceTask(HelixManager manager, ClusterEventType 
clusterEventType, long nextRebalanceTime) {
       _manager = manager;
       _clusterEventType = clusterEventType;
+      _nextRebalanceTime = nextRebalanceTime;
+    }
+
+    public long getNextRebalanceTime() {
+      return _nextRebalanceTime;
     }
 
     @Override
     public void run() {
       try {
-        if (_clusterEventType.equals(ClusterEventType.PeriodicalRebalance)) {
+        if (_clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || 
_clusterEventType
+            .equals(ClusterEventType.OnDemandRebalance)) {
           requestDataProvidersFullRefresh();
 
           HelixDataAccessor accessor = _manager.getHelixDataAccessor();
@@ -224,7 +259,6 @@ public class GenericHelixController implements 
IdealStateChangeListener,
             }
           }
         }
-
         forceRebalance(_manager, _clusterEventType);
       } catch (Throwable ex) {
         logger.error("Time task failed. Rebalance task type: " + 
_clusterEventType + ", cluster: "
@@ -247,7 +281,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     enqueueEvent(_taskEventQueue, event);
     enqueueEvent(_eventQueue, event.clone(uid));
     logger.info(String
-        .format("Controller rebalance event triggered with event type: %s for 
cluster %s",
+        .format("Controller rebalance pipeline triggered with event type: %s 
for cluster %s",
             eventType, _clusterName));
   }
 
@@ -284,6 +318,46 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     }
   }
 
+  /**
+   * schedule a future rebalance pipeline run, delayed at given time.
+   */
+  public void scheduleRebalance(long rebalanceTime) {
+    if (_helixManager == null) {
+      logger.warn(
+          "Failed to schedule a future pipeline run for cluster " + 
_clusterName + " helix manager is null!");
+      return;
+    }
+
+    long current = System.currentTimeMillis();
+    long delay = rebalanceTime - current;
+
+    if (rebalanceTime > current) {
+      if (_onDemandRebalanceTimer == null) {
+        _onDemandRebalanceTimer = new Timer(true);
+      }
+
+      RebalanceTask preTask = _nextRebalanceTask.get();
+      if (preTask != null && preTask.getNextRebalanceTime() > current
+          && preTask.getNextRebalanceTime() < rebalanceTime) {
+        // already have a earlier rebalance scheduled, no need to schedule 
again.
+        return;
+      }
+
+      RebalanceTask newTask =
+          new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, 
rebalanceTime);
+
+      _onDemandRebalanceTimer.schedule(newTask, delay);
+      logger.info(
+          "Scheduled a future pipeline run for cluster " + 
_helixManager.getClusterName() + " in delay "
+              + delay);
+
+      preTask = _nextRebalanceTask.getAndSet(newTask);
+      if (preTask != null) {
+        preTask.cancel();
+      }
+    }
+  }
+
   private static PipelineRegistry createDefaultRegistry(String pipelineName) {
     logger.info("createDefaultRegistry");
     synchronized (GenericHelixController.class) {
@@ -335,6 +409,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
       registry.register(ClusterEventType.MessageChange, dataRefresh, 
dataPreprocess, rebalancePipeline);
       registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, 
externalViewPipeline, rebalancePipeline);
       registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, 
autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, 
rebalancePipeline);
+      registry.register(ClusterEventType.OnDemandRebalance, dataRefresh, 
autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, 
rebalancePipeline);
       return registry;
     }
   }
@@ -442,6 +517,10 @@ public class GenericHelixController implements 
IdealStateChangeListener,
       _taskEventQueue = null;
       _taskEventThread = null;
     }
+
+    if (clusterName != null) {
+      HelixControllerFactory.put(clusterName, this);
+    }
   }
 
   private void initializeAsyncFIFOWorkers() {
@@ -493,6 +572,8 @@ public class GenericHelixController implements 
IdealStateChangeListener,
       return;
     }
 
+    _helixManager = manager;
+
     // TODO If init controller with paused = true, it may not take effect 
immediately
     // _paused is default false. If any events come before 
controllerChangeEvent, the controller
     // will be excuting in un-paused mode. Which might not be the config in ZK.
@@ -540,7 +621,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
           .getPipelinesForEvent(event.getEventType());
       isTaskFrameworkPipeline = true;
     } else {
-      logger.info(String
+      logger.warn(String
           .format("No %s pipeline to run for event: %s::%s", 
dataProvider.getPipelineName(),
               event.getEventType(), event.getEventId()));
       return;
@@ -561,7 +642,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
       } catch (Exception e) {
         logger.error(
             "Exception while executing {} pipeline: {} for cluster {}. Will 
not continue to next pipeline",
-            dataProvider.getPipelineName(), _clusterName, e);
+            dataProvider.getPipelineName(), _clusterName, 
Arrays.toString(e.getStackTrace()));
 
         if (e instanceof HelixMetaDataAccessException) {
           rebalanceFail = true;
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 84013ba..2ab4526 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -108,6 +108,7 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
 
   public synchronized void refresh(HelixDataAccessor accessor) {
     long startTime = System.currentTimeMillis();
+
     // Invalidate cached information
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE)
         || _propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)
@@ -123,8 +124,8 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
     refreshExternalViews(accessor);
     refreshTargetExternalViews(accessor);
     LogUtil.logInfo(logger, getClusterEventId(), String.format(
-        "END: WorkflowControllerDataProvider.refresh() for cluster %s took %s 
for %s pipeline",
-        getClusterName(), System.currentTimeMillis() - startTime, 
getPipelineName()));
+        "END: ResourceControllerDataProvider.refresh() for cluster %s, started 
at %d took %d for %s pipeline",
+        getClusterName(), startTime, System.currentTimeMillis() - startTime, 
getPipelineName()));
     dumpDebugInfo();
   }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index 3416e22..2eee09e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -104,8 +104,8 @@ public class WorkflowControllerDataProvider extends 
BaseControllerDataProvider {
 
     long duration = System.currentTimeMillis() - startTime;
     LogUtil.logInfo(logger, getClusterEventId(), String.format(
-        "END: WorkflowControllerDataProvider.refresh() for cluster %s took %s 
for %s pipeline",
-        getClusterName(), duration, getPipelineName()));
+        "END: WorkflowControllerDataProvider.refresh() for cluster %s, started 
at %d took %d for %s pipeline",
+        getClusterName(), startTime, duration, getPipelineName()));
     dumpDebugInfo();
   }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
index a65e9f0..b09c297 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
@@ -105,7 +105,7 @@ public class ClusterEvent {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append(String.format("Event id : %s", _eventId.toString()));
+    sb.append(String.format("Event id : %s", _eventId));
     sb.append("name:" + _eventType.name()).append("\n");
     for (String key : _eventAttributeMap.keySet()) {
       
sb.append(key).append(":").append(_eventAttributeMap.get(key)).append("\n");
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index 5312c35..7e9ab15 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -32,6 +32,7 @@ public enum ClusterEventType {
   TargetExternalViewChange,
   Resume,
   PeriodicalRebalance,
+  OnDemandRebalance,
   RetryRebalance,
   StateVerifier,
   Unknown
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index d266c7d..2894a49 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -207,21 +207,21 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
           if (pendingMessage != null) {
             String pendingState = pendingMessage.getToState();
             if (nextState.equalsIgnoreCase(pendingState)) {
-              LogUtil.logDebug(logger, _eventId,
+              LogUtil.logInfo(logger, _eventId,
                   "Message already exists for " + instanceName + " to transit 
" + resource
                       .getResourceName() + "." + partition.getPartitionName() 
+ " from "
-                      + currentState + " to " + nextState);
+                      + currentState + " to " + nextState + ", isRelay: " + 
pendingMessage.isRelayMessage());
             } else if (currentState.equalsIgnoreCase(pendingState)) {
               LogUtil.logInfo(logger, _eventId,
                   "Message hasn't been removed for " + instanceName + " to 
transit " + resource
                       .getResourceName() + "." + partition.getPartitionName() 
+ " to "
-                      + pendingState + ", desiredState: " + desiredState);
+                      + pendingState + ", desiredState: " + desiredState + ", 
isRelay: " + pendingMessage.isRelayMessage());
             } else {
               LogUtil.logInfo(logger, _eventId,
                   "IdealState changed before state transition completes for " 
+ resource
                       .getResourceName() + "." + partition.getPartitionName() 
+ " on "
                       + instanceName + ", pendingState: " + pendingState + ", 
currentState: "
-                      + currentState + ", nextState: " + nextState);
+                      + currentState + ", nextState: " + nextState + ", 
isRelay: " + pendingMessage.isRelayMessage());
 
               message = createStateTransitionCancellationMessage(manager, 
resource,
                   partition.getPartitionName(), instanceName, 
sessionIdMap.get(instanceName),
diff --git 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 898bd63..77cbcca 100644
--- 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -196,6 +196,7 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
 
       _dataProvider.requireFullRefresh();
       _dataProvider.refresh(_accessor);
+      _dataProvider.setClusterEventId("ClusterStateVerifier");
 
       Map<String, IdealState> idealStates = new 
HashMap<>(_dataProvider.getIdealStates());
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
index daf24fb..cde62ae 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
@@ -110,7 +110,7 @@ public class TestEnableCompression extends ZkTestBase {
     Assert.assertTrue(result);
 
     List<String> compressedPaths = new ArrayList<String>();
-    findCompressedZNodes(zkClient, "/", compressedPaths);
+    findCompressedZNodes(zkClient, "/" + clusterName, compressedPaths);
 
     System.out.println("compressed paths:" + compressedPaths);
     // ONLY IDEALSTATE and EXTERNAL VIEW must be compressed
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
index 92dfa13..9dad564 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
@@ -186,7 +186,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
   }
 
   private void verifyP2PMessage(String dbName, String instance, String 
expectedState, String expectedTriggerHost) {
-    verifyP2PMessage(dbName, instance, expectedState, expectedTriggerHost, 
0.7);
+    verifyP2PMessage(dbName, instance, expectedState, expectedTriggerHost, 
0.6);
   }
 
   private void verifyP2PMessage(String dbName, String instance, String 
expectedState, String expectedTriggerHost, double expectedRatio) {
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PSingleTopState.java
 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PSingleTopState.java
new file mode 100644
index 0000000..a1b37b2
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PSingleTopState.java
@@ -0,0 +1,214 @@
+package org.apache.helix.integration.messaging;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.common.ZkTestBase;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.integration.DelayedTransitionBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestP2PSingleTopState extends ZkTestBase {
+  private static Logger logger = 
LoggerFactory.getLogger(TestP2PSingleTopState.class);
+
+  final String CLASS_NAME = getShortClassName();
+  final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  static final int PARTICIPANT_NUMBER = 24;
+  static final int PARTICIPANT_START_PORT = 12918;
+
+  static final int DB_COUNT = 2;
+
+  static final int PARTITION_NUMBER = 50;
+  static final int REPLICA_NUMBER = 3;
+
+  final String _controllerName = CONTROLLER_PREFIX + "_0";
+
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  List<String> _instances = new ArrayList<>();
+  ClusterControllerManager _controller;
+
+  ZkHelixClusterVerifier _clusterVerifier;
+  ConfigAccessor _configAccessor;
+  HelixDataAccessor _accessor;
+
+  @BeforeClass
+  public void beforeClass() {
+    System.out
+        .println("START " + getShortClassName() + " at " + new 
Date(System.currentTimeMillis()));
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < PARTICIPANT_NUMBER / 2; i++) {
+      String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + 
i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+      _instances.add(instance);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < PARTICIPANT_NUMBER / 2; i++) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i));
+      participant.setTransition(new DelayedTransitionBase(100));
+      participant.syncStart();
+      participant.setTransition(new 
TestTransition(participant.getInstanceName()));
+      _participants.add(participant);
+    }
+
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+
+    enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true, 1000000);
+    //enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    enableP2PInCluster(CLUSTER_NAME, _configAccessor, true);
+
+    // start controller
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
_controllerName);
+    _controller.syncStart();
+
+    for (int i = 0; i < DB_COUNT; i++) {
+      createResourceWithDelayedRebalance(CLUSTER_NAME, "TestDB_" + i,
+          BuiltInStateModelDefinitions.MasterSlave.name(), PARTITION_NUMBER, 
REPLICA_NUMBER,
+          REPLICA_NUMBER - 1, 1000000L, 
CrushEdRebalanceStrategy.class.getName());
+    }
+
+    _clusterVerifier =
+        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (MockParticipantManager p : _participants) {
+      if (p.isConnected()) {
+        p.syncStop();
+      }
+    }
+    deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testRollingUpgrade() throws InterruptedException {
+    // rolling upgrade the cluster
+    for (String ins : _instances) {
+      System.out.println("Disable " + ins);
+      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, 
false);
+      Thread.sleep (1000);
+      Assert.assertTrue(_clusterVerifier.verifyByPolling());
+      System.out.println("Enable " + ins);
+      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, 
true);
+      Thread.sleep(1000);
+      Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    }
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    
Assert.assertFalse(TestTransition.duplicatedPartitionsSnapshot.keys().hasMoreElements());
+  }
+
+  @Test
+  public void testAddInstances() throws InterruptedException {
+    for (int i = PARTICIPANT_NUMBER / 2; i < PARTICIPANT_NUMBER; i++) {
+      String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + 
i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+      _instances.add(instance);
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i));
+      participant.setTransition(new DelayedTransitionBase(100));
+      participant.syncStart();
+      participant.setTransition(new 
TestTransition(participant.getInstanceName()));
+      _participants.add(participant);
+      Thread.sleep(100);
+    }
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    
Assert.assertFalse(TestTransition.duplicatedPartitionsSnapshot.keys().hasMoreElements());
+  }
+
+  static class TestTransition extends MockTransition {
+    // resource.partition that having top state.
+    static ConcurrentHashMap<String, Map<String, String>> 
duplicatedPartitionsSnapshot =
+        new ConcurrentHashMap<>();
+    static ConcurrentHashMap<String, Map<String, String>> ExternalViews = new 
ConcurrentHashMap<>();
+    static AtomicLong totalToMaster = new AtomicLong();
+    static AtomicLong totalRelayMessage = new AtomicLong();
+
+    String _instanceName;
+
+    public TestTransition(String instanceName) {
+      _instanceName = instanceName;
+    }
+
+    public void doTransition(Message message, NotificationContext context) {
+      String to = message.getToState();
+      String resource = message.getResourceName();
+      String partition = message.getPartitionName();
+      String key = resource + "." + partition;
+
+      if (to.equals(MasterSlaveSMD.States.MASTER.name())) {
+        Map<String, String> mapFields = new HashMap<>(ExternalViews.get(key));
+        if (mapFields.values().contains(MasterSlaveSMD.States.MASTER.name())) {
+          Map<String, String> newMapFile = new HashMap<>(mapFields);
+          newMapFile.put(_instanceName, to);
+          duplicatedPartitionsSnapshot.put(key, newMapFile);
+        }
+
+        totalToMaster.incrementAndGet();
+        if (message.isRelayMessage()) {
+          totalRelayMessage.incrementAndGet();
+        }
+      }
+
+      ExternalViews.putIfAbsent(key, new ConcurrentHashMap<String, String>());
+      if (to.equalsIgnoreCase("DROPPED")) {
+        ExternalViews.get(key).remove(_instanceName);
+      } else {
+        ExternalViews.get(key).put(_instanceName, to);
+      }
+    }
+  }
+}
+
diff --git 
a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java 
b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 983a6a3..5fcb307 100644
--- 
a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -51,6 +51,8 @@ import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.task.TaskConstants;
+import org.testng.collections.Lists;
 
 public class MockZKHelixManager implements HelixManager {
   private final ZKHelixDataAccessor _accessor;
@@ -281,7 +283,8 @@ public class MockZKHelixManager implements HelixManager {
   @Override
   public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
     // TODO Auto-generated method stub
-    return null;
+    return new ZkHelixPropertyStore<>(
+        (ZkBaseDataAccessor<ZNRecord>) _accessor.getBaseDataAccessor(), 
TaskConstants.REBALANCER_CONTEXT_ROOT, Lists.<String>newArrayList());
   }
 
   @Override

Reply via email to