Repository: tez
Updated Branches: refs/heads/master a5d141dac -> 6d431469b
TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics. Contributed by Eric Badger
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6d431469
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6d431469
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6d431469
Branch: refs/heads/master
Commit: 6d431469be426529f4397bb8d51af0694c39b0d4
Parents: a5d141d
Author: Jason Lowe <[email protected]>
Authored: Mon Jan 23 19:20:36 2017 +0000
Committer: Jason Lowe <[email protected]>
Committed: Mon Jan 23 19:20:36 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/runtime/task/TaskReporter.java | 15 ++++++-
.../tez/runtime/task/TestTaskExecution2.java | 42 ++++++++++++++++++--
3 files changed, 54 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/6d431469/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt index c0b6fca..303f7f3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics TEZ-3579. Wrong configuration key for max slow start fraction in CartesianProductVertexManager. TEZ-3458. Auto grouping for cartesian product edge(unpartitioned case). TEZ-3574. Container reuse won't pickup extra dag level local resource. @@ -179,6 +180,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics TEZ-3574. Container reuse won't pickup extra dag level local resource. TEZ-3566. Avoid caching fs isntances in TokenCache after a point. TEZ-3568. Update SecurityUtils configuration to pick user provided configuration. 
http://git-wip-us.apache.org/repos/asf/tez/blob/6d431469/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index d1c1471..809ce32 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -34,6 +34,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.util.ShutdownHookManager; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezException; @@ -125,6 +126,10 @@ public class TaskReporter implements TaskReporterInterface { heartbeatExecutor.shutdownNow(); } + protected boolean isShuttingDown() { + return ShutdownHookManager.get().isShutdownInProgress(); + } + @VisibleForTesting static class HeartbeatCallable implements Callable<Boolean> { @@ -447,13 +452,19 @@ public class TaskReporter implements TaskReporterInterface { Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException { - return currentCallable.taskTerminated(taskAttemptID, false, taskFailureType, t, diagnostics, srcMeta); + if(!isShuttingDown()) { + return currentCallable.taskTerminated(taskAttemptID, false, taskFailureType, t, diagnostics, srcMeta); + } + return false; } @Override public boolean taskKilled(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException { - return currentCallable.taskTerminated(taskAttemptID, true, null, t, diagnostics, srcMeta); + if(!isShuttingDown()) { + return 
currentCallable.taskTerminated(taskAttemptID, true, null, t, diagnostics, srcMeta); + } + return false; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/6d431469/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java index 6cb49fa..adcbe4a 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java @@ -14,9 +14,7 @@ package org.apache.tez.runtime.task; -import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createProcessorIOException; -import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createProcessorTezException; -import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createTaskReporter; +import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -35,6 +33,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -310,6 +309,43 @@ public class TestTaskExecution2 { } } + // test that makes sure errors aren't reported when the container is already failing + @Test(timeout = 5000) + public void testIgnoreErrorsDuringFailure() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = 
Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + + TaskReporter taskReporter = new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0), + createContainerId(appId).toString()) { + @Override + protected boolean isShuttingDown() { + return true; + } + }; + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_THROW_IO_EXCEPTION); + // Setup the executor + + taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + + // Signal the processor to go through + TestProcessor.awaitStart(); + TestProcessor.signal(); + + umbilical.verifyNoCompletionEvents(); + } finally { + executor.shutdownNow(); + } + } + @Test(timeout = 5000) public void testHeartbeatException() throws IOException, InterruptedException, TezException, ExecutionException {
