Improve message handling properly. We need some improvements on the client side to help stabilize message handling. Currently, when processing a message, Helix marks the message as read before it has actually been scheduled.
In this case, if scheduling fails but the message has already been marked as read, nothing will take care of the message and it will hang there forever. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1103fecb Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1103fecb Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1103fecb Branch: refs/heads/master Commit: 1103fecb67def5e610a7b22636ba4ac25e23777b Parents: 9659370 Author: Junkai Xue <j...@linkedin.com> Authored: Mon Sep 17 11:51:31 2018 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Wed Oct 31 13:50:37 2018 -0700 ---------------------------------------------------------------------- .../helix/messaging/handling/HelixTaskExecutor.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/1103fecb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index 5e2082c..3ae90d3 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -952,22 +952,25 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { if (readMsgs.size() > 0) { updateMessageState(readMsgs, accessor, instanceName); + // Remove message if schedule tasks are failed. 
for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) { MessageHandler handler = handlerEntry.getValue(); NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey()); Message msg = handler._message; - scheduleTask( - new HelixTask(msg, context, handler, this) - ); + if (!scheduleTask(new HelixTask(msg, context, handler, this))) { + removeMessageFromTaskAndFutureMap(msg); + removeMessageFromZK(accessor, msg, instanceName); + } } for (int i = 0; i < nonStateTransitionHandlers.size(); i++) { MessageHandler handler = nonStateTransitionHandlers.get(i); NotificationContext context = nonStateTransitionContexts.get(i); Message msg = handler._message; - scheduleTask( - new HelixTask(msg, context, handler, this) - ); + if (!scheduleTask(new HelixTask(msg, context, handler, this))) { + removeMessageFromTaskAndFutureMap(msg); + removeMessageFromZK(accessor, msg, instanceName); + } } } }