Repository: tez Updated Branches: refs/heads/TEZ-2003 b787648ed -> 5f9653233
TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5f965323 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5f965323 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5f965323 Branch: refs/heads/TEZ-2003 Commit: 5f96532332eda1246412c3e0af3661aa9179ce96 Parents: b787648 Author: Siddharth Seth <[email protected]> Authored: Tue May 12 14:27:42 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue May 12 14:27:42 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../apache/tez/dag/api/TaskCommunicator.java | 4 ++ .../runtime/LogicalIOProcessorRuntimeTask.java | 75 ++++++++++---------- .../org/apache/tez/runtime/RuntimeTask.java | 2 +- .../tez/runtime/task/TaskRunner2Callable.java | 13 ++-- .../apache/tez/runtime/task/TezTaskRunner2.java | 10 +-- 6 files changed, 56 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 5d2e40a..ed72d6b 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -25,5 +25,6 @@ ALL CHANGES: TEZ-2433. Fixes after rebase 05/08 TEZ-2438. tez-tools version in the branch is incorrect. TEZ-2434. Allow tasks to be killed in the Runtime. + TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index cadca0c..2651013 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -48,6 +48,10 @@ public abstract class TaskCommunicator extends AbstractService { // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness. // TODO TEZ-2003 Remove reference to TaskAttemptID + // TODO TEZ-2003 This needs some information about why the attempt is being unregistered. + // e.g. preempted in which case the task may need to be informed. Alternately as a result of + // a failed task. + // In case of preemption - a killTask API is likely a better bet than trying to overload this method. public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID); // TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM. http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 9a0e397..f32da76 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -119,7 +119,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { final ConcurrentHashMap<String, LogicalInput> initializedInputs; final ConcurrentHashMap<String, LogicalOutput> initializedOutputs; - private boolean processorClosed; + private boolean processorClosed = false; final ProcessorDescriptor processorDescriptor; AbstractLogicalIOProcessor processor; ProcessorContext processorContext; @@ -693,7 +693,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } @Override - public synchronized void abortTask() throws Exception { + public synchronized void abortTask() { if (processor != null) { processor.abort(); } @@ -791,7 +791,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { LOG.debug("Num of inputs to be closed={}", initializedInputs.size()); LOG.debug("Num of outputs to be closed={}", initializedOutputs.size()); } - if (!processorClosed) { + if (!processorClosed && processor != null) { try { processorClosed = true; processor.close(); @@ -808,19 +808,20 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { LOG.info("Resetting interrupt for processor"); Thread.currentThread().interrupt(); } catch (Throwable e) { - LOG.warn("Exception when closing processor", e); + LOG.warn( + "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" + + e.getClass().getName(), e.getMessage(), e); } } + // Close the remaining inited Inputs. - Iterator<String> srcVertexItr = initializedInputs.keySet().iterator(); - while (srcVertexItr.hasNext()) { - String srcVertexName = srcVertexItr.next(); + Iterator<Map.Entry<String, LogicalInput>> inputIterator = initializedInputs.entrySet().iterator(); + while (inputIterator.hasNext()) { + Map.Entry<String, LogicalInput> entry = inputIterator.next(); + String srcVertexName = entry.getKey(); + inputIterator.remove(); try { - srcVertexItr.remove(); - - initializedInputs.remove(srcVertexName); - ((InputFrameworkInterface) initializedInputs.get(srcVertexName)).close(); - + ((InputFrameworkInterface)entry.getValue()).close(); maybeResetInterruptStatus(); } catch (InterruptedException ie) { //reset the status @@ -828,7 +829,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { srcVertexName); Thread.currentThread().interrupt(); } catch (Throwable e) { - LOG.warn("Exception when closing input in cleanup(interrupted)", e); + LOG.warn( + "Ignoring exception when closing input {}(cleanup). Exception class={}, message={}", + srcVertexName, e.getClass().getName(), e.getMessage(), e); } finally { LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread() @@ -837,32 +840,28 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } // Close the remaining inited Outputs. - try { - Iterator<String> outVertexItr = initializedOutputs.keySet().iterator(); - while (outVertexItr.hasNext()) { - String destVertexName = outVertexItr.next(); - try { - outVertexItr.remove(); - - initializedOutputs.remove(destVertexName); - ((OutputFrameworkInterface) initializedOutputs.get(destVertexName)).close(); - - maybeResetInterruptStatus(); - } catch (InterruptedException ie) { - //reset the status - LOG.info("Resetting interrupt status for output with destVertexName={}", - destVertexName); - Thread.currentThread().interrupt(); - } catch (Throwable e) { - LOG.warn("Exception when closing output in cleanup(interrupted)", e); - } finally { - LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor - .getContext().getTaskVertexName(), destVertexName, Thread.currentThread() - .isInterrupted()); - } + Iterator<Map.Entry<String, LogicalOutput>> outputIterator = initializedOutputs.entrySet().iterator(); + while (outputIterator.hasNext()) { + Map.Entry<String, LogicalOutput> entry = outputIterator.next(); + String destVertexName = entry.getKey(); + outputIterator.remove(); + try { + ((OutputFrameworkInterface) entry.getValue()).close(); + maybeResetInterruptStatus(); + } catch (InterruptedException ie) { + //reset the status + LOG.info("Resetting interrupt status for output with destVertexName={}", + destVertexName); + Thread.currentThread().interrupt(); + } catch (Throwable e) { + LOG.warn( + "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}", + destVertexName, e.getClass().getName(), e.getMessage(), e); + } finally { + LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor + .getContext().getTaskVertexName(), destVertexName, Thread.currentThread() + .isInterrupted()); } - } catch (Throwable e) { - LOG.warn(Throwables.getStackTraceAsString(e)); } if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index 7b09455..316a138 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -157,5 +157,5 @@ public abstract class RuntimeTask { taskDone.set(true); } - public abstract void abortTask() throws Exception; + public abstract void abortTask(); } http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java index 7315bbd..ab77635 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java @@ -63,26 +63,26 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas if (stopRequested.get() || Thread.currentThread().isInterrupted()) { return new TaskRunner2CallableResult(null); } - LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID()); + LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID()); task.initialize(); if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) { - LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID()); + LOG.info("Running task, taskAttemptId={}", task.getTaskAttemptID()); task.run(); } else { - LOG.info("Stopped before running the processor."); + LOG.info("Stopped before running the processor taskAttemptId={}", task.getTaskAttemptID()); return new TaskRunner2CallableResult(null); } if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) { - LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID()); + LOG.info("Closing task, taskAttemptId={}", task.getTaskAttemptID()); task.close(); task.setFrameworkCounters(); } else { - LOG.info("Stopped before closing the processor"); + LOG.info("Stopped before closing the processor, taskAttemptId={}", task.getTaskAttemptID()); return new TaskRunner2CallableResult(null); } - LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID() + ", askedToStop=" + stopRequested.get()); + LOG.info("Task completed, taskAttemptId={}, askedToStop={}", task.getTaskAttemptID(), stopRequested.get()); return new TaskRunner2CallableResult(null); @@ -115,6 +115,7 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas public void interruptTask() { // Ensure the task is only interrupted once. if (!stopRequested.getAndSet(true)) { + task.abortTask(); if (ownThread != null) { ownThread.interrupt(); } http://git-wip-us.apache.org/repos/asf/tez/blob/5f965323/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index 73e5c76..ffbc6e8 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -250,10 +250,12 @@ public class TezTaskRunner2 { public void killTask() { synchronized (this) { if (isRunningState()) { - trySettingEndReason(EndReason.KILL_REQUESTED); - if (taskRunnerCallable != null) { - taskKillStartTime = System.currentTimeMillis(); - taskRunnerCallable.interruptTask(); + if (trySettingEndReason(EndReason.KILL_REQUESTED)) { + killTaskRequested.set(true); + if (taskRunnerCallable != null) { + taskKillStartTime = System.currentTimeMillis(); + taskRunnerCallable.interruptTask(); + } } } }
