Repository: tez
Updated Branches:
  refs/heads/master eb6fb67c4 -> c95d3ddc7
TEZ-3533. ShuffleScheduler should shutdown threadpool on exit (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c95d3ddc Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c95d3ddc Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c95d3ddc Branch: refs/heads/master Commit: c95d3ddc712f4cc47b43d00a14d184bcb4369934 Parents: eb6fb67 Author: Rajesh Balamohan <[email protected]> Authored: Wed Nov 9 07:36:36 2016 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Wed Nov 9 07:36:36 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/shuffle/orderedgrouped/Shuffle.java | 2 + .../orderedgrouped/ShuffleScheduler.java | 59 +++++++++++------ .../orderedgrouped/TestShuffleScheduler.java | 67 ++++++++++++++++++++ 4 files changed, 108 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c95d3ddc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 88767ec..5ddc398 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3533. ShuffleScheduler should shutdown threadpool on exit. TEZ-3477. MRInputHelpers generateInputSplitsToMem public API modified TEZ-3465. Support broadcast edge into cartesian product vertex and forbid other edges. TEZ-3493. 
DAG submit timeout cannot be set to a month http://git-wip-us.apache.org/repos/asf/tez/blob/c95d3ddc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index 37269ad..5a18959 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -291,6 +291,8 @@ public class Shuffle implements ExceptionReporter { scheduler.start(); } catch (Throwable e) { throw new ShuffleError("Error during shuffle", e); + } finally { + cleanupShuffleScheduler(); } } // The ShuffleScheduler may have exited cleanly as a result of a shutdown invocation http://git-wip-us.apache.org/repos/asf/tez/blob/c95d3ddc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index afd280b..09518e5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -430,35 +430,52 @@ class ShuffleScheduler { } public void close() throws InterruptedException { - if (!isShutdown.getAndSet(true)) { + try { + if 
(!isShutdown.getAndSet(true)) { + logProgress(); - logProgress(); + // Notify and interrupt the waiting scheduler thread + synchronized (this) { + notifyAll(); + } + // Interrupt the ShuffleScheduler thread only if the close is invoked by another thread. + // If this is invoked on the same thread, then the shuffleRunner has already complete, and there's + // no point interrupting it. + // The interrupt is needed to unblock any merges or waits which may be happening, so that the thread can + // exit. + if (shuffleSchedulerThread != null && !Thread.currentThread() + .equals(shuffleSchedulerThread)) { + shuffleSchedulerThread.interrupt(); + } - // Notify and interrupt the waiting scheduler thread - synchronized (this) { - notifyAll(); - } - // Interrupt the ShuffleScheduler thread only if the close is invoked by another thread. - // If this is invoked on the same thread, then the shuffleRunner has already complete, and there's - // no point interrupting it. - // The interrupt is needed to unblock any merges or waits which may be happening, so that the thread can - // exit. - if (shuffleSchedulerThread != null && !Thread.currentThread().equals(shuffleSchedulerThread)) { - shuffleSchedulerThread.interrupt(); - } + // Interrupt the fetchers. + for (FetcherOrderedGrouped fetcher : runningFetchers) { + fetcher.shutDown(); + } - // Interrupt the fetchers. - for (FetcherOrderedGrouped fetcher : runningFetchers) { - fetcher.shutDown(); + // Kill the Referee thread. + referee.interrupt(); + referee.join(); } - - // Kill the Referee thread. - referee.interrupt(); - referee.join(); + } finally { + long startTime = System.currentTimeMillis(); + if (!fetcherExecutor.isShutdown()) { + // Ensure that fetchers respond to cancel request. 
+ fetcherExecutor.shutdownNow(); + } + long endTime = System.currentTimeMillis(); + LOG.info("Shutting down fetchers for input: {}, shutdown timetaken: {} ms, " + + "hasFetcherExecutorStopped: {}", srcNameTrimmed, + (endTime - startTime), hasFetcherExecutorStopped()); } } @VisibleForTesting + boolean hasFetcherExecutorStopped() { + return fetcherExecutor.isShutdown(); + } + + @VisibleForTesting public boolean isShutdown() { return isShutdown.get(); } http://git-wip-us.apache.org/repos/asf/tez/blob/c95d3ddc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java index 15cfa48..5b6c59f 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java @@ -830,6 +830,73 @@ public class TestShuffleScheduler { } } + @Test(timeout = 30000) + public void testShutdownWithInterrupt() throws Exception { + InputContext inputContext = createTezInputContext(); + Configuration conf = new TezConfiguration(); + int numInputs = 10; + Shuffle shuffle = mock(Shuffle.class); + MergeManager mergeManager = mock(MergeManager.class); + + final ShuffleSchedulerForTest scheduler = + new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager, + mergeManager, + System.currentTimeMillis(), null, false, 0, "srcName"); + + ExecutorService executor = Executors.newFixedThreadPool(1); + + Future<Void> executorFuture = executor.submit(new Callable<Void>() { + @Override + public Void call() throws 
Exception { + scheduler.start(); + return null; + } + }); + + InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs]; + + for (int i = 0; i < numInputs; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(i, 0, "attempt_"); + scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier); + identifiers[i] = inputAttemptIdentifier; + } + + MapHost[] mapHosts = new MapHost[numInputs]; + int count = 0; + for (MapHost mh : scheduler.mapLocations.values()) { + mapHosts[count++] = mh; + } + + // Copy succeeded for 1 less host + for (int i = 0; i < numInputs - 1; i++) { + MapOutput mapOutput = MapOutput + .createMemoryMapOutput(identifiers[i], mock(FetchedInputAllocatorOrderedGrouped.class), + 100, false); + scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false); + scheduler.freeHost(mapHosts[i]); + } + + try { + // Close the scheduler on different thread to trigger interrupt + Thread thread = new Thread(new Runnable() { + @Override public void run() { + try { + scheduler.close(); + } catch (InterruptedException e) { + //ignore + } + } + }); + thread.start(); + thread.join(); + } finally { + assertTrue("Fetcher executor should be shutdown, but still running", + scheduler.hasFetcherExecutorStopped()); + executor.shutdownNow(); + } + } + private InputContext createTezInputContext() throws IOException { ApplicationId applicationId = ApplicationId.newInstance(1, 1);
