Repository: tez
Updated Branches:
  refs/heads/master de72fbe62 -> 020a7c873

TEZ-3767. Shuffle should not report error to AM during inputContext.killSelf(). (rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/020a7c87
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/020a7c87
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/020a7c87

Branch: refs/heads/master
Commit: 020a7c87381872ea138ecfcada2846b270a4f471
Parents: de72fbe
Author: Rajesh Balamohan <[email protected]>
Authored: Wed Jun 28 06:24:29 2017 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Wed Jun 28 06:24:29 2017 +0530

----------------------------------------------------------------------
 .../orderedgrouped/ExceptionReporter.java       |  1 +
 .../common/shuffle/orderedgrouped/Shuffle.java  |  9 ++++++
 .../orderedgrouped/ShuffleScheduler.java        |  6 +---
 .../shuffle/orderedgrouped/TestShuffle.java     | 30 ++++++++++++++++++++
 4 files changed, 41 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/020a7c87/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java
index 8739dd2..1fa99af 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java
@@ -22,4 +22,5 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
  */
 interface ExceptionReporter {
   void reportException(Throwable t);
+  void killSelf(Exception exception, String message);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/020a7c87/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index f787c59..0089d8c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -407,6 +407,15 @@ public class Shuffle implements ExceptionReporter {
       cleanupShuffleSchedulerIgnoreErrors();
     }
   }
+
+  @Private
+  @Override
+  public synchronized void killSelf(Exception exception, String message) {
+    if (!isShutDown.get() && throwable.get() == null) {
+      shutdown();
+      inputContext.killSelf(exception, message);
+    }
+  }
 
   public static class ShuffleError extends IOException {
     private static final long serialVersionUID = 5753909320586607881L;

http://git-wip-us.apache.org/repos/asf/tez/blob/020a7c87/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index c42ffda..b223c1a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -728,11 +728,7 @@ class ShuffleScheduler {
   @VisibleForTesting
   void killSelf(Exception exception, String message) {
     LOG.error(message, exception);
-    try {
-      this.close();
-    } finally {
-      this.inputContext.killSelf(exception, message);
-    }
+    exceptionReporter.killSelf(exception, message);
   }
 
   private final AtomicInteger nextProgressLineEventCount = new AtomicInteger(0);

http://git-wip-us.apache.org/repos/asf/tez/blob/020a7c87/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
index cad9523..a28b1fa 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
@@ -108,6 +108,36 @@ public class TestShuffle {
 
   }
 
+  @Test(timeout = 10000)
+  public void testKillSelf() throws IOException, InterruptedException {
+    InputContext inputContext = createTezInputContext();
+    TezConfiguration conf = new TezConfiguration();
+    conf.setLong(Constants.TEZ_RUNTIME_TASK_MEMORY, 300000l);
+    Shuffle shuffle = new Shuffle(inputContext, conf, 1, 3000000l);
+    try {
+      shuffle.run();
+      ShuffleScheduler scheduler = shuffle.scheduler;
+      assertFalse("scheduler.isShutdown should be false", scheduler.isShutdown());
+
+      // killSelf() would invoke close(). Internally Shuffle --> merge.close() --> finalMerge()
+      // gets called. In MergeManager::finalMerge(), it would throw illegal argument exception as
+      // uniqueIdentifier is not present in inputContext. This is used as means of simulating
+      // exception.
+      scheduler.killSelf(new Exception(), "due to internal error");
+      assertTrue("scheduler.isShutdown should be true", scheduler.isShutdown());
+
+      //killSelf() should not result in reporting failure to AM
+      ArgumentCaptor<Throwable> throwableArgumentCaptor = ArgumentCaptor.forClass(Throwable.class);
+      ArgumentCaptor<String> stringArgumentCaptor = ArgumentCaptor.forClass(String.class);
+      verify(inputContext, times(0)).reportFailure(eq(TaskFailureType.NON_FATAL),
+          throwableArgumentCaptor.capture(),
+          stringArgumentCaptor.capture());
+    } finally {
+      shuffle.shutdown();
+    }
+
+  }
+
   private InputContext createTezInputContext() throws IOException {
     ApplicationId applicationId = ApplicationId.newInstance(1, 1);
