Repository: tez Updated Branches: refs/heads/master 50df8651f -> 49866b8fc
TEZ-2750. Shuffle may not shutdown in case of a fetch failure, causing it to hang. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/49866b8f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/49866b8f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/49866b8f Branch: refs/heads/master Commit: 49866b8fca8b71be9d1edb011599177d610bc0c3 Parents: 50df865 Author: Siddharth Seth <[email protected]> Authored: Tue Sep 1 16:29:22 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue Sep 1 16:29:22 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/http/HttpConnection.java | 2 +- .../tez/runtime/library/common/Constants.java | 5 +- .../orderedgrouped/FetcherOrderedGrouped.java | 12 +- .../shuffle/orderedgrouped/MergeManager.java | 66 ++++++----- .../common/shuffle/orderedgrouped/Shuffle.java | 48 ++++---- .../orderedgrouped/ShuffleScheduler.java | 40 +++++-- .../library/input/OrderedGroupedKVInput.java | 2 + .../runtime/library/input/UnorderedKVInput.java | 2 + .../shuffle/orderedgrouped/TestShuffle.java | 110 +++++++++++++++++++ 10 files changed, 223 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3c7d0bd..e922ae1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2750. Shuffle may not shutdown in case of a fetch failure, causing it to hang. TEZ-2294. Add tez-site-template.xml with description of config properties. TEZ-2757. Fix download links for Tez releases. TEZ-2742. 
VertexImpl.finished() terminationCause hides member var of the http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java index 4732354..841e542 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java @@ -277,7 +277,7 @@ public class HttpConnection extends BaseHttpConnection { } if (connection != null && (disconnect || !httpConnParams.isKeepAlive())) { if (LOG.isDebugEnabled()) { - LOG.debug("Closing connection on " + logIdentifier); + LOG.debug("Closing connection on " + logIdentifier + ", disconnectParam=" + disconnect); } connection.disconnect(); connection = null; http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java index d56d86c..827cafe 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java @@ -50,7 +50,10 @@ public class Constants { public static final String TEZ_RUNTIME_JOB_CREDENTIALS = "tez.runtime.job.credentials"; - + + /** + * Parameter used to specify the memory available to runtime components, for writing unit tests. 
+ */ @Private public static final String TEZ_RUNTIME_TASK_MEMORY = "tez.runtime.task.memory"; http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index d8be8dd..47df8f2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -69,7 +69,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { private final FetchedInputAllocatorOrderedGrouped allocator; private final ShuffleScheduler scheduler; private final ShuffleClientMetrics metrics; - private final Shuffle shuffle; + private final ExceptionReporter exceptionReporter; private final int id; private final String logIdentifier; private final String localShuffleHostPort; @@ -103,7 +103,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { ShuffleScheduler scheduler, FetchedInputAllocatorOrderedGrouped allocator, ShuffleClientMetrics metrics, - Shuffle shuffle, JobTokenSecretManager jobTokenSecretMgr, + ExceptionReporter exceptionReporter, JobTokenSecretManager jobTokenSecretMgr, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, Configuration conf, @@ -122,7 +122,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { this.scheduler = scheduler; this.allocator = allocator; this.metrics = metrics; - this.shuffle = shuffle; + this.exceptionReporter = exceptionReporter; this.mapHost = mapHost; this.currentPartition = 
this.mapHost.getPartitionId(); this.id = nextId.incrementAndGet(); @@ -182,7 +182,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { Thread.currentThread().interrupt(); return null; } catch (Throwable t) { - shuffle.reportException(t); + exceptionReporter.reportException(t); // Shuffle knows how to deal with failures post shutdown via the onFailure hook } return null; @@ -229,7 +229,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { retryStartTime = 0; // Get completed maps on 'host' List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host); - // Sanity check to catch hosts with only 'OBSOLETE' maps, + // Sanity check to catch hosts with only 'OBSOLETE' maps, // especially at the tail of large jobs if (srcAttempts.size() == 0) { return; @@ -377,7 +377,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { InputAttemptIdentifier srcAttemptId = null; long decompressedLength = -1; long compressedLength = -1; - + try { long startTime = System.currentTimeMillis(); int forReduce = -1; http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 0536bc0..0a44a6b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -27,6 +27,8 @@ import java.util.Set; import java.util.TreeSet; import com.google.common.annotations.VisibleForTesting; + +import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; @@ -107,7 +109,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { private long commitMemory; private final int ioSortFactor; private final long maxSingleShuffleLimit; - + + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final int memToMemMergeOutputsThreshold; private final long mergeThreshold; @@ -526,33 +530,43 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { } public TezRawKeyValueIterator close() throws Throwable { - // Wait for on-going merges to complete - if (memToMemMerger != null) { - memToMemMerger.close(); - } - inMemoryMerger.close(); - onDiskMerger.close(); - - List<MapOutput> memory = - new ArrayList<MapOutput>(inMemoryMergedMapOutputs); - inMemoryMergedMapOutputs.clear(); - memory.addAll(inMemoryMapOutputs); - inMemoryMapOutputs.clear(); - List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs); - onDiskMapOutputs.clear(); - try { - TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk); - this.finalMergeComplete = true; - return kvIter; - } catch(InterruptedException e) { - //Cleanup the disk segments - if (cleanup) { - cleanup(localFS, disk); - cleanup(localFS, onDiskMapOutputs); + // TODO TEZ-2756. Don't attempt a final merge if close is invoked as a result of a previous + // shuffle exception / error. 
+ if (!isShutdown.getAndSet(true)) { + // Wait for on-going merges to complete + if (memToMemMerger != null) { + memToMemMerger.close(); + } + inMemoryMerger.close(); + onDiskMerger.close(); + + List<MapOutput> memory = + new ArrayList<MapOutput>(inMemoryMergedMapOutputs); + inMemoryMergedMapOutputs.clear(); + memory.addAll(inMemoryMapOutputs); + inMemoryMapOutputs.clear(); + List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs); + onDiskMapOutputs.clear(); + try { + TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk); + this.finalMergeComplete = true; + return kvIter; + } catch (InterruptedException e) { + //Cleanup the disk segments + if (cleanup) { + cleanup(localFS, disk); + cleanup(localFS, onDiskMapOutputs); + } + Thread.currentThread().interrupt(); //reset interrupt status + throw e; } - Thread.currentThread().interrupt(); //reset interrupt status - throw e; } + return null; + } + + @VisibleForTesting + public boolean isShutdown() { + return isShutdown.get(); } http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index 20e7f5b..18c8302 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.apache.hadoop.classification.InterfaceAudience; @@ -76,8 +77,10 @@ public class Shuffle implements ExceptionReporter { private final InputContext inputContext; private final ShuffleInputEventHandlerOrderedGrouped eventHandler; - private final ShuffleScheduler scheduler; - private final MergeManager merger; + @VisibleForTesting + final ShuffleScheduler scheduler; + @VisibleForTesting + final MergeManager merger; private final CompressionCodec codec; private final boolean ifileReadAhead; @@ -290,11 +293,19 @@ public class Shuffle implements ExceptionReporter { throw new ShuffleError("Error during shuffle", e); } } + // The ShuffleScheduler may have exited cleanly as a result of a shutdown invocation + // triggered by a previously reported exception. Check before proceeding further. + synchronized (Shuffle.this) { + if (throwable.get() != null) { + throw new ShuffleError("error in shuffle in " + throwingThreadName, + throwable.get()); + } + } shufflePhaseTime.setValue(System.currentTimeMillis() - startTime); // stop the scheduler - cleanupShuffleScheduler(false); + cleanupShuffleScheduler(); // Finish the on-going merges... TezRawKeyValueIterator kvIter = null; @@ -321,20 +332,18 @@ public class Shuffle implements ExceptionReporter { } } - private void cleanupShuffleScheduler(boolean ignoreErrors) throws InterruptedException { + private void cleanupShuffleSchedulerIgnoreErrors() { + try { + cleanupShuffleScheduler(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.info("Interrupted while attempting to close the scheduler during cleanup. Ignoring"); + } + } + private void cleanupShuffleScheduler() throws InterruptedException { if (!schedulerClosed.getAndSet(true)) { - try { - scheduler.close(); - } catch (InterruptedException e) { - if (ignoreErrors) { - //Reset the status - Thread.currentThread().interrupt(); - LOG.info("Interrupted while attempting to close the scheduler during cleanup. 
Ignoring"); - } else { - throw e; - } - } + scheduler.close(); } } @@ -362,7 +371,7 @@ public class Shuffle implements ExceptionReporter { private void cleanupIgnoreErrors() { try { - cleanupShuffleScheduler(true); + cleanupShuffleSchedulerIgnoreErrors(); cleanupMerger(true); } catch (Throwable t) { LOG.info("Error in cleaning up.., ", t); @@ -370,16 +379,17 @@ public class Shuffle implements ExceptionReporter { } @Private + @Override public synchronized void reportException(Throwable t) { // RunShuffleCallable onFailure deals with ignoring errors on shutdown. if (throwable.get() == null) { + LOG.info("Setting throwable in reportException with message [" + t.getMessage() + + "] from thread [" + Thread.currentThread().getName()); throwable.set(t); throwingThreadName = Thread.currentThread().getName(); // Notify the scheduler so that the reporting thread finds the // exception immediately. - synchronized (scheduler) { - scheduler.notifyAll(); - } + cleanupShuffleSchedulerIgnoreErrors(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 281844f..26464bb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -147,7 +147,7 @@ class ShuffleScheduler { private final HttpConnectionParams httpConnectionParams; private final FetchedInputAllocatorOrderedGrouped allocator; private final ShuffleClientMetrics shuffleMetrics; - private 
final Shuffle shuffle; + private final ExceptionReporter exceptionReporter; private final MergeManager mergeManager; private final JobTokenSecretManager jobTokenSecretManager; private final boolean ifileReadAhead; @@ -173,13 +173,15 @@ class ShuffleScheduler { private final int abortFailureLimit; private int maxMapRuntime = 0; + private volatile Thread shuffleSchedulerThread = null; + private long totalBytesShuffledTillNow = 0; private DecimalFormat mbpsFormat = new DecimalFormat("0.00"); public ShuffleScheduler(InputContext inputContext, Configuration conf, int numberOfInputs, - Shuffle shuffle, + ExceptionReporter exceptionReporter, MergeManager mergeManager, FetchedInputAllocatorOrderedGrouped allocator, long startTime, @@ -189,7 +191,7 @@ class ShuffleScheduler { String srcNameTrimmed) throws IOException { this.inputContext = inputContext; this.conf = conf; - this.shuffle = shuffle; + this.exceptionReporter = exceptionReporter; this.allocator = allocator; this.mergeManager = mergeManager; this.numInputs = numberOfInputs; @@ -295,6 +297,7 @@ class ShuffleScheduler { } public void start() throws Exception { + shuffleSchedulerThread = Thread.currentThread(); ShuffleSchedulerCallable schedulerCallable = new ShuffleSchedulerCallable(); schedulerCallable.call(); } @@ -302,10 +305,18 @@ class ShuffleScheduler { public void close() throws InterruptedException { if (!isShutdown.getAndSet(true)) { - // Interrupt the waiting Scheduler thread. + // Notify and interrupt the waiting scheduler thread synchronized (this) { notifyAll(); } + // Interrupt the ShuffleScheduler thread only if the close is invoked by another thread. + // If this is invoked on the same thread, then the shuffleRunner has already completed, and there's + // no point interrupting it. + // The interrupt is needed to unblock any merges or waits which may be happening, so that the thread can + // exit. 
+ if (shuffleSchedulerThread != null && !Thread.currentThread().equals(shuffleSchedulerThread)) { + shuffleSchedulerThread.interrupt(); + } // Interrupt the fetchers. for (FetcherOrderedGrouped fetcher : runningFetchers) { @@ -318,6 +329,11 @@ class ShuffleScheduler { } } + @VisibleForTesting + public boolean isShutdown() { + return isShutdown.get(); + } + protected synchronized void updateEventReceivedTime() { long relativeTime = System.currentTimeMillis() - startTime; if (firstEventReceived.getValue() == 0) { @@ -503,7 +519,7 @@ class ShuffleScheduler { @VisibleForTesting void reportExceptionForInput(Exception exception) { LOG.error("Reporting exception for input", exception); - shuffle.reportException(exception); + exceptionReporter.reportException(exception); } private void logProgress() { @@ -554,7 +570,7 @@ class ShuffleScheduler { srcAttempt.getAttemptNumber())); ioe.fillInStackTrace(); // Shuffle knows how to deal with failures post shutdown via the onFailure hook - shuffle.reportException(ioe); + exceptionReporter.reportException(ioe); } failedShuffleCounter.increment(1); @@ -571,7 +587,7 @@ class ShuffleScheduler { public void reportLocalError(IOException ioe) { LOG.error("Shuffle failed : caused by local error", ioe); // Shuffle knows how to deal with failures post shutdown via the onFailure hook - shuffle.reportException(ioe); + exceptionReporter.reportException(ioe); } // Notify the AM @@ -645,7 +661,7 @@ class ShuffleScheduler { + reducerProgressedEnough + ", reducerStalled=" + reducerStalled); String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out."; // Shuffle knows how to deal with failures post shutdown via the onFailure hook - shuffle.reportException(new IOException(errorMsg)); + exceptionReporter.reportException(new IOException(errorMsg)); } } @@ -688,7 +704,7 @@ class ShuffleScheduler { if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) { //Pipelined shuffle case (where shuffleInfoEventsMap gets populated). 
//Fail fast here. - shuffle.reportException(new IOException(srcAttempt + " is marked as obsoleteInput, but it " + exceptionReporter.reportException(new IOException(srcAttempt + " is marked as obsoleteInput, but it " + "exists in shuffleInfoEventMap. Some data could have been already merged " + "to memory/disk outputs. Failing the fetch early.")); return; @@ -902,7 +918,7 @@ class ShuffleScheduler { // This handles shutdown of the entire fetch / merge process. } catch (Throwable t) { // Shuffle knows how to deal with failures post shutdown via the onFailure hook - shuffle.reportException(t); + exceptionReporter.reportException(t); } } } @@ -1019,7 +1035,7 @@ class ShuffleScheduler { @VisibleForTesting FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) { return new FetcherOrderedGrouped(httpConnectionParams, ShuffleScheduler.this, allocator, - shuffleMetrics, shuffle, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, + shuffleMetrics, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, asyncHttp); @@ -1060,7 +1076,7 @@ class ShuffleScheduler { LOG.info("Already shutdown. 
Ignoring fetch complete"); } else { LOG.error("Fetcher failed with error", t); - shuffle.reportException(t); + exceptionReporter.reportException(t); doBookKeepingForFetcherComplete(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index 7399359..7d887de 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import org.apache.tez.runtime.library.api.IOInterruptedException; +import org.apache.tez.runtime.library.common.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -361,6 +362,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); + confKeys.add(Constants.TEZ_RUNTIME_TASK_MEMORY); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index 1016263..271eed3 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.tez.runtime.library.common.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -269,6 +270,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); + confKeys.add(Constants.TEZ_RUNTIME_TASK_MEMORY); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/49866b8f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java new file mode 100644 index 0000000..28f813c --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.runtime.api.ExecutionContext; +import org.apache.tez.runtime.api.InputContext; +import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.library.common.Constants; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestShuffle { + + private static final Logger LOG = LoggerFactory.getLogger(TestShuffle.class); + + @Test(timeout = 10000) + public void testSchedulerTerminatesOnException() throws IOException, InterruptedException { + + InputContext inputContext = createTezInputContext(); + TezConfiguration conf = new 
TezConfiguration(); + conf.setLong(Constants.TEZ_RUNTIME_TASK_MEMORY, 300000l); + Shuffle shuffle = new Shuffle(inputContext, conf, 1, 3000000l); + try { + shuffle.run(); + ShuffleScheduler scheduler = shuffle.scheduler; + MergeManager mergeManager = shuffle.merger; + assertFalse(scheduler.isShutdown()); + assertFalse(mergeManager.isShutdown()); + + String exceptionMessage = "Simulating fetch failure"; + shuffle.reportException(new IOException(exceptionMessage)); + + while (!scheduler.isShutdown()) { + Thread.sleep(100l); + } + assertTrue(scheduler.isShutdown()); + + while (!mergeManager.isShutdown()) { + Thread.sleep(100l); + } + assertTrue(mergeManager.isShutdown()); + + ArgumentCaptor<Throwable> throwableArgumentCaptor = ArgumentCaptor.forClass(Throwable.class); + ArgumentCaptor<String> stringArgumentCaptor = ArgumentCaptor.forClass(String.class); + verify(inputContext, times(1)).fatalError(throwableArgumentCaptor.capture(), + stringArgumentCaptor.capture()); + + Throwable t = throwableArgumentCaptor.getValue(); + assertTrue(t.getCause().getMessage().contains(exceptionMessage)); + + } finally { + shuffle.shutdown(); + } + + + } + + + private InputContext createTezInputContext() throws IOException { + ApplicationId applicationId = ApplicationId.newInstance(1, 1); + InputContext inputContext = mock(InputContext.class); + doReturn(applicationId).when(inputContext).getApplicationId(); + doReturn("sourceVertex").when(inputContext).getSourceVertexName(); + when(inputContext.getCounters()).thenReturn(new TezCounters()); + ExecutionContext executionContext = new ExecutionContextImpl("localhost"); + doReturn(executionContext).when(inputContext).getExecutionContext(); + ByteBuffer shuffleBuffer = ByteBuffer.allocate(4).putInt(0, 4); + doReturn(shuffleBuffer).when(inputContext).getServiceProviderMetaData(anyString()); + Token<JobTokenIdentifier> + sessionToken = new Token<JobTokenIdentifier>(new JobTokenIdentifier(new Text("text")), + new JobTokenSecretManager()); + 
ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken); + doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString()); + return inputContext; + } +}
