Improve helix message timeout task From the logs and code, this appears to be a very rare race condition in which the message has actually been processed and completed but has not been removed. Once handling completes, the handler should cancel the timeout task, which runs in a separate thread. But just before it tried to cancel the task, the message timed out and the message handling thread was interrupted by the timeout task thread, as shown in the log.
So the message handling thread did not catch the interrupted exception at that moment and failed to remove the READ-state message from ZK. After I manually removed the message, we got an error log showing that the partition is already LEADER now. That proves the assumption that the message had been successfully processed. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1507f016 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1507f016 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1507f016 Branch: refs/heads/master Commit: 1507f0161df24f4bec3ba3632b2d23a7a9bed5d4 Parents: c783ae7 Author: Junkai Xue <[email protected]> Authored: Wed Oct 10 16:59:28 2018 -0700 Committer: Junkai Xue <[email protected]> Committed: Thu Nov 1 14:38:58 2018 -0700 ---------------------------------------------------------------------- .../helix/messaging/handling/HelixTask.java | 28 +++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/1507f016/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java index 2f3d805..fb55e76 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java @@ -22,7 +22,6 @@ package org.apache.helix.messaging.handling; import java.util.Date; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; @@ -96,6 +95,9 @@ public class HelixTask implements
MessageTask { handlerStart = System.currentTimeMillis(); taskResult = _handler.handleMessage(); handlerEnd = System.currentTimeMillis(); + + // cancel timeout task + _executor.cancelTimeoutTask(this); } catch (InterruptedException e) { taskResult = new HelixTaskResult(); taskResult.setException(e); @@ -116,9 +118,6 @@ public class HelixTask implements MessageTask { _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, _manager); } - // cancel timeout task - _executor.cancelTimeoutTask(this); - Exception exception = null; try { if (taskResult.isSuccess()) { @@ -182,13 +181,9 @@ public class HelixTask implements MessageTask { } } - if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) { - removeMessageFromZk(accessor, _message); - reportMessageStat(_manager, _message, taskResult); - sendReply(getSrcClusterDataAccessor(_message), _message, taskResult); - _executor.finishTask(this); - } + finalCleanup(taskResult); } catch (Exception e) { + finalCleanup(taskResult); exception = e; type = ErrorType.FRAMEWORK; code = ErrorCode.ERROR; @@ -377,4 +372,17 @@ public class HelixTask implements MessageTask { } _isStarted = true; } + + private void finalCleanup(HelixTaskResult taskResult) { + try { + if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) { + removeMessageFromZk(_manager.getHelixDataAccessor(), _message); + reportMessageStat(_manager, _message, taskResult); + sendReply(getSrcClusterDataAccessor(_message), _message, taskResult); + _executor.finishTask(this); + } + } catch (Exception e) { + logger.error(String.format("Error to final clean up for message : %s", _message.getId())); + } + } }
