Repository: tez
Updated Branches:
  refs/heads/branch-0.8 aff0edb87 -> b5650a492
TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code. (sseth) (cherry picked from commit 0d598442640ff3cb6ca52f2077bfddb62b54e628) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b5650a49 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b5650a49 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b5650a49 Branch: refs/heads/branch-0.8 Commit: b5650a492de1330954367872489f7aadc980c93d Parents: aff0edb Author: Siddharth Seth <[email protected]> Authored: Fri Nov 11 10:11:26 2016 -0800 Committer: Siddharth Seth <[email protected]> Committed: Fri Nov 11 10:12:55 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/shuffle/impl/ShuffleManager.java | 10 ++++-- .../common/shuffle/orderedgrouped/Shuffle.java | 8 ++++- .../orderedgrouped/ShuffleScheduler.java | 33 ++++++++++++++++---- .../orderedgrouped/TestShuffleScheduler.java | 4 --- 5 files changed, 43 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b5650a49/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a21554b..7023d0d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code. TEZ-3491. Tez job can hang due to container priority inversion. TEZ-3533. ShuffleScheduler should shutdown threadpool on exit. TEZ-3493. 
DAG submit timeout cannot be set to a month http://git-wip-us.apache.org/repos/asf/tez/blob/b5650a49/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 0cb17e6..84c0779 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -230,7 +230,7 @@ public class ShuffleManager implements FetcherCallback { ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool( numFetchers, new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build()); + .setNameFormat("Fetcher_B {" + srcNameTrimmed + "} #%d").build()); this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor); ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() @@ -799,7 +799,13 @@ public class ShuffleManager implements FetcherCallback { try { wakeLoop.signal(); // signal the fetch-scheduler for (Fetcher fetcher : runningFetchers) { - fetcher.shutdown(); // This could be parallelized. + try { + fetcher.shutdown(); // This could be parallelized. + } catch (Exception e) { + LOG.warn( + "Error while stopping fetcher during shutdown. Ignoring and continuing. 
Message={}", + e.getMessage()); + } } } finally { lock.unlock(); http://git-wip-us.apache.org/repos/asf/tez/blob/b5650a49/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index 5a18959..e5f4e5c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -378,7 +378,13 @@ public class Shuffle implements ExceptionReporter { if (eventHandler != null) { eventHandler.logProgress(true); } - cleanupShuffleSchedulerIgnoreErrors(); + try { + cleanupShuffleSchedulerIgnoreErrors(); + } catch (Exception e) { + LOG.warn( + "Error cleaning up shuffle scheduler. Ignoring and continuing with shutdown. 
Message={}", + e.getMessage()); + } cleanupMerger(true); } catch (Throwable t) { LOG.info(srcNameTrimmed + ": " + "Error in cleaning up.., ", t); http://git-wip-us.apache.org/repos/asf/tez/blob/b5650a49/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 61dc456..84da654 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -376,7 +376,7 @@ class ShuffleScheduler { ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build()); + .setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build()); this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor); this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5); @@ -429,10 +429,15 @@ class ShuffleScheduler { schedulerCallable.call(); } - public void close() throws InterruptedException { + public void close() { try { if (!isShutdown.getAndSet(true)) { - logProgress(); + try { + logProgress(); + } catch (Exception e) { + LOG.warn("Failed log progress while closing, ignoring and continuing shutdown. Message={}", + e.getMessage()); + } // Notify and interrupt the waiting scheduler thread synchronized (this) { @@ -450,12 +455,28 @@ class ShuffleScheduler { // Interrupt the fetchers. 
for (FetcherOrderedGrouped fetcher : runningFetchers) { - fetcher.shutDown(); + try { + fetcher.shutDown(); + } catch (Exception e) { + LOG.warn( + "Error while shutting down fetcher. Ignoring and continuing shutdown. Message={}", + e.getMessage()); + } } // Kill the Referee thread. - referee.interrupt(); - referee.join(); + try { + referee.interrupt(); + referee.join(); + } catch (InterruptedException e) { + LOG.warn( + "Interrupted while shutting down referee. Ignoring and continuing shutdown"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.warn( + "Error while shutting down referee. Ignoring and continuing shutdown. Message={}", + e.getMessage()); + } } } finally { long startTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/tez/blob/b5650a49/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java index 5b6c59f..31da4d0 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java @@ -881,11 +881,7 @@ public class TestShuffleScheduler { // Close the scheduler on different thread to trigger interrupt Thread thread = new Thread(new Runnable() { @Override public void run() { - try { scheduler.close(); - } catch (InterruptedException e) { - //ignore - } } }); thread.start();
