[HELIX-778] TASK: Fix a race condition in updatePreviousAssignedTasksStatus

TestUnregisteredCommand was observed to be very unstable. The cause is a race 
condition: when a task fails, a pending message for that task (INIT to RUNNING) 
is sometimes not cleaned up in time, so AbstractTaskDispatcher's 
updatePreviousAssignedTasksStatus processes that message and skips the task's 
update in JobContext (its state and its NUM_ATTEMPTS field).

A short-term fix is to call markPartitionError() before checking for the 
pending message, but in the long run we need to revisit the design of the task 
status update to avoid this type of race condition.

Changelist:
1. Move markPartitionError() up before checking for a pending message on the 
task
2. Fix TestUnregisteredCommand's instability
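
The ordering problem described above can be illustrated with a small, 
self-contained model. This is only a sketch under stated assumptions, not the 
Helix implementation: the class DispatcherOrderingSketch, its Context type, and 
the methods updateOld/updateNew are hypothetical names, and the stand-in 
markError() is assumed to record the state and bump an attempt counter.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the ordering issue; not Helix code.
class DispatcherOrderingSketch {
  enum State { INIT, RUNNING, ERROR, TASK_ERROR, TIMED_OUT, TASK_ABORTED }

  // The states that the patch treats as error conditions.
  static final Set<State> ERROR_STATES =
      EnumSet.of(State.ERROR, State.TASK_ERROR, State.TIMED_OUT, State.TASK_ABORTED);

  // Stand-in for the per-partition bookkeeping kept in JobContext.
  static class Context {
    final Map<Integer, State> states = new HashMap<>();
    final Map<Integer, Integer> numAttempts = new HashMap<>();

    // Assumption: marking an error records the state and counts one attempt.
    void markError(int pId, State state) {
      states.put(pId, state);
      numAttempts.merge(pId, 1, Integer::sum);
    }
  }

  // Old ordering: a stale pending message returns early, so the error
  // is never recorded for this pass.
  static void updateOld(Context ctx, int pId, State current, boolean hasPendingMessage) {
    if (hasPendingMessage) {
      return; // "wait" for the pending transition; bookkeeping is skipped
    }
    if (ERROR_STATES.contains(current)) {
      ctx.markError(pId, current);
    }
  }

  // New ordering: record the error first, then honor the pending message.
  static void updateNew(Context ctx, int pId, State current, boolean hasPendingMessage) {
    if (ERROR_STATES.contains(current)) {
      ctx.markError(pId, current);
    }
    if (hasPendingMessage) {
      return; // still wait for the pending transition to clear
    }
    // Remaining per-state handling would follow here.
  }
}
```

With a failed task and a stale pending message, updateOld leaves the context 
untouched, while updateNew records the error state and one attempt before 
returning.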


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ceba1a55
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ceba1a55
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ceba1a55

Branch: refs/heads/master
Commit: ceba1a55ae351090144c001324f908f2364212a4
Parents: 5d24ed5
Author: Hunter Lee <[email protected]>
Authored: Wed Oct 31 17:20:37 2018 -0700
Committer: Hunter Lee <[email protected]>
Committed: Wed Oct 31 17:20:37 2018 -0700

----------------------------------------------------------------------
 .../apache/helix/task/AbstractTaskDispatcher.java    | 15 ++++++++++++---
 .../integration/task/TestUnregisteredCommand.java    |  3 ++-
 2 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ceba1a55/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index aa72f2d..cbf9fb8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -67,6 +67,16 @@ public abstract class AbstractTaskDispatcher {
         TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
             jobResource, pId, pName, instance, jobCtx);
 
+        // This avoids a race condition in the case that although currentState is in the following
+        // error condition, the pending message (INIT->RUNNING) might still be present.
+        // This is undesirable because this prevents JobContext from getting the proper update of
+        // fields including task state and task's NUM_ATTEMPTS
+        if (currState == TaskPartitionState.ERROR || currState == TaskPartitionState.TASK_ERROR
+            || currState == TaskPartitionState.TIMED_OUT
+            || currState == TaskPartitionState.TASK_ABORTED) {
+          markPartitionError(jobCtx, pId, currState, true);
+        }
+
         // Check for pending state transitions on this (partition, instance). If there is a pending
         // state transition, we prioritize this pending state transition and set the assignment from
         // this pending state transition, essentially "waiting" until this pending message clears
@@ -197,7 +207,6 @@ public abstract class AbstractTaskDispatcher {
                 "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.",
                 pName, currState, jobCtx.getPartitionInfo(pId)));
           }
-          markPartitionError(jobCtx, pId, currState, true);
           // The error policy is to fail the task as soon as a single partition fails for a specified
           // maximum number of attempts or task is in ABORTED state.
           // But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't
@@ -239,7 +248,6 @@ public abstract class AbstractTaskDispatcher {
 
             // Also release resources for these tasks
             assignableInstance.release(taskConfig, quotaType);
-
           } else if (jobState == TaskState.IN_PROGRESS
               && (jobTgtState != TargetState.STOP && jobTgtState != TargetState.DELETE)) {
             // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING
@@ -940,7 +948,8 @@ public abstract class AbstractTaskDispatcher {
 
   private long getTimeoutTime(long startTime, long timeoutPeriod) {
     return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT
-        || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow
+        || timeoutPeriod > Long.MAX_VALUE - startTime)
+            // check long overflow
             ? TaskConstants.DEFAULT_NEVER_TIMEOUT
             : startTime + timeoutPeriod;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/ceba1a55/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
index 95a9be4..6f78cc0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
@@ -37,7 +37,8 @@ public class TestUnregisteredCommand extends TaskTestBase {
     super.beforeClass();
   }
 
-  @Test public void testUnregisteredCommand() throws InterruptedException {
+  @Test
+  public void testUnregisteredCommand() throws InterruptedException {
     String workflowName = TestHelper.getTestMethodName();
     Workflow.Builder builder = new Workflow.Builder(workflowName);
 
