TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/848b137f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/848b137f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/848b137f Branch: refs/heads/TEZ-2003 Commit: 848b137f43af982dae9c9a51594170c13d59fb91 Parents: 12a3729 Author: Siddharth Seth <[email protected]> Authored: Tue May 12 14:27:42 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Aug 20 18:22:07 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../java/org/apache/tez/dag/api/TaskCommunicator.java | 4 ++++ .../tez/runtime/LogicalIOProcessorRuntimeTask.java | 11 ++++++----- .../main/java/org/apache/tez/runtime/RuntimeTask.java | 2 +- .../apache/tez/runtime/task/TaskRunner2Callable.java | 13 +++++++------ .../org/apache/tez/runtime/task/TezTaskRunner2.java | 10 ++++++---- 6 files changed, 25 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/848b137f/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 5d2e40a..ed72d6b 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -25,5 +25,6 @@ ALL CHANGES: TEZ-2433. Fixes after rebase 05/08 TEZ-2438. tez-tools version in the branch is incorrect. TEZ-2434. Allow tasks to be killed in the Runtime. + TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/848b137f/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index cadca0c..2651013 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -48,6 +48,10 @@ public abstract class TaskCommunicator extends AbstractService { // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness. // TODO TEZ-2003 Remove reference to TaskAttemptID + // TODO TEZ-2003 This needs some information about why the attempt is being unregistered. + // e.g. preempted in which case the task may need to be informed. Alternately as a result of + // a failed task. + // In case of preemption - a killTask API is likely a better bet than trying to overload this method. public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID); // TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM. http://git-wip-us.apache.org/repos/asf/tez/blob/848b137f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 8263b3f..de08e56 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -704,7 +704,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } @Override - public synchronized void abortTask() throws Exception { + public synchronized void abortTask() { if (processor != null) { processor.abort(); } @@ -803,6 +803,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { LOG.debug("Num of inputs to be closed={}", initializedInputs.size()); LOG.debug("Num of outputs to be closed={}", initializedOutputs.size()); } + // Close processor if (!processorClosed && processor != null) { try { @@ -820,8 +821,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { Thread.currentThread().interrupt(); } catch (Throwable e) { LOG.warn( - "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}", - e.getClass().getName(), e.getMessage()); + "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" + + e.getClass().getName(), e.getMessage(), e); } } @@ -842,7 +843,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } catch (Throwable e) { LOG.warn( "Ignoring exception when closing input {}(cleanup). Exception class={}, message={}", - srcVertexName, e.getClass().getName(), e.getMessage()); + srcVertexName, e.getClass().getName(), e.getMessage(), e); } finally { LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted()); @@ -866,7 +867,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } catch (Throwable e) { LOG.warn( "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}", - destVertexName, e.getClass().getName(), e.getMessage()); + destVertexName, e.getClass().getName(), e.getMessage(), e); } finally { LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor .getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted()); http://git-wip-us.apache.org/repos/asf/tez/blob/848b137f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index cdfb46a..33c0113 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -167,5 +167,5 @@ public abstract class RuntimeTask { taskDone.set(true); } - public abstract void abortTask() throws Exception; + public abstract void abortTask(); } http://git-wip-us.apache.org/repos/asf/tez/blob/848b137f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java index 7315bbd..ab77635 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java @@ -63,26 +63,26 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas if (stopRequested.get() || Thread.currentThread().isInterrupted()) { return new TaskRunner2CallableResult(null); } - LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID()); + LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID()); task.initialize(); if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) { - LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID()); + LOG.info("Running task, taskAttemptId={}", task.getTaskAttemptID()); task.run(); } else { - LOG.info("Stopped before running the processor."); + LOG.info("Stopped before running the processor taskAttemptId={}", task.getTaskAttemptID()); return new TaskRunner2CallableResult(null); } if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) { - LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID()); + LOG.info("Closing task, taskAttemptId={}", task.getTaskAttemptID()); task.close(); task.setFrameworkCounters(); } else { - LOG.info("Stopped before closing the processor"); + LOG.info("Stopped before closing the processor, taskAttemptId={}", task.getTaskAttemptID()); return new TaskRunner2CallableResult(null); } - LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID() + ", askedToStop=" + stopRequested.get()); + LOG.info("Task completed, taskAttemptId={}, askedToStop={}", task.getTaskAttemptID(), stopRequested.get()); return new TaskRunner2CallableResult(null); @@ -115,6 +115,7 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas public void interruptTask() { // Ensure the task is only interrupted once. if (!stopRequested.getAndSet(true)) { + task.abortTask(); if (ownThread != null) { ownThread.interrupt(); } http://git-wip-us.apache.org/repos/asf/tez/blob/848b137f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index 73e5c76..ffbc6e8 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -250,10 +250,12 @@ public class TezTaskRunner2 { public void killTask() { synchronized (this) { if (isRunningState()) { - trySettingEndReason(EndReason.KILL_REQUESTED); - if (taskRunnerCallable != null) { - taskKillStartTime = System.currentTimeMillis(); - taskRunnerCallable.interruptTask(); + if (trySettingEndReason(EndReason.KILL_REQUESTED)) { + killTaskRequested.set(true); + if (taskRunnerCallable != null) { + taskKillStartTime = System.currentTimeMillis(); + taskRunnerCallable.interruptTask(); + } } } }
