Repository: tez
Updated Branches:
  refs/heads/master 50df8651f -> 49866b8fc


TEZ-2750. Shuffle may not shutdown in case of a fetch failure, causing it to hang. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/49866b8f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/49866b8f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/49866b8f

Branch: refs/heads/master
Commit: 49866b8fca8b71be9d1edb011599177d610bc0c3
Parents: 50df865
Author: Siddharth Seth <[email protected]>
Authored: Tue Sep 1 16:29:22 2015 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Tue Sep 1 16:29:22 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/http/HttpConnection.java     |   2 +-
 .../tez/runtime/library/common/Constants.java   |   5 +-
 .../orderedgrouped/FetcherOrderedGrouped.java   |  12 +-
 .../shuffle/orderedgrouped/MergeManager.java    |  66 ++++++-----
 .../common/shuffle/orderedgrouped/Shuffle.java  |  48 ++++----
 .../orderedgrouped/ShuffleScheduler.java        |  40 +++++--
 .../library/input/OrderedGroupedKVInput.java    |   2 +
 .../runtime/library/input/UnorderedKVInput.java |   2 +
 .../shuffle/orderedgrouped/TestShuffle.java     | 110 +++++++++++++++++++
 10 files changed, 223 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3c7d0bd..e922ae1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2750. Shuffle may not shutdown in case of a fetch failure, causing it to hang.
   TEZ-2294. Add tez-site-template.xml with description of config properties.
   TEZ-2757. Fix download links for Tez releases.
   TEZ-2742. VertexImpl.finished() terminationCause hides member var of the

