Temporary workaround to fix P2P race-condition for old Helix participants (0.8.1 
or older).


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/74145e8a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/74145e8a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/74145e8a

Branch: refs/heads/master
Commit: 74145e8ad3b34753186d53526bab825de4432c31
Parents: 880f885
Author: Lei Xia <[email protected]>
Authored: Fri Jul 27 17:28:17 2018 -0700
Committer: Lei Xia <[email protected]>
Committed: Mon Sep 17 15:17:26 2018 -0700

----------------------------------------------------------------------
 .../common/caches/InstanceMessagesCache.java    | 98 ++++++++++++++++++--
 .../stages/CurrentStateComputationStage.java    |  9 +-
 .../controller/stages/TaskAssignmentStage.java  | 18 +++-
 .../messaging/handling/HelixTaskExecutor.java   | 12 ++-
 .../messaging/TestP2PNoDuplicatedMessage.java   | 13 ++-
 5 files changed, 120 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index 13b77cc..9fa9136 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -34,6 +34,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,10 +54,22 @@ public class InstanceMessagesCache {
   // <instance -> {<MessageId, Message>}>
   private Map<String, Map<String, Message>> _relayMessageCache = 
Maps.newHashMap();
 
+
+  // TODO: Temporary workaround to void participant receiving duplicated state 
transition messages when p2p is enable.
+  // should remove this once all clients are migrated to 0.8.2. -- Lei
+  private Map<String, Message> _committedRelayMessages = Maps.newHashMap();
+
+  public static final String COMMIT_MESSAGE_EXPIRY_CONFIG = 
"helix.controller.messagecache.commitmessageexpiry";
+
+  private static final int DEFAULT_COMMIT_RELAY_MESSAGE_EXPIRY = 20 * 1000; // 
20 seconds
+  private final int _commitMessageExpiry;
+
   private String _clusterName;
 
   public InstanceMessagesCache(String clusterName) {
     _clusterName = clusterName;
+    _commitMessageExpiry = HelixUtil
+        .getSystemPropertyAsInt(COMMIT_MESSAGE_EXPIRY_CONFIG, 
DEFAULT_COMMIT_RELAY_MESSAGE_EXPIRY);
   }
 
   /**
@@ -203,14 +216,27 @@ public class InstanceMessagesCache {
         String targetState = message.getToState();
         String instanceSessionId = 
liveInstanceMap.get(instance).getSessionId();
 
-        if (_messageMap.get(instance).containsKey(message.getMsgId())) {
-          // relay message has already been sent to target host
-          // remove the message from relayMessageCache.
-          LOG.info(
-              "Relay message has already been sent to target host, remove 
relay message from the cache"
-                  + message.getId());
-          iterator.remove();
-          continue;
+        Map<String, Message> instanceMsgMap = _messageMap.get(instance);
+
+        if (instanceMsgMap != null && 
instanceMsgMap.containsKey(message.getMsgId())) {
+          Message commitMessage = instanceMsgMap.get(message.getMsgId());
+
+          if (!commitMessage.isRelayMessage()) {
+            LOG.info(
+                "Controller already sent the message to the target host, 
remove relay message from the cache"
+                    + message.getId());
+            iterator.remove();
+            _committedRelayMessages.remove(message.getMsgId());
+            continue;
+          } else {
+            // relay message has already been sent to target host
+            // remember when the relay messages get relayed to the target host.
+            if (!_committedRelayMessages.containsKey(message.getMsgId())) {
+              message.setRelayTime(System.currentTimeMillis());
+              _committedRelayMessages.put(message.getMsgId(), message);
+              LOG.info("Put message into committed relay messages " + 
message.getId());
+            }
+          }
         }
 
         if (!instanceSessionId.equals(sessionId)) {
@@ -227,6 +253,7 @@ public class InstanceMessagesCache {
               .getId());
           continue;
         }
+
         CurrentState currentState = sessionCurrentStateMap.get(resourceName);
         if (currentState != null && 
targetState.equals(currentState.getState(partitionName))) {
           LOG.info("CurrentState " + currentState
@@ -237,8 +264,8 @@ public class InstanceMessagesCache {
         }
 
         if (message.isExpired()) {
-          LOG.info("relay message " + message.getId() + " expired, remove it 
from cache."
-              + message.getId());
+          LOG.error("relay message has not been sent " + message.getId()
+              + " expired, remove it from cache. relay time: " + 
message.getRelayTime());
           iterator.remove();
           continue;
         }
@@ -251,6 +278,55 @@ public class InstanceMessagesCache {
     }
 
     _relayMessageMap = Collections.unmodifiableMap(relayMessageMap);
+
+    // TODO: this is a workaround, remove this once the participants are all 
in 0.8.2,
+    checkCommittedRelayMessages(currentStateMap);
+
+  }
+
+  // TODO: this is a workaround, once the participants are all in 0.8.2,
+  private void checkCommittedRelayMessages(Map<String, Map<String, Map<String, 
CurrentState>>> currentStateMap) {
+    Iterator<Map.Entry<String, Message>> it = 
_committedRelayMessages.entrySet().iterator();
+    while (it.hasNext()) {
+      Message message = it.next().getValue();
+
+      String resourceName = message.getResourceName();
+      String partitionName = message.getPartitionName();
+      String targetState = message.getToState();
+      String instance = message.getTgtName();
+      String sessionId = message.getTgtSessionId();
+
+      long committedTime = message.getRelayTime();
+      if (committedTime + _commitMessageExpiry < System.currentTimeMillis()) {
+        LOG.info("relay message " + message.getMsgId()
+            + " is expired after committed, remove it from committed message 
cache.");
+        it.remove();
+        continue;
+      }
+
+      Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
+          currentStateMap.get(instance);
+      if (instanceCurrentStateMap == null || 
instanceCurrentStateMap.get(sessionId) == null) {
+        LOG.info(
+            "No sessionCurrentStateMap found, remove it from committed message 
cache." + message
+                .getId());
+        it.remove();
+        continue;
+      }
+
+      Map<String, CurrentState> sessionCurrentStateMap = 
instanceCurrentStateMap.get(sessionId);
+      CurrentState currentState = sessionCurrentStateMap.get(resourceName);
+      if (currentState != null && 
targetState.equals(currentState.getState(partitionName))) {
+        LOG.info("CurrentState " + currentState
+            + " match the target state of the relay message, remove it from 
committed message cache."
+            + message.getId());
+        it.remove();
+        continue;
+      }
+
+      Map<String, Message> cachedMap = _messageMap.get(message.getTgtName());
+      cachedMap.put(message.getId(), message);
+    }
   }
 
   /**
@@ -303,6 +379,8 @@ public class InstanceMessagesCache {
       _relayMessageCache.put(instanceName, Maps.<String, Message>newHashMap());
     }
     _relayMessageCache.get(instanceName).put(message.getId(), message);
+
+    LOG.info("Add message to relay cache " + message.getMsgId());
   }
 
   @Override public String toString() {

http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 9cc9506..340a051 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -164,8 +164,8 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
               message.getMsgId(), resourceName, message.getPartitionName()));
         }
       } else {
-        LogUtil.logWarn(LOG, _eventId, String
-            .format("A relay message %s should not be batched, ignored!", 
message.getMsgId()));
+        LogUtil.logWarn(LOG, _eventId,
+            String.format("A relay message %s should not be batched, 
ignored!", message.getMsgId()));
       }
     }
   }
@@ -275,8 +275,9 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
     // Check whether it is already passed threshold
     for (String resourceName : missingTopStateMap.keySet()) {
       for (String partitionName : 
missingTopStateMap.get(resourceName).keySet()) {
-        long startTime = 
missingTopStateMap.get(resourceName).get(partitionName);
-        if (startTime > 0 && System.currentTimeMillis() - startTime > 
durationThreshold) {
+        Long startTime = 
missingTopStateMap.get(resourceName).get(partitionName);
+        if (startTime != null && startTime > 0
+            && System.currentTimeMillis() - startTime > durationThreshold) {
           missingTopStateMap.get(resourceName).put(partitionName, 
TRANSITION_FAILED);
           if (clusterStatusMonitor != null) {
             
clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, 
false);

http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 6036f34..6d483a0 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -74,7 +74,8 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     List<Message> outputMessages =
         batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, 
liveInstanceMap,
             manager.getProperties());
-    sendMessages(dataAccessor, outputMessages);
+
+    List<Message> messagesSent = sendMessages(dataAccessor, outputMessages);
     // TODO: Need also count messages from task rebalancer
     if (!cache.isTaskCache()) {
       ClusterStatusMonitor clusterStatusMonitor =
@@ -84,7 +85,7 @@ public class TaskAssignmentStage extends AbstractBaseStage {
       }
     }
     long cacheStart = System.currentTimeMillis();
-    cache.cacheMessages(outputMessages);
+    cache.cacheMessages(messagesSent);
     long cacheEnd = System.currentTimeMillis();
     LogUtil.logDebug(logger, _eventId, "Caching messages took " + (cacheEnd - 
cacheStart) + " ms");
   }
@@ -132,9 +133,11 @@ public class TaskAssignmentStage extends AbstractBaseStage 
{
     return outputMessages;
   }
 
-  protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> 
messages) {
+  // return the messages actually sent
+  protected List<Message> sendMessages(HelixDataAccessor dataAccessor, 
List<Message> messages) {
+    List<Message> messageSent = new ArrayList<>();
     if (messages == null || messages.isEmpty()) {
-      return;
+      return messageSent;
     }
 
     Builder keyBuilder = dataAccessor.keyBuilder();
@@ -146,6 +149,7 @@ public class TaskAssignmentStage extends AbstractBaseStage {
               + message.getResourceName() + "." + message.getPartitionName() + 
"|" + message
               .getPartitionNames() + " from:" + message.getFromState() + " 
to:" + message
               .getToState() + ", relayMessages: " + 
message.getRelayMessages().size());
+
       if (message.hasRelayMessages()) {
         for (Message msg : message.getRelayMessages().values()) {
           LogUtil.logInfo(logger, _eventId,
@@ -163,8 +167,12 @@ public class TaskAssignmentStage extends AbstractBaseStage 
{
     boolean[] results = dataAccessor.createChildren(keys, new 
ArrayList<>(messages));
     for (int i = 0; i < results.length; i++) {
       if (!results[i]) {
-        LogUtil.logWarn(logger, _eventId, "Failed to send message: " + 
keys.get(i));
+        LogUtil.logError(logger, _eventId, "Failed to send message: " + 
keys.get(i));
+      } else {
+        messageSent.add(messages.get(i));
       }
     }
+
+    return messageSent;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 925f127..5e2082c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -874,12 +874,16 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
           } else if 
(message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
               && isStateTransitionInProgress(messageTarget)) {
 
+            String taskId = _messageTaskMap.get(messageTarget);
+            Message msg = _taskMap.get(taskId).getTask().getMessage();
+
             // If there is another state transition for same partition is 
going on,
             // discard the message. Controller will resend if this is a valid 
message
-            throw new HelixException(String
-                .format("Another state transition for %s:%s is in progress. 
Discarding %s->%s message",
-                    message.getResourceName(), message.getPartitionName(), 
message.getFromState(),
-                    message.getToState()));
+            throw new HelixException(String.format(
+                "Another state transition for %s:%s is in progress with msg: 
%s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
+                message.getResourceName(), message.getPartitionName(), 
msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
+                msg.getReadTimeStamp(), System.currentTimeMillis(), 
message.getFromState(),
+                message.getToState()));
           }
 
           stateTransitionHandlers

http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
index b3ef3e5..bf7f566 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -80,8 +80,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
   HelixDataAccessor _accessor;
 
   @BeforeClass
-  public void beforeClass()
-      throws InterruptedException {
+  public void beforeClass() {
     System.out.println(
         "START " + getShortClassName() + " at " + new 
Date(System.currentTimeMillis()));
 
@@ -136,7 +135,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
   }
 
   @Test
-  public void testP2PStateTransitionDisabled() throws InterruptedException {
+  public void testP2PStateTransitionDisabled() {
     enableP2PInCluster(CLUSTER_NAME, _configAccessor, false);
 
     MockHelixTaskExecutor.resetStats();
@@ -158,7 +157,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
   }
 
   @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
-  public void testP2PStateTransitionEnabled() throws InterruptedException {
+  public void testP2PStateTransitionEnabled() {
     enableP2PInCluster(CLUSTER_NAME, _configAccessor, true);
     long startTime = System.currentTimeMillis();
     MockHelixTaskExecutor.resetStats();
@@ -174,9 +173,9 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
     }
 
     double ratio = ((double) p2pTrigged) / ((double) total);
-    Assert.assertTrue(ratio > 0.7, String
-        .format("Only %d out of %d percent transitions to Master were 
triggered by expected host!",
-            p2pTrigged, total));
+    Assert.assertTrue(ratio > 0.6, String
+       .format("Only %d out of %d percent transitions to Master were triggered 
by expected host!",
+           p2pTrigged, total));
 
     Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
         "There are duplicated transition messages sent while participant is 
handling the state-transition!");

Reply via email to