Repository: helix Updated Branches: refs/heads/master ae8eb5969 -> f99d9477f
[HELIX-690] batch message execution should not share same context Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f99d9477 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f99d9477 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f99d9477 Branch: refs/heads/master Commit: f99d9477f279613fd7c3229008721b6d3a1699df Parents: ae8eb59 Author: Harry Zhang <[email protected]> Authored: Mon Apr 16 09:55:43 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Thu Apr 19 11:06:51 2018 -0700 ---------------------------------------------------------------------- .../org/apache/helix/NotificationContext.java | 18 ++++++++ .../handling/HelixBatchMessageTask.java | 6 ++- .../helix/messaging/handling/HelixTask.java | 2 +- .../messaging/handling/HelixTaskExecutor.java | 43 ++++++++++++-------- 4 files changed, 50 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/f99d9477/helix-core/src/main/java/org/apache/helix/NotificationContext.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/NotificationContext.java b/helix-core/src/main/java/org/apache/helix/NotificationContext.java index 48a7d07..dd76b60 100644 --- a/helix-core/src/main/java/org/apache/helix/NotificationContext.java +++ b/helix-core/src/main/java/org/apache/helix/NotificationContext.java @@ -74,6 +74,24 @@ public class NotificationContext { } /** + * Clone a new Notification context from existing one. Map contents are + * not recursively deep copied. + * + * @return new copy of NotificationContext + */ + public NotificationContext clone() { + + NotificationContext copy = new NotificationContext(_manager); + copy.setType(_type); + copy.setChangeType(_changeType); + copy.setPathChanged(_pathChanged); + copy.setEventName(_eventName); + copy.setCreationTime(_creationTime); + copy._map.putAll(_map); + return copy; + } + + /** * Get the HelixManager associated with this notification * * @return {@link HelixManager} object http://git-wip-us.apache.org/repos/asf/helix/blob/f99d9477/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java index 05b875e..86abce7 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java @@ -48,7 +48,7 @@ public class HelixBatchMessageTask implements MessageTask { HelixTaskResult taskResult = null; long start = System.currentTimeMillis(); - LOG.info("taskId:" + getTaskId() + " handling task begin, at: " + start); + LOG.info("BatchMsg task {} handling task begin, at: {}", getTaskId(), start); boolean isSucceed = true; try { @@ -74,7 +74,9 @@ public class HelixBatchMessageTask implements MessageTask { } if (isSucceed) { - LOG.info("task: " + getTaskId() + " completed sucessfully"); + LOG.info("BatchMsg task {} completed successfully", getTaskId()); + } else { + LOG.warn("BatchMsg task {} failed.", getTaskId()); } taskResult = new HelixTaskResult(); http://git-wip-us.apache.org/repos/asf/helix/blob/f99d9477/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java index 7225f70..337a933 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java @@ -84,7 +84,7 @@ public class HelixTask implements MessageTask { // add a concurrent map to hold currentStateUpdates for sub-messages of a batch-message // partitionName -> csUpdate - if (_message.getBatchMessageMode() == true) { + if (_message.getBatchMessageMode()) { _notificationContext.add(MapKey.CURRENT_STATE_UPDATE.toString(), new ConcurrentHashMap<String, CurrentStateUpdate>()); } http://git-wip-us.apache.org/repos/asf/helix/blob/f99d9477/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index ccd7100..70a30ec 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -35,9 +36,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; - -import org.apache.commons.codec.binary.StringUtils; -import org.apache.helix.AccessOption; import org.apache.helix.ConfigAccessor; import org.apache.helix.Criteria; import org.apache.helix.HelixConstants; @@ -50,7 +48,6 @@ import org.apache.helix.NotificationContext.MapKey; import org.apache.helix.NotificationContext.Type; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.PropertyType; import org.apache.helix.api.listeners.MessageListener; import org.apache.helix.api.listeners.PreFetch; import org.apache.helix.controller.GenericHelixController; @@ -62,10 +59,10 @@ import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageState; import org.apache.helix.model.Message.MessageType; import org.apache.helix.model.builder.HelixConfigScopeBuilder; -import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; import org.apache.helix.monitoring.mbeans.MessageQueueMonitor; import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor; import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor.ProcessedMessageState; +import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; import org.apache.helix.participant.HelixStateMachineEngine; import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelFactory; @@ -74,8 +71,6 @@ import org.apache.helix.util.StatusUpdateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; - public class HelixTaskExecutor implements MessageListener, TaskExecutor { /** * Put together all registration information about a message handler factory @@ -769,9 +764,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { HelixDataAccessor accessor = manager.getHelixDataAccessor(); Builder keyBuilder = accessor.keyBuilder(); - // message handlers created + // message handlers and corresponding contexts created Map<String, MessageHandler> stateTransitionHandlers = new HashMap<>(); + Map<String, NotificationContext> stateTransitionContexts = new HashMap<>(); + List<MessageHandler> nonStateTransitionHandlers = new ArrayList<>(); + List<NotificationContext> nonStateTransitionContexts = new ArrayList<>(); // message read List<Message> readMsgs = new ArrayList<>(); @@ -858,8 +856,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { _monitor.reportReceivedMessage(message); // create message handlers, if handlers not found, leave its state as NEW + NotificationContext msgWorkingContext = changeContext.clone(); try { - MessageHandler createHandler = createMessageHandler(message, changeContext); + MessageHandler createHandler = createMessageHandler(message, msgWorkingContext); if (createHandler == null) { continue; } @@ -868,8 +867,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { stateTransitionHandlers .put(getMessageTarget(message.getResourceName(), message.getPartitionName()), createHandler); + stateTransitionContexts + .put(getMessageTarget(message.getResourceName(), message.getPartitionName()), + msgWorkingContext); } else { nonStateTransitionHandlers.add(createHandler); + nonStateTransitionContexts.add(msgWorkingContext); } } catch (Exception e) { LOG.error("Failed to create message handler for " + message.getMsgId(), e); @@ -885,7 +888,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { continue; } - markReadMessage(message, changeContext, manager); + markReadMessage(message, msgWorkingContext, manager); readMsgs.add(message); // batch creation of all current state meta data @@ -927,14 +930,22 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { if (readMsgs.size() > 0) { updateMessageState(readMsgs, accessor, instanceName); - for (MessageHandler handler : stateTransitionHandlers.values()) { - HelixTask task = new HelixTask(handler._message, changeContext, handler, this); - scheduleTask(task); + for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) { + MessageHandler handler = handlerEntry.getValue(); + NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey()); + Message msg = handler._message; + scheduleTask( + new HelixTask(msg, context, handler, this) + ); } - for (MessageHandler handler : nonStateTransitionHandlers) { - HelixTask task = new HelixTask(handler._message, changeContext, handler, this); - scheduleTask(task); + for (int i = 0; i < nonStateTransitionHandlers.size(); i++) { + MessageHandler handler = nonStateTransitionHandlers.get(i); + NotificationContext context = nonStateTransitionContexts.get(i); + Message msg = handler._message; + scheduleTask( + new HelixTask(msg, context, handler, this) + ); } } }
