TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2a3e2b3a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2a3e2b3a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2a3e2b3a Branch: refs/heads/TEZ-2003 Commit: 2a3e2b3acf2feb994d382efe246cae9b29474ae2 Parents: 2c19f3c Author: Siddharth Seth <[email protected]> Authored: Thu May 28 18:29:12 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 14 13:46:44 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../apache/tez/runtime/task/TezTaskRunner2.java | 83 ++++++++++++-------- 2 files changed, 53 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2a3e2b3a/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index e333832..42c2e1e 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -29,5 +29,6 @@ ALL CHANGES: TEZ-2465. Retrun the status of a kill request in TaskRunner2. TEZ-2471. NPE in LogicalIOProcessorRuntimeTask while printing thread info. TEZ-2495. Inform TaskCommunicaor about Task and Container termination reasons. + TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/2a3e2b3a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index 15629fd..a5fabb5 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -124,6 +124,8 @@ public class TezTaskRunner2 { try { ListenableFuture<TaskRunner2CallableResult> future = null; synchronized (this) { + // All running state changes must be made within a synchronized block to ensure + // kills are issued or the task is not setup. if (isRunningState()) { // Safe to do this within a synchronized block because we're providing // the handler on which the Reporter will communicate back. Assuming @@ -252,27 +254,34 @@ public class TezTaskRunner2 { * @return true if the task kill was honored, false otherwise */ public boolean killTask() { + boolean isFirstError = false; synchronized (this) { if (isRunningState()) { if (trySettingEndReason(EndReason.KILL_REQUESTED)) { + isFirstError = true; killTaskRequested.set(true); - if (taskRunnerCallable != null) { - taskKillStartTime = System.currentTimeMillis(); - taskRunnerCallable.interruptTask(); - } - return true; } else { - LOG.info("Ignoring killTask request for {} since end reason is already set to {}", - task.getTaskAttemptID(), firstEndReason); + logErrorIngored("killTask", null); } } else { - LOG.info("Ignoring killTask request for {} since it is not in a running state", - task.getTaskAttemptID()); + logErrorIngored("killTask", null); } } - return false; + if (isFirstError) { + logAborting("killTask"); + killTaskInternal(); + return true; + } else { + return false; + } } + private void killTaskInternal() { + if (taskRunnerCallable != null) { + taskKillStartTime = System.currentTimeMillis(); + taskRunnerCallable.interruptTask(); + } + } // Checks and changes on these states should happen within a synchronized block, // to ensure the first event is the one that is captured and causes specific behaviour. @@ -310,17 +319,18 @@ public class TezTaskRunner2 { errorReporterToAm.set(true); oobSignalErrorInProgress = true; } else { - LOG.info( - "Ignoring fatal error since the task has ended for reason: {}. IgnoredError: {} ", - firstEndReason, (t == null ? message : t.getMessage())); + logErrorIngored("signalFatalError", message); } + } else { + logErrorIngored("signalFatalError", message); } } // Informing the TaskReporter here because the running task may not be interruptable. // Has to be outside the lock. if (isFirstError) { - killTask(); + logAborting("signalFatalError"); + killTaskInternal(); try { taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo); } catch (IOException e) { @@ -371,19 +381,22 @@ public class TezTaskRunner2 { if (trySettingEndReason(EndReason.COMMUNICATION_FAILURE)) { registerFirstException(t, null); isFirstError = true; + } else { + logErrorIngored("umbilicalFatalError", null); } // A race is possible between a task succeeding, and a subsequent timed heartbeat failing. // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded // method does not throw an exception, in which case task success is registered with the AM. // Leave subsequent heartbeat errors to the next entity to communicate using the TaskReporter } else { - LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID() - + " is already complete, is failing or has been asked to terminate"); + logErrorIngored("umbilicalFatalError", null); } + // Since this error came from the taskReporter - there's no point attempting to report a failure back to it. + // However, the task does need to be cleaned up } - // Since this error came from the taskReporter - there's no point attempting to report a failure back to it. if (isFirstError) { - killTask(); + logAborting("umbilicalFatalError"); + killTaskInternal(); } } @@ -395,18 +408,12 @@ public class TezTaskRunner2 { isFirstTerminate = trySettingEndReason(EndReason.CONTAINER_STOP_REQUESTED); // Respect stopContainerRequested since it can come in at any point, despite a previous failure. stopContainerRequested.set(true); - - if (isFirstTerminate) { - LOG.info("Attempting to abort {} since a shutdown request was received", - task.getTaskAttemptID()); - if (taskRunnerCallable != null) { - taskKillStartTime = System.currentTimeMillis(); - taskRunnerCallable.interruptTask(); - } - } else { - LOG.info("Not acting on shutdown request for {} since the task is not in running state", - task.getTaskAttemptID()); - } + } + if (isFirstTerminate) { + logAborting("shutdownRequested"); + killTaskInternal(); + } else { + logErrorIngored("shutdownRequested", null); } } } @@ -451,6 +458,20 @@ public class TezTaskRunner2 { private void handleFinalStatusUpdateFailure(Throwable t, boolean successReportAttempted) { // TODO Ideally differentiate between FAILED/KILLED - LOG.warn("Failure while reporting state= {} to AM", (successReportAttempted ? "success" : "failure/killed"), t); + LOG.warn("Failure while reporting state= {} to AM", + (successReportAttempted ? "success" : "failure/killed"), t); + } + + private void logErrorIngored(String ignoredEndReason, String errorMessage) { + LOG.info( + "Ignoring {} request since the task with id {} has ended for reason: {}. IgnoredError: {} ", + ignoredEndReason, task.getTaskAttemptID(), + firstEndReason, (firstException == null ? (errorMessage == null ? "" : errorMessage) : + firstException.getMessage())); + } + + private void logAborting(String abortReason) { + LOG.info("Attempting to abort {} due to an invocation of {}", task.getTaskAttemptID(), + abortReason); } } \ No newline at end of file
