Improve message handling properly

We need some client-side improvements to help stabilize message
handling. Currently, when processing a message, Helix marks the
message as read before it has actually been scheduled.

In this case, if scheduling fails after the message has already been
marked as read, nothing will handle the message and it will hang forever.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1103fecb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1103fecb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1103fecb

Branch: refs/heads/master
Commit: 1103fecb67def5e610a7b22636ba4ac25e23777b
Parents: 9659370
Author: Junkai Xue <j...@linkedin.com>
Authored: Mon Sep 17 11:51:31 2018 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Wed Oct 31 13:50:37 2018 -0700

----------------------------------------------------------------------
 .../helix/messaging/handling/HelixTaskExecutor.java  | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1103fecb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 5e2082c..3ae90d3 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -952,22 +952,25 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
+      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : 
stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = 
stateTransitionContexts.get(handlerEntry.getKey());
         Message msg = handler._message;
-        scheduleTask(
-            new HelixTask(msg, context, handler, this)
-        );
+        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
+          removeMessageFromTaskAndFutureMap(msg);
+          removeMessageFromZK(accessor, msg, instanceName);
+        }
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
         Message msg = handler._message;
-        scheduleTask(
-            new HelixTask(msg, context, handler, this)
-        );
+        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
+          removeMessageFromTaskAndFutureMap(msg);
+          removeMessageFromZK(accessor, msg, instanceName);
+        }
       }
     }
   }

Reply via email to