TEZ-2186. OOM with a simple scatter gather job with re-use (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cbd7323c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cbd7323c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cbd7323c Branch: refs/heads/TEZ-2003 Commit: cbd7323cd9027e2f1459ba477c7f01c99306ba31 Parents: 2703f0b Author: Rajesh Balamohan <[email protected]> Authored: Mon Mar 23 05:11:04 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Mon Mar 23 05:11:04 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/shuffle/orderedgrouped/Shuffle.java | 52 +++++++++++--------- 2 files changed, 31 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/cbd7323c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9e11008..4562d9b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2186. OOM with a simple scatter gather job with re-use TEZ-2209. Fix pipelined shuffle to fetch data from any one attempt TEZ-2210. Record DAG AM CPU usage stats TEZ-2203. Intern strings in tez counters http://git-wip-us.apache.org/repos/asf/tez/blob/cbd7323c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index f7b45a7..f9f4ec2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -20,7 +20,6 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -331,14 +330,18 @@ public class Shuffle implements ExceptionReporter { protected TezRawKeyValueIterator callInternal() throws IOException, InterruptedException { synchronized (this) { - for (int i = 0; i < numFetchers; ++i) { - FetcherOrderedGrouped - fetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, - metrics, Shuffle.this, jobTokenSecretMgr, ifileReadAhead, ifileReadAheadLength, - codec, inputContext, conf, localDiskFetchEnabled, - inputContext.getExecutionContext().getHostName()); - fetchers.add(fetcher); - fetcher.start(); + synchronized (fetchers) { + for (int i = 0; i < numFetchers; ++i) { + if (!isShutDown.get()) { + FetcherOrderedGrouped + fetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, + metrics, Shuffle.this, jobTokenSecretMgr, ifileReadAhead, ifileReadAheadLength, + codec, inputContext, conf, localDiskFetchEnabled, + inputContext.getExecutionContext().getHostName()); + fetchers.add(fetcher); + fetcher.start(); + } + } } } @@ -386,23 +389,28 @@ public class Shuffle implements ExceptionReporter { // Stop the fetcher threads InterruptedException ie = null; if (!fetchersClosed.getAndSet(true)) { - for (FetcherOrderedGrouped fetcher : fetchers) { - try { - fetcher.shutDown(); - } catch (InterruptedException e) { - if (ignoreErrors) { - LOG.info("Interrupted while shutting down fetchers. Ignoring."); - } else { - if (ie != null) { - ie = e; + synchronized (fetchers) { + for (FetcherOrderedGrouped fetcher : fetchers) { + try { + fetcher.shutDown(); + LOG.info("Shutdown.." + fetcher.getName() + ", status:" + fetcher.isAlive() + ", " + + "isInterrupted:" + fetcher.isInterrupted()); + } catch (InterruptedException e) { + if (ignoreErrors) { + LOG.info("Interrupted while shutting down fetchers. Ignoring."); } else { - LOG.warn("Ignoring exception while shutting down fetcher since a previous one was seen and will be thrown " - + e); + if (ie != null) { + ie = e; + } else { + LOG.warn( + "Ignoring exception while shutting down fetcher since a previous one was seen and will be thrown " + + e); + } } } } + fetchers.clear(); } - fetchers.clear(); // throw only the first exception while attempting to shutdown. if (ie != null) { throw ie; @@ -445,7 +453,7 @@ public class Shuffle implements ExceptionReporter { cleanupShuffleScheduler(true); cleanupMerger(true); } catch (Throwable t) { - // Ignore + LOG.info("Error in cleaning up.., ", t); } }
