Repository: tez Updated Branches: refs/heads/master a93dbf0b2 -> 0d5984426
TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0d598442 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0d598442 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0d598442 Branch: refs/heads/master Commit: 0d598442640ff3cb6ca52f2077bfddb62b54e628 Parents: a93dbf0 Author: Siddharth Seth <[email protected]> Authored: Fri Nov 11 10:11:26 2016 -0800 Committer: Siddharth Seth <[email protected]> Committed: Fri Nov 11 10:11:26 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../common/shuffle/impl/ShuffleManager.java | 10 ++++-- .../common/shuffle/orderedgrouped/Shuffle.java | 8 ++++- .../orderedgrouped/ShuffleScheduler.java | 33 ++++++++++++++++---- .../orderedgrouped/TestShuffleScheduler.java | 4 --- 5 files changed, 44 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index af83c73..8128c7b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code. TEZ-3491. Tez job can hang due to container priority inversion. TEZ-3533. ShuffleScheduler should shutdown threadpool on exit. TEZ-3477. MRInputHelpers generateInputSplitsToMem public API modified @@ -146,6 +147,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code. TEZ-3491. Tez job can hang due to container priority inversion. TEZ-3533. ShuffleScheduler should shutdown threadpool on exit. TEZ-3493. DAG submit timeout cannot be set to a month http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 1ebd3a4..d034b2e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -234,7 +234,7 @@ public class ShuffleManager implements FetcherCallback { ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool( numFetchers, new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build()); + .setNameFormat("Fetcher_B {" + srcNameTrimmed + "} #%d").build()); this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor); ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() @@ -812,7 +812,13 @@ public class ShuffleManager implements FetcherCallback { try { wakeLoop.signal(); // signal the fetch-scheduler for (Fetcher fetcher : runningFetchers) { - fetcher.shutdown(); // This could be parallelized. + try { + fetcher.shutdown(); // This could be parallelized. + } catch (Exception e) { + LOG.warn( + "Error while stopping fetcher during shutdown. Ignoring and continuing. Message={}", + e.getMessage()); + } } } finally { lock.unlock(); http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index 5a18959..e5f4e5c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -378,7 +378,13 @@ public class Shuffle implements ExceptionReporter { if (eventHandler != null) { eventHandler.logProgress(true); } - cleanupShuffleSchedulerIgnoreErrors(); + try { + cleanupShuffleSchedulerIgnoreErrors(); + } catch (Exception e) { + LOG.warn( + "Error cleaning up shuffle scheduler. Ignoring and continuing with shutdown. Message={}", + e.getMessage()); + } cleanupMerger(true); } catch (Throwable t) { LOG.info(srcNameTrimmed + ": " + "Error in cleaning up.., ", t); http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 09518e5..3d2c1ad 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -376,7 +376,7 @@ class ShuffleScheduler { ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build()); + .setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build()); this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor); this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5); @@ -429,10 +429,15 @@ class ShuffleScheduler { schedulerCallable.call(); } - public void close() throws InterruptedException { + public void close() { try { if (!isShutdown.getAndSet(true)) { - logProgress(); + try { + logProgress(); + } catch (Exception e) { + LOG.warn("Failed log progress while closing, ignoring and continuing shutdown. Message={}", + e.getMessage()); + } // Notify and interrupt the waiting scheduler thread synchronized (this) { @@ -450,12 +455,28 @@ class ShuffleScheduler { // Interrupt the fetchers. for (FetcherOrderedGrouped fetcher : runningFetchers) { - fetcher.shutDown(); + try { + fetcher.shutDown(); + } catch (Exception e) { + LOG.warn( + "Error while shutting down fetcher. Ignoring and continuing shutdown. Message={}", + e.getMessage()); + } } // Kill the Referee thread. - referee.interrupt(); - referee.join(); + try { + referee.interrupt(); + referee.join(); + } catch (InterruptedException e) { + LOG.warn( + "Interrupted while shutting down referee. Ignoring and continuing shutdown"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.warn( + "Error while shutting down referee. Ignoring and continuing shutdown. Message={}", + e.getMessage()); + } } } finally { long startTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java index 5b6c59f..31da4d0 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java @@ -881,11 +881,7 @@ public class TestShuffleScheduler { // Close the scheduler on different thread to trigger interrupt Thread thread = new Thread(new Runnable() { @Override public void run() { - try { scheduler.close(); - } catch (InterruptedException e) { - //ignore - } } }); thread.start();
