Repository: tez
Updated Branches:
  refs/heads/master 614937c5d -> 8dcf8a121


TEZ-3803. Tasks can get killed due to insufficient progress while waiting for 
shuffle inputs to complete. Contributed by Kuhu Shukla


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8dcf8a12
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8dcf8a12
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8dcf8a12

Branch: refs/heads/master
Commit: 8dcf8a121f5961e2974ef1121ec9d0200cbdc0ae
Parents: 614937c
Author: Jason Lowe <jl...@yahoo-inc.com>
Authored: Fri Aug 4 15:21:54 2017 -0500
Committer: Jason Lowe <jl...@yahoo-inc.com>
Committed: Fri Aug 4 15:21:54 2017 -0500

----------------------------------------------------------------------
 .../common/shuffle/impl/ShuffleManager.java     |  9 +++---
 .../orderedgrouped/ShuffleScheduler.java        | 33 +++++++++++---------
 .../common/shuffle/impl/TestShuffleManager.java | 21 +++++++++++++
 .../orderedgrouped/TestShuffleScheduler.java    | 24 ++++++++++++++
 4 files changed, 69 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8dcf8a12/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index e1b7f99..24fb12b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
@@ -320,10 +321,10 @@ public class ShuffleManager implements FetcherCallback {
       while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
         lock.lock();
         try {
-          if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) 
{
-            if (numCompletedInputs.get() < numInputs) {
-              wakeLoop.await();
-            }
+          while ((runningFetchers.size() >= numFetchers || pendingHosts.isEmpty())
+              && numCompletedInputs.get() < numInputs) {
+            inputContext.notifyProgress();
+            wakeLoop.await(1000, TimeUnit.MILLISECONDS); // timed wait re-reports progress while idle; result unused since the loop re-checks its condition
           }
         } finally {
           lock.unlock();

http://git-wip-us.apache.org/repos/asf/tez/blob/8dcf8a12/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index b223c1a..981e224 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -1120,7 +1120,7 @@ class ShuffleScheduler {
       if (LOG.isDebugEnabled()) {
         LOG.debug("PendingHosts=" + pendingHosts);
       }
-      wait();
+      waitAndNotifyProgress();
     }
 
     if (!pendingHosts.isEmpty()) {
@@ -1360,19 +1360,19 @@ class ShuffleScheduler {
     protected Void callInternal() throws InterruptedException {
       while (!isShutdown.get() && remainingMaps.get() > 0) {
         synchronized (ShuffleScheduler.this) {
-          if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) 
{
-            if (remainingMaps.get() > 0) {
-              try {
-                ShuffleScheduler.this.wait();
-              } catch (InterruptedException e) {
-                if (isShutdown.get()) {
-                  LOG.info(srcNameTrimmed + ": " +
-                      "Interrupted while waiting for fetchers to complete and 
hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
-                  Thread.currentThread().interrupt();
-                  break;
-                } else {
-                  throw e;
-                }
+          while ((runningFetchers.size() >= numFetchers || pendingHosts.isEmpty())
+              && remainingMaps.get() > 0) {
+            try {
+              waitAndNotifyProgress();
+            } catch (InterruptedException e) {
+              if (isShutdown.get()) {
+                LOG.info(srcNameTrimmed + ": " +
+                    "Interrupted while waiting for fetchers to complete " +
+                    "and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
+                Thread.currentThread().interrupt();
+                break; // NOTE(review): exits only this inner wait loop; the outer loop stops at its isShutdown check
+              } else {
+                throw e;
               }
             }
           }
@@ -1446,6 +1446,11 @@ class ShuffleScheduler {
     }
   }
 
+  private synchronized void waitAndNotifyProgress() throws InterruptedException {
+    inputContext.notifyProgress(); // report liveness before blocking so an idle wait is not treated as a stuck task
+    wait(1000); // bounded wait: callers that loop on this re-report progress roughly once a second
+  }
+
   @VisibleForTesting
   FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
     return new FetcherOrderedGrouped(httpConnectionParams, 
ShuffleScheduler.this, allocator,

http://git-wip-us.apache.org/repos/asf/tez/blob/8dcf8a12/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index f361dc7..23248ed 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -193,6 +194,26 @@ public class TestShuffleManager {
     verify(inputContext).createTezFrameworkExecutorService(anyInt(), 
anyString());
   }
 
+  @Test (timeout = 20000)
+  public void testProgressWithEmptyPendingHosts() throws Exception {
+    InputContext inputContext = createInputContext();
+    final ShuffleManager shuffleManager = spy(createShuffleManager(inputContext, 1));
+    Thread shuffleManagerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          shuffleManager.run();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    shuffleManagerThread.start();
+    Thread.sleep(1000 * 3 + 1000); // three 1-second timed waits plus slack
+    shuffleManagerThread.interrupt();
+    verify(inputContext, atLeast(3)).notifyProgress();
+  }
+
   private ShuffleManagerForTest createShuffleManager(
       InputContext inputContext, int expectedNumOfPhysicalInputs)
           throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/8dcf8a12/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index c61391c..381ad85 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -812,6 +812,28 @@ public class TestShuffleScheduler {
     assertFalse("Host identifier mismatch", (host.getHost() + ":" + 
host.getPort() + ":" + host.getPartitionId()).equalsIgnoreCase("host0:10000"));
   }
 
+  @Test (timeout = 20000)
+  public void testProgressDuringGetHostWait() throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    Configuration conf = new TezConfiguration();
+    Shuffle shuffle = mock(Shuffle.class);
+    final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 1, shuffle, conf);
+    Thread schedulerGetHostThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          scheduler.getHost();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    schedulerGetHostThread.start();
+    Thread.sleep(1000 * 3 + 1000); // three wait(1000) intervals plus slack
+    schedulerGetHostThread.interrupt();
+    verify(scheduler.inputContext, atLeast(3)).notifyProgress();
+  }
+
   @Test(timeout = 5000)
   public void testShutdown() throws Exception {
     InputContext inputContext = createTezInputContext();
@@ -964,6 +986,7 @@ public class TestShuffleScheduler {
     private final AtomicInteger numFetchersCreated = new AtomicInteger(0);
     private final boolean fetcherShouldWait;
     private final ExceptionReporter reporter;
+    private final InputContext inputContext;
 
     public ShuffleSchedulerForTest(InputContext inputContext, Configuration 
conf,
                                    int numberOfInputs,
@@ -989,6 +1012,7 @@ public class TestShuffleScheduler {
           ifileReadAhead, ifileReadAheadLength, srcNameTrimmed);
       this.fetcherShouldWait = fetcherShouldWait;
       this.reporter = shuffle;
+      this.inputContext = inputContext;
     }
 
     @Override

Reply via email to