http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java 
b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
index 4732354..841e542 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
@@ -277,7 +277,7 @@ public class HttpConnection extends BaseHttpConnection {
       }
       if (connection != null && (disconnect || !httpConnParams.isKeepAlive())) 
{
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Closing connection on " + logIdentifier);
+          LOG.debug("Closing connection on " + logIdentifier + ", 
disconnectParam=" + disconnect);
         }
         connection.disconnect();
         connection = null;

http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
index d56d86c..827cafe 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
@@ -50,7 +50,10 @@ public class Constants {
 
   public static final String TEZ_RUNTIME_JOB_CREDENTIALS =
       "tez.runtime.job.credentials";
-  
+
+  /**
+   * Parameter used to specify the memory available to runtime components, for 
writing unit tests.
+   */
   @Private
   public static final String TEZ_RUNTIME_TASK_MEMORY =
       "tez.runtime.task.memory";

http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index d8be8dd..47df8f2 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -69,7 +69,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   private final FetchedInputAllocatorOrderedGrouped allocator;
   private final ShuffleScheduler scheduler;
   private final ShuffleClientMetrics metrics;
-  private final Shuffle shuffle;
+  private final ExceptionReporter exceptionReporter;
   private final int id;
   private final String logIdentifier;
   private final String localShuffleHostPort;
@@ -103,7 +103,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
                                ShuffleScheduler scheduler,
                                FetchedInputAllocatorOrderedGrouped allocator,
                                ShuffleClientMetrics metrics,
-                               Shuffle shuffle, JobTokenSecretManager 
jobTokenSecretMgr,
+                               ExceptionReporter exceptionReporter, 
JobTokenSecretManager jobTokenSecretMgr,
                                boolean ifileReadAhead, int 
ifileReadAheadLength,
                                CompressionCodec codec,
                                Configuration conf,
@@ -122,7 +122,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     this.scheduler = scheduler;
     this.allocator = allocator;
     this.metrics = metrics;
-    this.shuffle = shuffle;
+    this.exceptionReporter = exceptionReporter;
     this.mapHost = mapHost;
     this.currentPartition = this.mapHost.getPartitionId();
     this.id = nextId.incrementAndGet();
@@ -182,7 +182,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       Thread.currentThread().interrupt();
       return null;
     } catch (Throwable t) {
-      shuffle.reportException(t);
+      exceptionReporter.reportException(t);
       // Shuffle knows how to deal with failures post shutdown via the 
onFailure hook
     }
     return null;
@@ -229,7 +229,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     retryStartTime = 0;
     // Get completed maps on 'host'
     List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
-    // Sanity check to catch hosts with only 'OBSOLETE' maps, 
+    // Sanity check to catch hosts with only 'OBSOLETE' maps,
     // especially at the tail of large jobs
     if (srcAttempts.size() == 0) {
       return;
@@ -377,7 +377,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     InputAttemptIdentifier srcAttemptId = null;
     long decompressedLength = -1;
     long compressedLength = -1;
-    
+
     try {
       long startTime = System.currentTimeMillis();
       int forReduce = -1;

http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 0536bc0..0a44a6b 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -27,6 +27,8 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Preconditions;
@@ -107,7 +109,9 @@ public class MergeManager implements 
FetchedInputAllocatorOrderedGrouped {
   private long commitMemory;
   private final int ioSortFactor;
   private final long maxSingleShuffleLimit;
-  
+
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
   private final int memToMemMergeOutputsThreshold; 
   private final long mergeThreshold;
   
@@ -526,33 +530,43 @@ public class MergeManager implements 
FetchedInputAllocatorOrderedGrouped {
   }
 
   public TezRawKeyValueIterator close() throws Throwable {
-    // Wait for on-going merges to complete
-    if (memToMemMerger != null) {
-      memToMemMerger.close();
-    }
-    inMemoryMerger.close();
-    onDiskMerger.close();
-
-    List<MapOutput> memory =
-      new ArrayList<MapOutput>(inMemoryMergedMapOutputs);
-    inMemoryMergedMapOutputs.clear();
-    memory.addAll(inMemoryMapOutputs);
-    inMemoryMapOutputs.clear();
-    List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs);
-    onDiskMapOutputs.clear();
-    try {
-      TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
-      this.finalMergeComplete = true;
-      return kvIter;
-    } catch(InterruptedException e) {
-      //Cleanup the disk segments
-      if (cleanup) {
-        cleanup(localFS, disk);
-        cleanup(localFS, onDiskMapOutputs);
+    // TODO TEZ-2756. Don't attempt a final merge if close is invoked as a result of a previous
+    // shuffle exception / error.
+    if (!isShutdown.getAndSet(true)) {
+      // Wait for on-going merges to complete
+      if (memToMemMerger != null) {
+        memToMemMerger.close();
+      }
+      inMemoryMerger.close();
+      onDiskMerger.close();
+
+      List<MapOutput> memory =
+          new ArrayList<MapOutput>(inMemoryMergedMapOutputs);
+      inMemoryMergedMapOutputs.clear();
+      memory.addAll(inMemoryMapOutputs);
+      inMemoryMapOutputs.clear();
+      List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs);
+      onDiskMapOutputs.clear();
+      try {
+        TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
+        this.finalMergeComplete = true;
+        return kvIter;
+      } catch (InterruptedException e) {
+        //Cleanup the disk segments
+        if (cleanup) {
+          cleanup(localFS, disk);
+          cleanup(localFS, onDiskMapOutputs);
+        }
+        Thread.currentThread().interrupt(); //reset interrupt status
+        throw e;
       }
-      Thread.currentThread().interrupt(); //reset interrupt status
-      throw e;
     }
+    return null;
+  }
+
+  @VisibleForTesting
+  public boolean isShutdown() {
+    return isShutdown.get();
   }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 20e7f5b..18c8302 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -76,8 +77,10 @@ public class Shuffle implements ExceptionReporter {
   private final InputContext inputContext;
   
   private final ShuffleInputEventHandlerOrderedGrouped eventHandler;
-  private final ShuffleScheduler scheduler;
-  private final MergeManager merger;
+  @VisibleForTesting
+  final ShuffleScheduler scheduler;
+  @VisibleForTesting
+  final MergeManager merger;
 
   private final CompressionCodec codec;
   private final boolean ifileReadAhead;
@@ -290,11 +293,19 @@ public class Shuffle implements ExceptionReporter {
           throw new ShuffleError("Error during shuffle", e);
         }
       }
+      // The ShuffleScheduler may have exited cleanly as a result of a shutdown invocation
+      // triggered by a previously reported exception. Check before proceeding further.
+      synchronized (Shuffle.this) {
+        if (throwable.get() != null) {
+          throw new ShuffleError("error in shuffle in " + throwingThreadName,
+              throwable.get());
+        }
+      }
 
       shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
 
       // stop the scheduler
-      cleanupShuffleScheduler(false);
+      cleanupShuffleScheduler();
 
       // Finish the on-going merges...
       TezRawKeyValueIterator kvIter = null;
@@ -321,20 +332,18 @@ public class Shuffle implements ExceptionReporter {
     }
   }
 
-  private void cleanupShuffleScheduler(boolean ignoreErrors) throws 
InterruptedException {
+  private void cleanupShuffleSchedulerIgnoreErrors() {
+    try {
+      cleanupShuffleScheduler();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.info("Interrupted while attempting to close the scheduler during 
cleanup. Ignoring");
+    }
+  }
 
+  private void cleanupShuffleScheduler() throws InterruptedException {
     if (!schedulerClosed.getAndSet(true)) {
-      try {
-        scheduler.close();
-      } catch (InterruptedException e) {
-        if (ignoreErrors) {
-          //Reset the status
-          Thread.currentThread().interrupt();
-          LOG.info("Interrupted while attempting to close the scheduler during 
cleanup. Ignoring");
-        } else {
-          throw e;
-        }
-      }
+      scheduler.close();
     }
   }
 
@@ -362,7 +371,7 @@ public class Shuffle implements ExceptionReporter {
 
   private void cleanupIgnoreErrors() {
     try {
-      cleanupShuffleScheduler(true);
+      cleanupShuffleSchedulerIgnoreErrors();
       cleanupMerger(true);
     } catch (Throwable t) {
       LOG.info("Error in cleaning up.., ", t);
@@ -370,16 +379,17 @@ public class Shuffle implements ExceptionReporter {
   }
 
   @Private
+  @Override
   public synchronized void reportException(Throwable t) {
     // RunShuffleCallable onFailure deals with ignoring errors on shutdown.
     if (throwable.get() == null) {
+      LOG.info("Setting throwable in reportException with message [" + 
t.getMessage() +
+          "] from thread [" + Thread.currentThread().getName());
       throwable.set(t);
       throwingThreadName = Thread.currentThread().getName();
       // Notify the scheduler so that the reporting thread finds the 
       // exception immediately.
-      synchronized (scheduler) {
-        scheduler.notifyAll();
-      }
+      cleanupShuffleSchedulerIgnoreErrors();
     }
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 281844f..26464bb 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -147,7 +147,7 @@ class ShuffleScheduler {
   private final HttpConnectionParams httpConnectionParams;
   private final FetchedInputAllocatorOrderedGrouped allocator;
   private final ShuffleClientMetrics shuffleMetrics;
-  private final Shuffle shuffle;
+  private final ExceptionReporter exceptionReporter;
   private final MergeManager mergeManager;
   private final JobTokenSecretManager jobTokenSecretManager;
   private final boolean ifileReadAhead;
@@ -173,13 +173,15 @@ class ShuffleScheduler {
   private final int abortFailureLimit;
   private int maxMapRuntime = 0;
 
+  private volatile Thread shuffleSchedulerThread = null;
+
   private long totalBytesShuffledTillNow = 0;
   private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
 
   public ShuffleScheduler(InputContext inputContext,
                           Configuration conf,
                           int numberOfInputs,
-                          Shuffle shuffle,
+                          ExceptionReporter exceptionReporter,
                           MergeManager mergeManager,
                           FetchedInputAllocatorOrderedGrouped allocator,
                           long startTime,
@@ -189,7 +191,7 @@ class ShuffleScheduler {
                           String srcNameTrimmed) throws IOException {
     this.inputContext = inputContext;
     this.conf = conf;
-    this.shuffle = shuffle;
+    this.exceptionReporter = exceptionReporter;
     this.allocator = allocator;
     this.mergeManager = mergeManager;
     this.numInputs = numberOfInputs;
@@ -295,6 +297,7 @@ class ShuffleScheduler {
   }
 
   public void start() throws Exception {
+    shuffleSchedulerThread = Thread.currentThread();
     ShuffleSchedulerCallable schedulerCallable = new 
ShuffleSchedulerCallable();
     schedulerCallable.call();
   }
@@ -302,10 +305,18 @@ class ShuffleScheduler {
   public void close() throws InterruptedException {
     if (!isShutdown.getAndSet(true)) {
 
-      // Interrupt the waiting Scheduler thread.
+      // Notify and interrupt the waiting scheduler thread
       synchronized (this) {
         notifyAll();
       }
+      // Interrupt the ShuffleScheduler thread only if the close is invoked by another thread.
+      // If this is invoked on the same thread, then the shuffleRunner has already completed, and there's
+      // no point interrupting it.
+      // The interrupt is needed to unblock any merges or waits which may be happening, so that the thread can
+      // exit.
+      if (shuffleSchedulerThread != null && 
!Thread.currentThread().equals(shuffleSchedulerThread)) {
+        shuffleSchedulerThread.interrupt();
+      }
 
       // Interrupt the fetchers.
       for (FetcherOrderedGrouped fetcher : runningFetchers) {
@@ -318,6 +329,11 @@ class ShuffleScheduler {
     }
   }
 
+  @VisibleForTesting
+  public boolean isShutdown() {
+    return isShutdown.get();
+  }
+
   protected synchronized  void updateEventReceivedTime() {
     long relativeTime = System.currentTimeMillis() - startTime;
     if (firstEventReceived.getValue() == 0) {
@@ -503,7 +519,7 @@ class ShuffleScheduler {
   @VisibleForTesting
   void reportExceptionForInput(Exception exception) {
     LOG.error("Reporting exception for input", exception);
-    shuffle.reportException(exception);
+    exceptionReporter.reportException(exception);
   }
 
   private void logProgress() {
@@ -554,7 +570,7 @@ class ShuffleScheduler {
                 srcAttempt.getAttemptNumber()));
       ioe.fillInStackTrace();
       // Shuffle knows how to deal with failures post shutdown via the 
onFailure hook
-      shuffle.reportException(ioe);
+      exceptionReporter.reportException(ioe);
     }
 
     failedShuffleCounter.increment(1);
@@ -571,7 +587,7 @@ class ShuffleScheduler {
   public void reportLocalError(IOException ioe) {
     LOG.error("Shuffle failed : caused by local error", ioe);
     // Shuffle knows how to deal with failures post shutdown via the onFailure 
hook
-    shuffle.reportException(ioe);
+    exceptionReporter.reportException(ioe);
   }
 
   // Notify the AM  
@@ -645,7 +661,7 @@ class ShuffleScheduler {
           + reducerProgressedEnough + ", reducerStalled=" + reducerStalled);
       String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
       // Shuffle knows how to deal with failures post shutdown via the 
onFailure hook
-      shuffle.reportException(new IOException(errorMsg));
+      exceptionReporter.reportException(new IOException(errorMsg));
     }
 
   }
@@ -688,7 +704,7 @@ class ShuffleScheduler {
     if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
       //Pipelined shuffle case (where shuffleInfoEventsMap gets populated).
       //Fail fast here.
-      shuffle.reportException(new IOException(srcAttempt + " is marked as 
obsoleteInput, but it "
+      exceptionReporter.reportException(new IOException(srcAttempt + " is 
marked as obsoleteInput, but it "
           + "exists in shuffleInfoEventMap. Some data could have been already 
merged "
           + "to memory/disk outputs.  Failing the fetch early."));
       return;
@@ -902,7 +918,7 @@ class ShuffleScheduler {
         // This handles shutdown of the entire fetch / merge process.
       } catch (Throwable t) {
         // Shuffle knows how to deal with failures post shutdown via the 
onFailure hook
-        shuffle.reportException(t);
+        exceptionReporter.reportException(t);
       }
     }
   }
@@ -1019,7 +1035,7 @@ class ShuffleScheduler {
   @VisibleForTesting
   FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
     return new FetcherOrderedGrouped(httpConnectionParams, 
ShuffleScheduler.this, allocator,
-        shuffleMetrics, shuffle, jobTokenSecretManager, ifileReadAhead, 
ifileReadAheadLength,
+        shuffleMetrics, exceptionReporter, jobTokenSecretManager, 
ifileReadAhead, ifileReadAheadLength,
         codec, conf, localDiskFetchEnabled, localHostname, shufflePort, 
srcNameTrimmed, mapHost,
         ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, 
wrongMapErrsCounter,
         connectionErrsCounter, wrongReduceErrsCounter, asyncHttp);
@@ -1060,7 +1076,7 @@ class ShuffleScheduler {
         LOG.info("Already shutdown. Ignoring fetch complete");
       } else {
         LOG.error("Fetcher failed with error", t);
-        shuffle.reportException(t);
+        exceptionReporter.reportException(t);
         doBookKeepingForFetcherComplete();
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 7399359..7d887de 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
+import org.apache.tez.runtime.library.common.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -361,6 +362,7 @@ public class OrderedGroupedKVInput extends 
AbstractLogicalInput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
     
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+    confKeys.add(Constants.TEZ_RUNTIME_TASK_MEMORY);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 1016263..271eed3 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.tez.runtime.library.common.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -269,6 +270,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
     
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+    confKeys.add(Constants.TEZ_RUNTIME_TASK_MEMORY);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
new file mode 100644
index 0000000..28f813c
--- /dev/null
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.library.common.Constants;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestShuffle {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestShuffle.class);
+
+  @Test(timeout = 10000)
+  public void testSchedulerTerminatesOnException() throws IOException, 
InterruptedException {
+
+    InputContext inputContext = createTezInputContext();
+    TezConfiguration conf = new TezConfiguration();
+    conf.setLong(Constants.TEZ_RUNTIME_TASK_MEMORY, 300000l);
+    Shuffle shuffle = new Shuffle(inputContext, conf, 1, 3000000l);
+    try {
+      shuffle.run();
+      ShuffleScheduler scheduler = shuffle.scheduler;
+      MergeManager mergeManager = shuffle.merger;
+      assertFalse(scheduler.isShutdown());
+      assertFalse(mergeManager.isShutdown());
+
+      String exceptionMessage = "Simulating fetch failure";
+      shuffle.reportException(new IOException(exceptionMessage));
+
+      while (!scheduler.isShutdown()) {
+        Thread.sleep(100l);
+      }
+      assertTrue(scheduler.isShutdown());
+
+      while (!mergeManager.isShutdown()) {
+        Thread.sleep(100l);
+      }
+      assertTrue(mergeManager.isShutdown());
+
+      ArgumentCaptor<Throwable> throwableArgumentCaptor = 
ArgumentCaptor.forClass(Throwable.class);
+      ArgumentCaptor<String> stringArgumentCaptor = 
ArgumentCaptor.forClass(String.class);
+      verify(inputContext, 
times(1)).fatalError(throwableArgumentCaptor.capture(),
+          stringArgumentCaptor.capture());
+
+      Throwable t = throwableArgumentCaptor.getValue();
+      assertTrue(t.getCause().getMessage().contains(exceptionMessage));
+
+    } finally {
+      shuffle.shutdown();
+    }
+
+
+  }
+
+
+  private InputContext createTezInputContext() throws IOException {
+    ApplicationId applicationId = ApplicationId.newInstance(1, 1);
+    InputContext inputContext = mock(InputContext.class);
+    doReturn(applicationId).when(inputContext).getApplicationId();
+    doReturn("sourceVertex").when(inputContext).getSourceVertexName();
+    when(inputContext.getCounters()).thenReturn(new TezCounters());
+    ExecutionContext executionContext = new ExecutionContextImpl("localhost");
+    doReturn(executionContext).when(inputContext).getExecutionContext();
+    ByteBuffer shuffleBuffer = ByteBuffer.allocate(4).putInt(0, 4);
+    
doReturn(shuffleBuffer).when(inputContext).getServiceProviderMetaData(anyString());
+    Token<JobTokenIdentifier>
+        sessionToken = new Token<JobTokenIdentifier>(new 
JobTokenIdentifier(new Text("text")),
+        new JobTokenSecretManager());
+    ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken);
+    
doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString());
+    return inputContext;
+  }
+}

Reply via email to