[HELIX-778] TASK: Fix a race condition in updatePreviousAssignedTasksStatus
TestUnregisteredCommand was observed to be very unstable. The cause was identified as a race condition: when a task fails, a pending message for that task (from INIT to RUNNING) sometimes isn't cleaned up in time, so AbstractTaskDispatcher's updatePreviousAssignedTasksStatus would try to process that message and skip the status update of that task (such as updating its status and NUM_ATTEMPTS field in JobContext). A short-term, temporary fix is to call markPartitionError() prior to checking the pending message, but in the long run, we need to revisit the task status update's design here to avoid this type of race condition. Changelist: 1. Move markPartitionError() up before checking for a pending message on the task 2. Fix TestUnregisteredCommand's instability Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ceba1a55 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ceba1a55 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ceba1a55 Branch: refs/heads/master Commit: ceba1a55ae351090144c001324f908f2364212a4 Parents: 5d24ed5 Author: Hunter Lee <[email protected]> Authored: Wed Oct 31 17:20:37 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Wed Oct 31 17:20:37 2018 -0700 ---------------------------------------------------------------------- .../apache/helix/task/AbstractTaskDispatcher.java | 15 ++++++++++++--- .../integration/task/TestUnregisteredCommand.java | 3 ++- 2 files changed, 14 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/ceba1a55/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java 
index aa72f2d..cbf9fb8 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -67,6 +67,16 @@ public abstract class AbstractTaskDispatcher { TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput, jobResource, pId, pName, instance, jobCtx); + // This avoids a race condition in the case that although currentState is in the following + // error condition, the pending message (INIT->RUNNNING) might still be present. + // This is undesirable because this prevents JobContext from getting the proper update of + // fields including task state and task's NUM_ATTEMPTS + if (currState == TaskPartitionState.ERROR || currState == TaskPartitionState.TASK_ERROR + || currState == TaskPartitionState.TIMED_OUT + || currState == TaskPartitionState.TASK_ABORTED) { + markPartitionError(jobCtx, pId, currState, true); + } + // Check for pending state transitions on this (partition, instance). If there is a pending // state transition, we prioritize this pending state transition and set the assignment from // this pending state transition, essentially "waiting" until this pending message clears @@ -197,7 +207,6 @@ public abstract class AbstractTaskDispatcher { "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", pName, currState, jobCtx.getPartitionInfo(pId))); } - markPartitionError(jobCtx, pId, currState, true); // The error policy is to fail the task as soon a single partition fails for a specified // maximum number of attempts or task is in ABORTED state. 
// But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't @@ -239,7 +248,6 @@ public abstract class AbstractTaskDispatcher { // Also release resources for these tasks assignableInstance.release(taskConfig, quotaType); - } else if (jobState == TaskState.IN_PROGRESS && (jobTgtState != TargetState.STOP && jobTgtState != TargetState.DELETE)) { // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING @@ -940,7 +948,8 @@ public abstract class AbstractTaskDispatcher { private long getTimeoutTime(long startTime, long timeoutPeriod) { return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT - || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow + || timeoutPeriod > Long.MAX_VALUE - startTime) + // check long overflow ? TaskConstants.DEFAULT_NEVER_TIMEOUT : startTime + timeoutPeriod; } http://git-wip-us.apache.org/repos/asf/helix/blob/ceba1a55/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java index 95a9be4..6f78cc0 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java @@ -37,7 +37,8 @@ public class TestUnregisteredCommand extends TaskTestBase { super.beforeClass(); } - @Test public void testUnregisteredCommand() throws InterruptedException { + @Test + public void testUnregisteredCommand() throws InterruptedException { String workflowName = TestHelper.getTestMethodName(); Workflow.Builder builder = new Workflow.Builder(workflowName);
