TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. Contributed by Rajesh Balamohan.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e4f7ea0a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e4f7ea0a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e4f7ea0a Branch: refs/heads/TEZ-2003 Commit: e4f7ea0aa12f9e5b6352e76745898644c530754d Parents: 9ed22b2 Author: Siddharth Seth <[email protected]> Authored: Wed May 6 00:39:46 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Aug 6 01:25:34 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../runtime/LogicalIOProcessorRuntimeTask.java | 83 ++++++++++++++++++-- .../org/apache/tez/runtime/RuntimeTask.java | 5 ++ .../apache/tez/runtime/task/TezTaskRunner.java | 71 ++++++++++++++++- 4 files changed, 152 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e4f7ea0a/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 9fc9ed3..f8a71e8 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -20,5 +20,6 @@ ALL CHANGES: TEZ-2361. Propagate dag completion to TaskCommunicator. TEZ-2381. Fixes after rebase 04/28. TEZ-2388. Send dag identifier as part of the fetcher request string. + TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. 
INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/e4f7ea0a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 84e5e0d..8263b3f 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -20,6 +20,9 @@ package org.apache.tez.runtime; import java.io.Closeable; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.nio.ByteBuffer; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -41,6 +44,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import com.google.common.base.Throwables; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.tez.runtime.api.TaskContext; import org.apache.tez.runtime.api.impl.TezProcessorContextImpl; @@ -174,6 +178,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.runInputMap = new LinkedHashMap<String, LogicalInput>(); this.runOutputMap = new LinkedHashMap<String, LogicalOutput>(); + this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>(); + this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>(); + this.processorDescriptor = taskSpec.getProcessorDescriptor(); this.serviceConsumerMetadata = serviceConsumerMetadata; this.envMap = envMap; @@ -420,6 +427,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { taskSpec.getTaskAttemptID()); 
initializedInputs.put(edgeName, input); LOG.info("Initialized Input with src edge: " + edgeName); + initializedInputs.put(edgeName, input); return null; } } @@ -469,6 +477,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID()); initializedOutputs.put(edgeName, output); LOG.info("Initialized Output with dest edge: " + edgeName); + initializedOutputs.put(edgeName, output); return null; } } @@ -694,6 +703,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { eventsToBeProcessed.addAll(events); } + @Override + public synchronized void abortTask() throws Exception { + if (processor != null) { + processor.abort(); + } + } + private void startRouterThread() { eventRouterThread = new Thread(new RunnableWithNdc() { public void runInternal() { @@ -713,6 +729,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { if (!isTaskDone()) { LOG.warn("Event Router thread interrupted. Returning."); } + Thread.currentThread().interrupt(); return; } } @@ -724,6 +741,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { eventRouterThread.start(); } + private void maybeResetInterruptStatus() { + if (!Thread.currentThread().isInterrupted()) { + Thread.currentThread().interrupt(); + } + } + private void closeContexts() throws IOException { closeContext(inputContextMap); closeContext(outputContextMap); @@ -763,6 +786,18 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } // Close the unclosed IPO + /** + * Cleanup IPO that are not closed. In case, regular close() has happened in IPO, they + * would not be available in the IPOs to be cleaned. So this is safe. + * + * e.g whenever input gets closed() in normal way, it automatically removes it from + * initializedInputs map. 
+ * + * In case any exception happens in processor close or IO close, it wouldn't be removed from + * the initialized IO data structures and here is the chance to close them and release + * resources. + * + */ if (LOG.isDebugEnabled()) { LOG.debug("Processor closed={}", processorClosed); LOG.debug("Num of inputs to be closed={}", initializedInputs.size()); @@ -773,10 +808,16 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { try { processorClosed = true; processor.close(); - LOG.info("Closed processor for vertex={}, index={}", + LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}", processor .getContext().getTaskVertexName(), - processor.getContext().getTaskVertexIndex()); + processor.getContext().getTaskVertexIndex(), + Thread.currentThread().isInterrupted()); + maybeResetInterruptStatus(); + } catch (InterruptedException ie) { + //reset the status + LOG.info("Resetting interrupt for processor"); + Thread.currentThread().interrupt(); } catch (Throwable e) { LOG.warn( "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}", @@ -792,13 +833,19 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { inputIterator.remove(); try { ((InputFrameworkInterface)entry.getValue()).close(); + maybeResetInterruptStatus(); + } catch (InterruptedException ie) { + //reset the status + LOG.info("Resetting interrupt status for input with srcVertexName={}", + srcVertexName); + Thread.currentThread().interrupt(); } catch (Throwable e) { LOG.warn( "Ignoring exception when closing input {}(cleanup). 
Exception class={}, message={}", srcVertexName, e.getClass().getName(), e.getMessage()); } finally { - LOG.info("Close input for vertex={}, sourceVertex={}", processor - .getContext().getTaskVertexName(), srcVertexName); + LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor + .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted()); } } @@ -810,16 +857,26 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { outputIterator.remove(); try { ((OutputFrameworkInterface) entry.getValue()).close(); + maybeResetInterruptStatus(); + } catch (InterruptedException ie) { + //reset the status + LOG.info("Resetting interrupt status for output with destVertexName={}", + destVertexName); + Thread.currentThread().interrupt(); } catch (Throwable e) { LOG.warn( "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}", destVertexName, e.getClass().getName(), e.getMessage()); } finally { - LOG.info("Close input for vertex={}, sourceVertex={}", processor - .getContext().getTaskVertexName(), destVertexName); + LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor + .getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted()); } } + if (LOG.isDebugEnabled()) { + printThreads(); + } + try { closeContexts(); // Cleanup references which may be held by misbehaved tasks. 
@@ -867,6 +924,20 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { inputReadyTracker = null; objectRegistry = null; } + + + /** + * Print all threads in JVM (only for debugging) + */ + void printThreads() { + //Print the status of all threads in JVM + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + long[] threadIds = threadMXBean.getAllThreadIds(); + for (Long id : threadIds) { + ThreadInfo threadInfo = threadMXBean.getThreadInfo(id); + LOG.info("ThreadId : " + id + ", name=" + threadInfo.getThreadName()); + } + } @Private @VisibleForTesting http://git-wip-us.apache.org/repos/asf/tez/blob/e4f7ea0a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index 17d7053..cdfb46a 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -76,6 +76,10 @@ public abstract class RuntimeTask { protected final AtomicReference<State> state = new AtomicReference<State>(); + public boolean isRunning() { + return (state.get() == State.RUNNING); + } + public TezCounters addAndGetTezCounter(String name) { TezCounters counter = new TezCounters(); counterMap.put(name, counter); @@ -163,4 +167,5 @@ public abstract class RuntimeTask { taskDone.set(true); } + public abstract void abortTask() throws Exception; } http://git-wip-us.apache.org/repos/asf/tez/blob/e4f7ea0a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java index 33a7f4a..7238d5e 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java @@ -25,8 +25,13 @@ import java.security.PrivilegedExceptionAction; import java.util.Collection; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import com.google.common.base.Throwables; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSError; @@ -35,6 +40,7 @@ import org.apache.tez.common.CallableWithNdc; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; +import org.apache.tez.runtime.RuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.ObjectRegistry; import org.apache.tez.runtime.api.impl.EventMetaData; @@ -61,6 +67,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { private final ListeningExecutorService executor; private volatile ListenableFuture<Void> taskFuture; private volatile Thread waitingThread; + private volatile Thread taskRunner; private volatile Throwable firstException; // Effectively a duplicate check, since hadFatalError does the same thing. 
@@ -96,7 +103,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { taskReporter.registerTask(task, this); TaskRunnerCallable callable = new TaskRunnerCallable(); Throwable failureCause = null; - taskFuture = executor.submit(callable); + if (!Thread.currentThread().isInterrupted()) { + taskFuture = executor.submit(callable); + return isShutdownRequested(); + } try { taskFuture.get(); @@ -158,6 +168,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { } } } + return isShutdownRequested(); + } + + private boolean isShutdownRequested() { if (shutdownRequested.get()) { LOG.info("Shutdown requested... returning"); return false; @@ -173,11 +187,14 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { @Override public Void run() throws Exception { try { + taskRunner = Thread.currentThread(); LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID()); task.initialize(); if (!Thread.currentThread().isInterrupted() && firstException == null) { LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID()); task.run(); + maybeInterruptWaitingThread(); + LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID()); task.close(); task.setFrameworkCounters(); @@ -199,6 +216,12 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { } return null; } catch (Throwable cause) { + if (Thread.currentThread().isInterrupted()) { + LOG.info("TaskRunnerCallable interrupted=" + Thread.currentThread().isInterrupted() + + ", shutdownRequest=" + shutdownRequested.get()); + Thread.currentThread().interrupt(); + return null; + } if (cause instanceof FSError) { // Not immediately fatal, this is an error reported by Hadoop FileSystem maybeRegisterFirstException(cause); @@ -255,6 +278,17 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { taskRunning.set(false); } } + + private void maybeInterruptWaitingThread() { + /** + * Possible that the processor is swallowing 
InterruptException of taskRunner.interrupt(). + * In such case, interrupt the waitingThread based on the shutdownRequested flag, so that + * entire task gets cancelled. + */ + if (shutdownRequested.get()) { + waitingThread.interrupt(); + } + } } // should wait until all messages are sent to AM before TezChild shutdown @@ -353,10 +387,43 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { } } + private void abortRunningTask() { + if (!taskRunning.get()) { + LOG.info("Task is not running"); + waitingThread.interrupt(); + return; + } + + if (taskRunning.get()) { + try { + task.abortTask(); + } catch (Exception e) { + LOG.warn("Error when aborting the task", e); + try { + sendFailure(e, "Error when aborting the task"); + } catch (Exception ignored) { + // Ignored. + } + } + } + //Interrupt the relevant threads. TaskRunner should be interrupted preferably. + if (isTaskRunning()) { + LOG.info("Interrupting taskRunner=" + taskRunner.getName()); + taskRunner.interrupt(); + } else { + LOG.info("Interrupting waitingThread=" + waitingThread.getName()); + waitingThread.interrupt(); + } + } + + private boolean isTaskRunning() { + return (taskRunning.get() && task.isRunning()); + } + @Override public void shutdownRequested() { shutdownRequested.set(true); - waitingThread.interrupt(); + abortRunningTask(); } private String getTaskDiagnosticsString(Throwable t, String message) {
