[HELIX-777] TASK: Handle null currentState for unscheduled tasks It was observed that when a workflow is submitted and the Controller attempts to schedule its tasks, the ZK read of the corresponding job's context fails, causing the job to be stuck in an unscheduled state. The job remained unscheduled because its tasks had no currentState, and its job context did not contain any assignment/state information. This RB fixes such stuck states by detecting a null currentState. Changelist: 1. Check whether a task's currentState is null and, if the job context holds no state for it either, manually assign the INIT state.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5d24ed54 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5d24ed54 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5d24ed54 Branch: refs/heads/master Commit: 5d24ed544898ff69f289f54be71a04413735d118 Parents: 6090732 Author: Hunter Lee <[email protected]> Authored: Wed Oct 31 14:21:49 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Wed Oct 31 17:17:16 2018 -0700 ---------------------------------------------------------------------- .../helix/task/AbstractTaskDispatcher.java | 213 +++++++++---------- 1 file changed, 106 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/5d24ed54/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java index 617263b..aa72f2d 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -23,7 +23,6 @@ import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.task.assigner.AssignableInstance; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +72,6 @@ public abstract class AbstractTaskDispatcher { // this pending state transition, essentially "waiting" until this pending message clears Message pendingMessage = currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance); - if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) { // If there is a 
pending message whose destination state is different from the current // state, just make the same assignment as the pending message. This is essentially @@ -125,26 +123,26 @@ public abstract class AbstractTaskDispatcher { } switch (currState) { - case RUNNING: { - TaskPartitionState nextState = TaskPartitionState.RUNNING; - if (jobState == TaskState.TIMING_OUT) { - nextState = TaskPartitionState.TASK_ABORTED; - } else if (jobTgtState == TargetState.STOP) { - nextState = TaskPartitionState.STOPPED; - } else if (jobState == TaskState.ABORTED || jobState == TaskState.FAILED - || jobState == TaskState.FAILING || jobState == TaskState.TIMED_OUT) { - // Drop tasks if parent job is not in progress - paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name())); - break; - } + case RUNNING: { + TaskPartitionState nextState = TaskPartitionState.RUNNING; + if (jobState == TaskState.TIMING_OUT) { + nextState = TaskPartitionState.TASK_ABORTED; + } else if (jobTgtState == TargetState.STOP) { + nextState = TaskPartitionState.STOPPED; + } else if (jobState == TaskState.ABORTED || jobState == TaskState.FAILED + || jobState == TaskState.FAILING || jobState == TaskState.TIMED_OUT) { + // Drop tasks if parent job is not in progress + paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name())); + break; + } - paMap.put(pId, new PartitionAssignment(instance, nextState.name())); - assignedPartitions.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, - nextState, instance)); - } + paMap.put(pId, new PartitionAssignment(instance, nextState.name())); + assignedPartitions.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, + nextState, instance)); } + } break; case STOPPED: { // TODO: This case statement might be unreachable code - Hunter @@ -166,106 +164,107 @@ public abstract class 
AbstractTaskDispatcher { assignedPartitions.add(pId); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, nextState, instance)); + LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, + nextState, instance)); } } break; - case COMPLETED: { - // The task has completed on this partition. Mark as such in the context object. - donePartitions.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Task partition %s has completed with state %s. Marking as such in rebalancer context.", - pName, currState)); - } - partitionsToDropFromIs.add(pId); - markPartitionCompleted(jobCtx, pId); - - // This task is COMPLETED, so release this task - assignableInstance.release(taskConfig, quotaType); + case COMPLETED: { + // The task has completed on this partition. Mark as such in the context object. + donePartitions.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has completed with state %s. Marking as such in rebalancer context.", + pName, currState)); } + partitionsToDropFromIs.add(pId); + markPartitionCompleted(jobCtx, pId); + + // This task is COMPLETED, so release this task + assignableInstance.release(taskConfig, quotaType); + } break; - case TIMED_OUT: + case TIMED_OUT: - case TASK_ERROR: + case TASK_ERROR: - case TASK_ABORTED: + case TASK_ABORTED: - case ERROR: { - donePartitions.add(pId); // The task may be rescheduled on a different instance. - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", - pName, currState, jobCtx.getPartitionInfo(pId))); - } - markPartitionError(jobCtx, pId, currState, true); - // The error policy is to fail the task as soon a single partition fails for a specified - // maximum number of attempts or task is in ABORTED state. 
- // But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't - // cause job fail. - // After all tasks are aborted, they will be dropped, because of job timeout. - if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT) { - if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask() - || currState.equals(TaskPartitionState.TASK_ABORTED) - || currState.equals(TaskPartitionState.ERROR)) { - skippedPartitions.add(pId); - partitionsToDropFromIs.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug("skippedPartitions:" + skippedPartitions); - } - } else { - // Mark the task to be started at some later time (if enabled) - markPartitionDelayed(jobCfg, jobCtx, pId); + case ERROR: { + donePartitions.add(pId); // The task may be rescheduled on a different instance. + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", + pName, currState, jobCtx.getPartitionInfo(pId))); + } + markPartitionError(jobCtx, pId, currState, true); + // The error policy is to fail the task as soon a single partition fails for a specified + // maximum number of attempts or task is in ABORTED state. + // But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't + // cause job fail. + // After all tasks are aborted, they will be dropped, because of job timeout. 
+ if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT) { + if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask() + || currState.equals(TaskPartitionState.TASK_ABORTED) + || currState.equals(TaskPartitionState.ERROR)) { + skippedPartitions.add(pId); + partitionsToDropFromIs.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug("skippedPartitions:" + skippedPartitions); } + } else { + // Mark the task to be started at some later time (if enabled) + markPartitionDelayed(jobCfg, jobCtx, pId); } - // Release this task - assignableInstance.release(taskConfig, quotaType); } + // Release this task + assignableInstance.release(taskConfig, quotaType); + } break; - case INIT: { - // INIT is a temporary state for tasks - // Two possible scenarios for INIT: - // 1. Task is getting scheduled for the first time. In this case, Task's state will go - // from null->INIT->RUNNING, and this INIT state will be transient and very short-lived - // 2. Task is getting scheduled for the first time, but in this case, job is timed out or - // timing out. In this case, it will be sent back to INIT state to be removed. Here we - // ensure that this task then goes from INIT to DROPPED so that it will be released from - // AssignableInstance to prevent resource leak - if (jobState == TaskState.TIMED_OUT || jobState == TaskState.TIMING_OUT - || jobTgtState == TargetState.DELETE) { - // Job is timed out or timing out or targetState is to be deleted, so its tasks will be - // sent back to INIT - // In this case, tasks' IdealState will be removed, and they will be sent to DROPPED - partitionsToDropFromIs.add(pId); + case INIT: { + // INIT is a temporary state for tasks + // Two possible scenarios for INIT: + // 1. Task is getting scheduled for the first time. In this case, Task's state will go + // from null->INIT->RUNNING, and this INIT state will be transient and very short-lived + // 2. 
Task is getting scheduled for the first time, but in this case, job is timed out or + // timing out. In this case, it will be sent back to INIT state to be removed. Here we + // ensure that this task then goes from INIT to DROPPED so that it will be released from + // AssignableInstance to prevent resource leak + if (jobState == TaskState.TIMED_OUT || jobState == TaskState.TIMING_OUT + || jobTgtState == TargetState.DELETE) { + // Job is timed out or timing out or targetState is to be deleted, so its tasks will be + // sent back to INIT + // In this case, tasks' IdealState will be removed, and they will be sent to DROPPED + partitionsToDropFromIs.add(pId); - // Also release resources for these tasks - assignableInstance.release(taskConfig, quotaType); + // Also release resources for these tasks + assignableInstance.release(taskConfig, quotaType); - } else if (jobState == TaskState.IN_PROGRESS - && (jobTgtState != TargetState.STOP && jobTgtState != TargetState.DELETE)) { - // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING - paMap.put(pId, - new JobRebalancer.PartitionAssignment(instance, TaskPartitionState.RUNNING.name())); - assignedPartitions.add(pId); - } + } else if (jobState == TaskState.IN_PROGRESS + && (jobTgtState != TargetState.STOP && jobTgtState != TargetState.DELETE)) { + // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING + paMap.put(pId, + new JobRebalancer.PartitionAssignment(instance, TaskPartitionState.RUNNING.name())); + assignedPartitions.add(pId); } + } - case DROPPED: { - // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned. - donePartitions.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Task partition %s has state %s. It will be dropped from the current ideal state.", - pName, currState)); - } - // If it's DROPPED, release this task. 
If INIT, do not release - if (currState == TaskPartitionState.DROPPED) { - assignableInstance.release(taskConfig, quotaType); - } + case DROPPED: { + // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned. + donePartitions.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has state %s. It will be dropped from the current ideal state.", + pName, currState)); + } + // If it's DROPPED, release this task. If INIT, do not release + if (currState == TaskPartitionState.DROPPED) { + assignableInstance.release(taskConfig, quotaType); } + } break; - default: - throw new AssertionError("Unknown enum symbol: " + currState); + default: + throw new AssertionError("Unknown enum symbol: " + currState); } } @@ -301,7 +300,8 @@ public abstract class AbstractTaskDispatcher { currentStateOutput.getCurrentState(jobResource, new Partition(pName), instance); if (currentStateString == null) { // Task state is either DROPPED or INIT - return jobCtx.getPartitionState(pId); + TaskPartitionState stateFromContext = jobCtx.getPartitionState(pId); + return stateFromContext == null ? 
TaskPartitionState.INIT : stateFromContext; } TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString); jobCtx.setPartitionState(pId, currentState); @@ -859,7 +859,6 @@ public abstract class AbstractTaskDispatcher { incomplete = true; } } - if (!incomplete && cfg.isTerminable()) { ctx.setWorkflowState(TaskState.COMPLETED); return true; @@ -922,7 +921,7 @@ public abstract class AbstractTaskDispatcher { long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(resourceId); if (nextTimeout >= System.currentTimeMillis() && (nextRebalanceTime == TaskConstants.DEFAULT_NEVER_TIMEOUT - || nextTimeout < nextRebalanceTime)) { + || nextTimeout < nextRebalanceTime)) { _rebalanceScheduler.scheduleRebalance(_manager, resourceId, nextTimeout); } } @@ -942,8 +941,8 @@ public abstract class AbstractTaskDispatcher { private long getTimeoutTime(long startTime, long timeoutPeriod) { return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow - ? TaskConstants.DEFAULT_NEVER_TIMEOUT - : startTime + timeoutPeriod; + ? TaskConstants.DEFAULT_NEVER_TIMEOUT + : startTime + timeoutPeriod; } /**
