[HELIX-777] TASK: Handle null currentState for unscheduled tasks

It was observed that when a workflow is submitted and the Controller attempts 
to schedule its tasks, a ZK read can fail to retrieve the appropriate job's 
context, causing the job to be stuck in an unscheduled state. The job remained 
unscheduled because it had no currentStates, and its job context did not 
contain any assignment/state information. This change fixes such stuck states 
by detecting null currentStates.
Changelist:
1. Check if currentState is null and, if it is, manually assign an INIT state


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5d24ed54
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5d24ed54
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5d24ed54

Branch: refs/heads/master
Commit: 5d24ed544898ff69f289f54be71a04413735d118
Parents: 6090732
Author: Hunter Lee <[email protected]>
Authored: Wed Oct 31 14:21:49 2018 -0700
Committer: Hunter Lee <[email protected]>
Committed: Wed Oct 31 17:17:16 2018 -0700

----------------------------------------------------------------------
 .../helix/task/AbstractTaskDispatcher.java      | 213 +++++++++----------
 1 file changed, 106 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/5d24ed54/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 617263b..aa72f2d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -23,7 +23,6 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.task.assigner.AssignableInstance;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +72,6 @@ public abstract class AbstractTaskDispatcher {
         // this pending state transition, essentially "waiting" until this 
pending message clears
         Message pendingMessage =
             currStateOutput.getPendingMessage(jobResource, new 
Partition(pName), instance);
-
         if (pendingMessage != null && 
!pendingMessage.getToState().equals(currState.name())) {
           // If there is a pending message whose destination state is 
different from the current
           // state, just make the same assignment as the pending message. This 
is essentially
@@ -125,26 +123,26 @@ public abstract class AbstractTaskDispatcher {
         }
 
         switch (currState) {
-          case RUNNING: {
-            TaskPartitionState nextState = TaskPartitionState.RUNNING;
-            if (jobState == TaskState.TIMING_OUT) {
-              nextState = TaskPartitionState.TASK_ABORTED;
-            } else if (jobTgtState == TargetState.STOP) {
-              nextState = TaskPartitionState.STOPPED;
-            } else if (jobState == TaskState.ABORTED || jobState == 
TaskState.FAILED
-                || jobState == TaskState.FAILING || jobState == 
TaskState.TIMED_OUT) {
-              // Drop tasks if parent job is not in progress
-              paMap.put(pId, new PartitionAssignment(instance, 
TaskPartitionState.DROPPED.name()));
-              break;
-            }
+        case RUNNING: {
+          TaskPartitionState nextState = TaskPartitionState.RUNNING;
+          if (jobState == TaskState.TIMING_OUT) {
+            nextState = TaskPartitionState.TASK_ABORTED;
+          } else if (jobTgtState == TargetState.STOP) {
+            nextState = TaskPartitionState.STOPPED;
+          } else if (jobState == TaskState.ABORTED || jobState == 
TaskState.FAILED
+              || jobState == TaskState.FAILING || jobState == 
TaskState.TIMED_OUT) {
+            // Drop tasks if parent job is not in progress
+            paMap.put(pId, new PartitionAssignment(instance, 
TaskPartitionState.DROPPED.name()));
+            break;
+          }
 
-            paMap.put(pId, new PartitionAssignment(instance, 
nextState.name()));
-            assignedPartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format("Setting task partition %s state to %s 
on instance %s.", pName,
-                  nextState, instance));
-            }
+          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+          assignedPartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Setting task partition %s state to %s on 
instance %s.", pName,
+                nextState, instance));
           }
+        }
           break;
         case STOPPED: {
           // TODO: This case statement might be unreachable code - Hunter
@@ -166,106 +164,107 @@ public abstract class AbstractTaskDispatcher {
           assignedPartitions.add(pId);
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("Setting task partition %s state to %s on 
instance %s.", pName, nextState, instance));
+            LOG.debug(String.format("Setting task partition %s state to %s on 
instance %s.", pName,
+                nextState, instance));
           }
         }
           break;
-          case COMPLETED: {
-            // The task has completed on this partition. Mark as such in the 
context object.
-            donePartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format(
-                  "Task partition %s has completed with state %s. Marking as 
such in rebalancer context.",
-                  pName, currState));
-            }
-            partitionsToDropFromIs.add(pId);
-            markPartitionCompleted(jobCtx, pId);
-
-            // This task is COMPLETED, so release this task
-            assignableInstance.release(taskConfig, quotaType);
+        case COMPLETED: {
+          // The task has completed on this partition. Mark as such in the 
context object.
+          donePartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has completed with state %s. Marking as 
such in rebalancer context.",
+                pName, currState));
           }
