Improve helix message timeout task

From the logs and code, this appears to be a very rare race condition in which 
the message has actually been processed and completed, but has not been removed. 
Once processing completes, the handler should cancel the timeout task, which runs 
on a separate thread. However, just before it tried to cancel the task, the 
message timed out and the timeout-task thread interrupted the message-handling 
thread, as shown in the log.

As a result, the message-handling thread did not catch the InterruptedException 
at that moment and failed to remove the message (still in READ state) from ZK. 
After I manually removed the message, we got an error log showing that the 
partition was already LEADER. This confirms the assumption that the message had 
been processed successfully.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1507f016
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1507f016
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1507f016

Branch: refs/heads/master
Commit: 1507f0161df24f4bec3ba3632b2d23a7a9bed5d4
Parents: c783ae7
Author: Junkai Xue <[email protected]>
Authored: Wed Oct 10 16:59:28 2018 -0700
Committer: Junkai Xue <[email protected]>
Committed: Thu Nov 1 14:38:58 2018 -0700

----------------------------------------------------------------------
 .../helix/messaging/handling/HelixTask.java     | 28 +++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1507f016/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 2f3d805..fb55e76 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -22,7 +22,6 @@ package org.apache.helix.messaging.handling;
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -96,6 +95,9 @@ public class HelixTask implements MessageTask {
       handlerStart = System.currentTimeMillis();
       taskResult = _handler.handleMessage();
       handlerEnd = System.currentTimeMillis();
+
+      // cancel timeout task
+      _executor.cancelTimeoutTask(this);
     } catch (InterruptedException e) {
       taskResult = new HelixTaskResult();
       taskResult.setException(e);
@@ -116,9 +118,6 @@ public class HelixTask implements MessageTask {
       _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, 
_manager);
     }
 
-    // cancel timeout task
-    _executor.cancelTimeoutTask(this);
-
     Exception exception = null;
     try {
       if (taskResult.isSuccess()) {
@@ -182,13 +181,9 @@ public class HelixTask implements MessageTask {
         }
       }
 
-      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
-        removeMessageFromZk(accessor, _message);
-        reportMessageStat(_manager, _message, taskResult);
-        sendReply(getSrcClusterDataAccessor(_message), _message, taskResult);
-        _executor.finishTask(this);
-      }
+      finalCleanup(taskResult);
     } catch (Exception e) {
+      finalCleanup(taskResult);
       exception = e;
       type = ErrorType.FRAMEWORK;
       code = ErrorCode.ERROR;
@@ -377,4 +372,17 @@ public class HelixTask implements MessageTask {
     }
     _isStarted = true;
   }
+
+  private void finalCleanup(HelixTaskResult taskResult) {
+    try {
+      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
+        removeMessageFromZk(_manager.getHelixDataAccessor(), _message);
+        reportMessageStat(_manager, _message, taskResult);
+        sendReply(getSrcClusterDataAccessor(_message), _message, taskResult);
+        _executor.finishTask(this);
+      }
+    } catch (Exception e) {
+      logger.error(String.format("Error to final clean up for message : %s", 
_message.getId()));
+    }
+  }
 }

Reply via email to