Temporary workaround to fix a P2P race condition for old Helix participants (0.8.1 or older).
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/74145e8a Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/74145e8a Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/74145e8a Branch: refs/heads/master Commit: 74145e8ad3b34753186d53526bab825de4432c31 Parents: 880f885 Author: Lei Xia <[email protected]> Authored: Fri Jul 27 17:28:17 2018 -0700 Committer: Lei Xia <[email protected]> Committed: Mon Sep 17 15:17:26 2018 -0700 ---------------------------------------------------------------------- .../common/caches/InstanceMessagesCache.java | 98 ++++++++++++++++++-- .../stages/CurrentStateComputationStage.java | 9 +- .../controller/stages/TaskAssignmentStage.java | 18 +++- .../messaging/handling/HelixTaskExecutor.java | 12 ++- .../messaging/TestP2PNoDuplicatedMessage.java | 13 ++- 5 files changed, 120 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java index 13b77cc..9fa9136 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java @@ -34,6 +34,7 @@ import org.apache.helix.PropertyKey; import org.apache.helix.model.CurrentState; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; +import org.apache.helix.util.HelixUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,10 +54,22 @@ public class InstanceMessagesCache { // <instance -> {<MessageId, Message>}> private Map<String, 
Map<String, Message>> _relayMessageCache = Maps.newHashMap(); + + // TODO: Temporary workaround to void participant receiving duplicated state transition messages when p2p is enable. + // should remove this once all clients are migrated to 0.8.2. -- Lei + private Map<String, Message> _committedRelayMessages = Maps.newHashMap(); + + public static final String COMMIT_MESSAGE_EXPIRY_CONFIG = "helix.controller.messagecache.commitmessageexpiry"; + + private static final int DEFAULT_COMMIT_RELAY_MESSAGE_EXPIRY = 20 * 1000; // 20 seconds + private final int _commitMessageExpiry; + private String _clusterName; public InstanceMessagesCache(String clusterName) { _clusterName = clusterName; + _commitMessageExpiry = HelixUtil + .getSystemPropertyAsInt(COMMIT_MESSAGE_EXPIRY_CONFIG, DEFAULT_COMMIT_RELAY_MESSAGE_EXPIRY); } /** @@ -203,14 +216,27 @@ public class InstanceMessagesCache { String targetState = message.getToState(); String instanceSessionId = liveInstanceMap.get(instance).getSessionId(); - if (_messageMap.get(instance).containsKey(message.getMsgId())) { - // relay message has already been sent to target host - // remove the message from relayMessageCache. - LOG.info( - "Relay message has already been sent to target host, remove relay message from the cache" - + message.getId()); - iterator.remove(); - continue; + Map<String, Message> instanceMsgMap = _messageMap.get(instance); + + if (instanceMsgMap != null && instanceMsgMap.containsKey(message.getMsgId())) { + Message commitMessage = instanceMsgMap.get(message.getMsgId()); + + if (!commitMessage.isRelayMessage()) { + LOG.info( + "Controller already sent the message to the target host, remove relay message from the cache" + + message.getId()); + iterator.remove(); + _committedRelayMessages.remove(message.getMsgId()); + continue; + } else { + // relay message has already been sent to target host + // remember when the relay messages get relayed to the target host. 
+ if (!_committedRelayMessages.containsKey(message.getMsgId())) { + message.setRelayTime(System.currentTimeMillis()); + _committedRelayMessages.put(message.getMsgId(), message); + LOG.info("Put message into committed relay messages " + message.getId()); + } + } } if (!instanceSessionId.equals(sessionId)) { @@ -227,6 +253,7 @@ public class InstanceMessagesCache { .getId()); continue; } + CurrentState currentState = sessionCurrentStateMap.get(resourceName); if (currentState != null && targetState.equals(currentState.getState(partitionName))) { LOG.info("CurrentState " + currentState @@ -237,8 +264,8 @@ public class InstanceMessagesCache { } if (message.isExpired()) { - LOG.info("relay message " + message.getId() + " expired, remove it from cache." - + message.getId()); + LOG.error("relay message has not been sent " + message.getId() + + " expired, remove it from cache. relay time: " + message.getRelayTime()); iterator.remove(); continue; } @@ -251,6 +278,55 @@ public class InstanceMessagesCache { } _relayMessageMap = Collections.unmodifiableMap(relayMessageMap); + + // TODO: this is a workaround, remove this once the participants are all in 0.8.2, + checkCommittedRelayMessages(currentStateMap); + + } + + // TODO: this is a workaround, once the participants are all in 0.8.2, + private void checkCommittedRelayMessages(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) { + Iterator<Map.Entry<String, Message>> it = _committedRelayMessages.entrySet().iterator(); + while (it.hasNext()) { + Message message = it.next().getValue(); + + String resourceName = message.getResourceName(); + String partitionName = message.getPartitionName(); + String targetState = message.getToState(); + String instance = message.getTgtName(); + String sessionId = message.getTgtSessionId(); + + long committedTime = message.getRelayTime(); + if (committedTime + _commitMessageExpiry < System.currentTimeMillis()) { + LOG.info("relay message " + message.getMsgId() + + " is expired 
after committed, remove it from committed message cache."); + it.remove(); + continue; + } + + Map<String, Map<String, CurrentState>> instanceCurrentStateMap = + currentStateMap.get(instance); + if (instanceCurrentStateMap == null || instanceCurrentStateMap.get(sessionId) == null) { + LOG.info( + "No sessionCurrentStateMap found, remove it from committed message cache." + message + .getId()); + it.remove(); + continue; + } + + Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId); + CurrentState currentState = sessionCurrentStateMap.get(resourceName); + if (currentState != null && targetState.equals(currentState.getState(partitionName))) { + LOG.info("CurrentState " + currentState + + " match the target state of the relay message, remove it from committed message cache." + + message.getId()); + it.remove(); + continue; + } + + Map<String, Message> cachedMap = _messageMap.get(message.getTgtName()); + cachedMap.put(message.getId(), message); + } } /** @@ -303,6 +379,8 @@ public class InstanceMessagesCache { _relayMessageCache.put(instanceName, Maps.<String, Message>newHashMap()); } _relayMessageCache.get(instanceName).put(message.getId(), message); + + LOG.info("Add message to relay cache " + message.getMsgId()); } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 9cc9506..340a051 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -164,8 +164,8 @@ public class 
CurrentStateComputationStage extends AbstractBaseStage { message.getMsgId(), resourceName, message.getPartitionName())); } } else { - LogUtil.logWarn(LOG, _eventId, String - .format("A relay message %s should not be batched, ignored!", message.getMsgId())); + LogUtil.logWarn(LOG, _eventId, + String.format("A relay message %s should not be batched, ignored!", message.getMsgId())); } } } @@ -275,8 +275,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage { // Check whether it is already passed threshold for (String resourceName : missingTopStateMap.keySet()) { for (String partitionName : missingTopStateMap.get(resourceName).keySet()) { - long startTime = missingTopStateMap.get(resourceName).get(partitionName); - if (startTime > 0 && System.currentTimeMillis() - startTime > durationThreshold) { + Long startTime = missingTopStateMap.get(resourceName).get(partitionName); + if (startTime != null && startTime > 0 + && System.currentTimeMillis() - startTime > durationThreshold) { missingTopStateMap.get(resourceName).put(partitionName, TRANSITION_FAILED); if (clusterStatusMonitor != null) { clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, false); http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java index 6036f34..6d483a0 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java @@ -74,7 +74,8 @@ public class TaskAssignmentStage extends AbstractBaseStage { List<Message> outputMessages = batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, 
liveInstanceMap, manager.getProperties()); - sendMessages(dataAccessor, outputMessages); + + List<Message> messagesSent = sendMessages(dataAccessor, outputMessages); // TODO: Need also count messages from task rebalancer if (!cache.isTaskCache()) { ClusterStatusMonitor clusterStatusMonitor = @@ -84,7 +85,7 @@ public class TaskAssignmentStage extends AbstractBaseStage { } } long cacheStart = System.currentTimeMillis(); - cache.cacheMessages(outputMessages); + cache.cacheMessages(messagesSent); long cacheEnd = System.currentTimeMillis(); LogUtil.logDebug(logger, _eventId, "Caching messages took " + (cacheEnd - cacheStart) + " ms"); } @@ -132,9 +133,11 @@ public class TaskAssignmentStage extends AbstractBaseStage { return outputMessages; } - protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) { + // return the messages actually sent + protected List<Message> sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) { + List<Message> messageSent = new ArrayList<>(); if (messages == null || messages.isEmpty()) { - return; + return messageSent; } Builder keyBuilder = dataAccessor.keyBuilder(); @@ -146,6 +149,7 @@ public class TaskAssignmentStage extends AbstractBaseStage { + message.getResourceName() + "." 
+ message.getPartitionName() + "|" + message .getPartitionNames() + " from:" + message.getFromState() + " to:" + message .getToState() + ", relayMessages: " + message.getRelayMessages().size()); + if (message.hasRelayMessages()) { for (Message msg : message.getRelayMessages().values()) { LogUtil.logInfo(logger, _eventId, @@ -163,8 +167,12 @@ public class TaskAssignmentStage extends AbstractBaseStage { boolean[] results = dataAccessor.createChildren(keys, new ArrayList<>(messages)); for (int i = 0; i < results.length; i++) { if (!results[i]) { - LogUtil.logWarn(logger, _eventId, "Failed to send message: " + keys.get(i)); + LogUtil.logError(logger, _eventId, "Failed to send message: " + keys.get(i)); + } else { + messageSent.add(messages.get(i)); } } + + return messageSent; } } http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index 925f127..5e2082c 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -874,12 +874,16 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { } else if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) && isStateTransitionInProgress(messageTarget)) { + String taskId = _messageTaskMap.get(messageTarget); + Message msg = _taskMap.get(taskId).getTask().getMessage(); + // If there is another state transition for same partition is going on, // discard the message. Controller will resend if this is a valid message - throw new HelixException(String - .format("Another state transition for %s:%s is in progress. 
Discarding %s->%s message", - message.getResourceName(), message.getPartitionName(), message.getFromState(), - message.getToState())); + throw new HelixException(String.format( + "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message", + message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()), + msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(), + message.getToState())); } stateTransitionHandlers http://git-wip-us.apache.org/repos/asf/helix/blob/74145e8a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java index b3ef3e5..bf7f566 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java @@ -80,8 +80,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase { HelixDataAccessor _accessor; @BeforeClass - public void beforeClass() - throws InterruptedException { + public void beforeClass() { System.out.println( "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis())); @@ -136,7 +135,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase { } @Test - public void testP2PStateTransitionDisabled() throws InterruptedException { + public void testP2PStateTransitionDisabled() { enableP2PInCluster(CLUSTER_NAME, _configAccessor, false); MockHelixTaskExecutor.resetStats(); @@ -158,7 +157,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase { } @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"}) - public void 
testP2PStateTransitionEnabled() throws InterruptedException { + public void testP2PStateTransitionEnabled() { enableP2PInCluster(CLUSTER_NAME, _configAccessor, true); long startTime = System.currentTimeMillis(); MockHelixTaskExecutor.resetStats(); @@ -174,9 +173,9 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase { } double ratio = ((double) p2pTrigged) / ((double) total); - Assert.assertTrue(ratio > 0.7, String - .format("Only %d out of %d percent transitions to Master were triggered by expected host!", - p2pTrigged, total)); + Assert.assertTrue(ratio > 0.6, String + .format("Only %d out of %d percent transitions to Master were triggered by expected host!", + p2pTrigged, total)); Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0, "There are duplicated transition messages sent while participant is handling the state-transition!");