+          partitionsToDropFromIs.add(pId);
+          markPartitionCompleted(jobCtx, pId);
+
+          // This task is COMPLETED, so release this task
+          assignableInstance.release(taskConfig, quotaType);
+        }
           break;
-          case TIMED_OUT:
+        case TIMED_OUT:
 
-          case TASK_ERROR:
+        case TASK_ERROR:
 
-          case TASK_ABORTED:
+        case TASK_ABORTED:
 
-          case ERROR: {
-            donePartitions.add(pId); // The task may be rescheduled on a 
different instance.
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format(
-                  "Task partition %s has error state %s with msg %s. Marking 
as such in rebalancer context.",
-                  pName, currState, jobCtx.getPartitionInfo(pId)));
-            }
-            markPartitionError(jobCtx, pId, currState, true);
-            // The error policy is to fail the task as soon a single partition 
fails for a specified
-            // maximum number of attempts or task is in ABORTED state.
-            // But notice that if job is TIMED_OUT, aborted task won't be 
treated as fail and won't
-            // cause job fail.
-            // After all tasks are aborted, they will be dropped, because of 
job timeout.
-            if (jobState != TaskState.TIMED_OUT && jobState != 
TaskState.TIMING_OUT) {
-              if (jobCtx.getPartitionNumAttempts(pId) >= 
jobCfg.getMaxAttemptsPerTask()
-                  || currState.equals(TaskPartitionState.TASK_ABORTED)
-                  || currState.equals(TaskPartitionState.ERROR)) {
-                skippedPartitions.add(pId);
-                partitionsToDropFromIs.add(pId);
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("skippedPartitions:" + skippedPartitions);
-                }
-              } else {
-                // Mark the task to be started at some later time (if enabled)
-                markPartitionDelayed(jobCfg, jobCtx, pId);
+        case ERROR: {
+          donePartitions.add(pId); // The task may be rescheduled on a 
different instance.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has error state %s with msg %s. Marking as 
such in rebalancer context.",
+                pName, currState, jobCtx.getPartitionInfo(pId)));
+          }
+          markPartitionError(jobCtx, pId, currState, true);
+          // The error policy is to fail the task as soon a single partition 
fails for a specified
+          // maximum number of attempts or task is in ABORTED state.
+          // But notice that if job is TIMED_OUT, aborted task won't be 
treated as fail and won't
+          // cause job fail.
+          // After all tasks are aborted, they will be dropped, because of job 
timeout.
+          if (jobState != TaskState.TIMED_OUT && jobState != 
TaskState.TIMING_OUT) {
+            if (jobCtx.getPartitionNumAttempts(pId) >= 
jobCfg.getMaxAttemptsPerTask()
+                || currState.equals(TaskPartitionState.TASK_ABORTED)
+                || currState.equals(TaskPartitionState.ERROR)) {
+              skippedPartitions.add(pId);
+              partitionsToDropFromIs.add(pId);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("skippedPartitions:" + skippedPartitions);
               }
+            } else {
+              // Mark the task to be started at some later time (if enabled)
+              markPartitionDelayed(jobCfg, jobCtx, pId);
             }
-            // Release this task
-            assignableInstance.release(taskConfig, quotaType);
           }
+          // Release this task
+          assignableInstance.release(taskConfig, quotaType);
+        }
           break;
