Repository: tez
Updated Branches:
  refs/heads/master 614937c5d -> 8dcf8a121
TEZ-3803. Tasks can get killed due to insufficient progress while waiting for shuffle inputs to complete. Contributed by Kuhu Shukla

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8dcf8a12
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8dcf8a12
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8dcf8a12

Branch: refs/heads/master
Commit: 8dcf8a121f5961e2974ef1121ec9d0200cbdc0ae
Parents: 614937c
Author: Jason Lowe <jl...@yahoo-inc.com>
Authored: Fri Aug 4 15:21:54 2017 -0500
Committer: Jason Lowe <jl...@yahoo-inc.com>
Committed: Fri Aug 4 15:21:54 2017 -0500

----------------------------------------------------------------------
 .../common/shuffle/impl/ShuffleManager.java | 9 +++---
 .../orderedgrouped/ShuffleScheduler.java | 33 +++++++++++---------
 .../common/shuffle/impl/TestShuffleManager.java | 21 +++++++++++++
 .../orderedgrouped/TestShuffleScheduler.java | 24 ++++++++++++++
 4 files changed, 69 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/8dcf8a12/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index e1b7f99..24fb12b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; @@ -320,10 +321,10 @@ public class ShuffleManager implements FetcherCallback { while (!isShutdown.get() && numCompletedInputs.get() < numInputs) { lock.lock(); try { - if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) { - if (numCompletedInputs.get() < numInputs) { - wakeLoop.await(); - } + while ((runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) + && numCompletedInputs.get() < numInputs) { + inputContext.notifyProgress(); + boolean ret = wakeLoop.await(1000, TimeUnit.MILLISECONDS); } } finally { lock.unlock(); http://git-wip-us.apache.org/repos/asf/tez/blob/8dcf8a12/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index b223c1a..981e224 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -1120,7 +1120,7 @@ class ShuffleScheduler { if (LOG.isDebugEnabled()) { LOG.debug("PendingHosts=" + pendingHosts); } - wait(); + waitAndNotifyProgress(); } if (!pendingHosts.isEmpty()) { @@ -1360,19 +1360,19 @@ class ShuffleScheduler { protected Void callInternal() throws InterruptedException { while (!isShutdown.get() && remainingMaps.get() > 0) { synchronized (ShuffleScheduler.this) { - if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) { - if (remainingMaps.get() > 
0) { - try { - ShuffleScheduler.this.wait(); - } catch (InterruptedException e) { - if (isShutdown.get()) { - LOG.info(srcNameTrimmed + ": " + - "Interrupted while waiting for fetchers to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop"); - Thread.currentThread().interrupt(); - break; - } else { - throw e; - } + while ((runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) + && remainingMaps.get() > 0) { + try { + waitAndNotifyProgress(); + } catch (InterruptedException e) { + if (isShutdown.get()) { + LOG.info(srcNameTrimmed + ": " + + "Interrupted while waiting for fetchers to complete" + + "and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop"); + Thread.currentThread().interrupt(); + break; + } else { + throw e; } } } @@ -1446,6 +1446,11 @@ class ShuffleScheduler { } } + private synchronized void waitAndNotifyProgress() throws InterruptedException { + inputContext.notifyProgress(); + wait(1000); + } + @VisibleForTesting FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) { return new FetcherOrderedGrouped(httpConnectionParams, ShuffleScheduler.this, allocator, http://git-wip-us.apache.org/repos/asf/tez/blob/8dcf8a12/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java index f361dc7..23248ed 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static 
org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -193,6 +194,26 @@ public class TestShuffleManager { verify(inputContext).createTezFrameworkExecutorService(anyInt(), anyString()); } + @Test (timeout = 20000) + public void testProgressWithEmptyPendingHosts() throws Exception { + InputContext inputContext = createInputContext(); + final ShuffleManager shuffleManager = spy(createShuffleManager(inputContext, 1)); + Thread schedulerGetHostThread = new Thread(new Runnable() { + @Override + public void run() { + try { + shuffleManager.run(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + schedulerGetHostThread.start(); + Thread.currentThread().sleep(1000 * 3 + 1000); + schedulerGetHostThread.interrupt(); + verify(inputContext, atLeast(3)).notifyProgress(); + } + private ShuffleManagerForTest createShuffleManager( InputContext inputContext, int expectedNumOfPhysicalInputs) throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/8dcf8a12/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java index c61391c..381ad85 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java @@ -812,6 +812,28 @@ public class TestShuffleScheduler { assertFalse("Host identifier 
mismatch", (host.getHost() + ":" + host.getPort() + ":" + host.getPartitionId()).equalsIgnoreCase("host0:10000")); } + @Test (timeout = 20000) + public void testProgressDuringGetHostWait() throws IOException, InterruptedException { + long startTime = System.currentTimeMillis(); + Configuration conf = new TezConfiguration(); + Shuffle shuffle = mock(Shuffle.class); + final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 1, shuffle, conf); + Thread schedulerGetHostThread = new Thread(new Runnable() { + @Override + public void run() { + try { + scheduler.getHost(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + schedulerGetHostThread.start(); + Thread.currentThread().sleep(1000 * 3 + 1000); + schedulerGetHostThread.interrupt(); + verify(scheduler.inputContext, atLeast(3)).notifyProgress(); + } + @Test(timeout = 5000) public void testShutdown() throws Exception { InputContext inputContext = createTezInputContext(); @@ -964,6 +986,7 @@ public class TestShuffleScheduler { private final AtomicInteger numFetchersCreated = new AtomicInteger(0); private final boolean fetcherShouldWait; private final ExceptionReporter reporter; + private final InputContext inputContext; public ShuffleSchedulerForTest(InputContext inputContext, Configuration conf, int numberOfInputs, @@ -989,6 +1012,7 @@ public class TestShuffleScheduler { ifileReadAhead, ifileReadAheadLength, srcNameTrimmed); this.fetcherShouldWait = fetcherShouldWait; this.reporter = shuffle; + this.inputContext = inputContext; } @Override