-          case INIT: {
-            // INIT is a temporary state for tasks
-            // Two possible scenarios for INIT:
-            // 1. Task is getting scheduled for the first time. In this case, 
Task's state will go
-            // from null->INIT->RUNNING, and this INIT state will be transient 
and very short-lived
-            // 2. Task is getting scheduled for the first time, but in this 
case, job is timed out or
-            // timing out. In this case, it will be sent back to INIT state to 
be removed. Here we
-            // ensure that this task then goes from INIT to DROPPED so that it 
will be released from
-            // AssignableInstance to prevent resource leak
-            if (jobState == TaskState.TIMED_OUT || jobState == 
TaskState.TIMING_OUT
-                || jobTgtState == TargetState.DELETE) {
-              // Job is timed out or timing out or targetState is to be 
deleted, so its tasks will be
-              // sent back to INIT
-              // In this case, tasks' IdealState will be removed, and they 
will be sent to DROPPED
-              partitionsToDropFromIs.add(pId);
+        case INIT: {
+          // INIT is a temporary state for tasks
+          // Two possible scenarios for INIT:
+          // 1. Task is getting scheduled for the first time. In this case, 
Task's state will go
+          // from null->INIT->RUNNING, and this INIT state will be transient 
and very short-lived
+          // 2. Task is getting scheduled for the first time, but in this 
case, job is timed out or
+          // timing out. In this case, it will be sent back to INIT state to 
be removed. Here we
+          // ensure that this task then goes from INIT to DROPPED so that it 
will be released from
+          // AssignableInstance to prevent resource leak
+          if (jobState == TaskState.TIMED_OUT || jobState == 
TaskState.TIMING_OUT
+              || jobTgtState == TargetState.DELETE) {
+            // Job is timed out or timing out or targetState is to be deleted, 
so its tasks will be
+            // sent back to INIT
+            // In this case, tasks' IdealState will be removed, and they will 
be sent to DROPPED
+            partitionsToDropFromIs.add(pId);
 
-              // Also release resources for these tasks
-              assignableInstance.release(taskConfig, quotaType);
+            // Also release resources for these tasks
+            assignableInstance.release(taskConfig, quotaType);
 
-            } else if (jobState == TaskState.IN_PROGRESS
-                && (jobTgtState != TargetState.STOP && jobTgtState != 
TargetState.DELETE)) {
-              // Job is in progress, implying that tasks are being re-tried, 
so set it to RUNNING
-              paMap.put(pId,
-                  new JobRebalancer.PartitionAssignment(instance, 
TaskPartitionState.RUNNING.name()));
-              assignedPartitions.add(pId);
-            }
+          } else if (jobState == TaskState.IN_PROGRESS
+              && (jobTgtState != TargetState.STOP && jobTgtState != 
TargetState.DELETE)) {
+            // Job is in progress, implying that tasks are being re-tried, so 
set it to RUNNING
+            paMap.put(pId,
+                new JobRebalancer.PartitionAssignment(instance, 
TaskPartitionState.RUNNING.name()));
+            assignedPartitions.add(pId);
           }
+        }
 
-          case DROPPED: {
-            // currState in [INIT, DROPPED]. Do nothing, the partition is 
eligible to be reassigned.
-            donePartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format(
-                  "Task partition %s has state %s. It will be dropped from the 
current ideal state.",
-                  pName, currState));
-            }
-            // If it's DROPPED, release this task. If INIT, do not release
-            if (currState == TaskPartitionState.DROPPED) {
-              assignableInstance.release(taskConfig, quotaType);
-            }
+        case DROPPED: {
+          // currState in [INIT, DROPPED]. Do nothing, the partition is 
eligible to be reassigned.
+          donePartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has state %s. It will be dropped from the 
current ideal state.",
+                pName, currState));
+          }
+          // If it's DROPPED, release this task. If INIT, do not release
+          if (currState == TaskPartitionState.DROPPED) {
+            assignableInstance.release(taskConfig, quotaType);
           }
+        }
           break;
-          default:
-            throw new AssertionError("Unknown enum symbol: " + currState);
+        default:
+          throw new AssertionError("Unknown enum symbol: " + currState);
         }
       }
 
@@ -301,7 +300,8 @@ public abstract class AbstractTaskDispatcher {
         currentStateOutput.getCurrentState(jobResource, new Partition(pName), 
instance);
     if (currentStateString == null) {
       // Task state is either DROPPED or INIT
-      return jobCtx.getPartitionState(pId);
+      TaskPartitionState stateFromContext = jobCtx.getPartitionState(pId);
+      return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
     jobCtx.setPartitionState(pId, currentState);
@@ -859,7 +859,6 @@ public abstract class AbstractTaskDispatcher {
         incomplete = true;
       }
     }
-
     if (!incomplete && cfg.isTerminable()) {
       ctx.setWorkflowState(TaskState.COMPLETED);
       return true;
@@ -922,7 +921,7 @@ public abstract class AbstractTaskDispatcher {
     long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(resourceId);
     if (nextTimeout >= System.currentTimeMillis()
         && (nextRebalanceTime == TaskConstants.DEFAULT_NEVER_TIMEOUT
-        || nextTimeout < nextRebalanceTime)) {
+            || nextTimeout < nextRebalanceTime)) {
       _rebalanceScheduler.scheduleRebalance(_manager, resourceId, nextTimeout);
     }
   }
@@ -942,8 +941,8 @@ public abstract class AbstractTaskDispatcher {
   private long getTimeoutTime(long startTime, long timeoutPeriod) {
     return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT
         || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow
-        ? TaskConstants.DEFAULT_NEVER_TIMEOUT
-        : startTime + timeoutPeriod;
+            ? TaskConstants.DEFAULT_NEVER_TIMEOUT
+            : startTime + timeoutPeriod;
   }
 
   /**

Reply via email to