Repository: tez Updated Branches: refs/heads/master 8c44f2484 -> 146ab0702
TEZ-1752. Inputs / Outputs in the Runtime library should be interruptable (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/146ab070 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/146ab070 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/146ab070 Branch: refs/heads/master Commit: 146ab0702a25cda7020de936e270e291ca567e3c Parents: 8c44f24 Author: Rajesh Balamohan <[email protected]> Authored: Wed May 6 03:43:49 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Wed May 6 03:43:49 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../runtime/api/AbstractLogicalIOProcessor.java | 4 + .../api/ProcessorFrameworkInterface.java | 11 +- .../library/api/TezRuntimeConfiguration.java | 11 + .../common/readers/UnorderedKVReader.java | 1 + .../runtime/library/common/shuffle/Fetcher.java | 1 + .../common/shuffle/impl/ShuffleManager.java | 5 + .../orderedgrouped/FetcherOrderedGrouped.java | 7 +- .../shuffle/orderedgrouped/MergeManager.java | 124 +++++++-- .../shuffle/orderedgrouped/MergeThread.java | 18 +- .../common/shuffle/orderedgrouped/Shuffle.java | 16 +- .../orderedgrouped/ShuffleScheduler.java | 1 + .../common/sort/impl/ExternalSorter.java | 38 +++ .../common/sort/impl/PipelinedSorter.java | 261 +++++++++++-------- .../library/common/sort/impl/TezMerger.java | 31 ++- .../common/sort/impl/dflt/DefaultSorter.java | 70 +++-- .../library/input/OrderedGroupedKVInput.java | 1 + .../runtime/library/input/UnorderedKVInput.java | 1 + .../output/OrderedPartitionedKVOutput.java | 1 + .../library/output/UnorderedKVOutput.java | 1 + .../output/UnorderedPartitionedKVOutput.java | 1 + .../library/common/TestValuesIterator.java | 20 +- .../orderedgrouped/TestMergeManager.java | 87 +++++-- .../library/common/sort/impl/TestTezMerger.java | 3 +- 24 files changed, 518 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d7a1e1f..7ba8021 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-1752. Inputs / Outputs in the Runtime library should be interruptable. TEZ-2401. Tez UI: All-dag page has duration keep counting for KILLED dag. TEZ-2392. Have all readers throw an Exception on incorrect next() usage. TEZ-2408. TestTaskAttempt fails to compile against hadoop-2.4 and hadoop-2.2. http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java index 7714321..5a4cbe8 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java @@ -49,4 +49,8 @@ public abstract class AbstractLogicalIOProcessor implements LogicalIOProcessor, return context; } + @Override + public void abort() { + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java index f0ba9c9..89d4e3c 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java @@ -18,9 +18,11 @@ package org.apache.tez.runtime.api; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + import java.util.List; -import org.apache.hadoop.classification.InterfaceAudience.Public; /** * Represents the Tez framework part of an {@link org.apache.tez.runtime.api.Processor}. @@ -56,4 +58,11 @@ public interface ProcessorFrameworkInterface { * if an error occurs */ public void close() throws Exception; + + /** + * Indicates <code>Processor</code> to abort. Cleanup can be done. + * + */ + @Unstable + public void abort(); } http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index a818de8..3d9a701 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -311,6 +311,16 @@ public class TezRuntimeConfiguration { */ public static final boolean TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT = false; + /** + * Used only for internal testing. Strictly not recommended to be used elsewhere. This + * parameter could be changed/dropped later. + */ + @Unstable + @Private + public static final String TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT = TEZ_RUNTIME_PREFIX + + "cleanup.files.on.interrupt"; + public static final boolean TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT = false; + // TODO TEZ-1233 - allow this property to be set per vertex // TODO TEZ-1231 - move these properties out since they are not relevant for Inputs / Outputs @@ -374,6 +384,7 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH); tezRuntimeKeys.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT); tezRuntimeKeys.add(TEZ_RUNTIME_SORTER_CLASS); + tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); defaultConf.addResource("core-default.xml"); defaultConf.addResource("core-site.xml"); http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java index b14a461..fc2e312 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java @@ -184,6 +184,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { currentFetchedInput = shuffleManager.getNextInput(); } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for next available input", e); + Thread.currentThread().interrupt(); throw new IOException(e); } if (currentFetchedInput == null) { http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 3154943..48fe0f2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -376,6 +376,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // fall back to HTTP fetch below LOG.warn("Double locking detected for " + host); } catch (InterruptedException sleepInterrupted) { + Thread.currentThread().interrupt(); // fall back to HTTP fetch below LOG.warn("Lock was interrupted for " + host); } finally { http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 749143a..d47e652 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -748,6 +748,11 @@ public class ShuffleManager implements FetcherCallback { /////////////////// End of Methods from FetcherCallbackHandler public void shutdown() throws InterruptedException { + if (Thread.currentThread().isInterrupted()) { + //TODO: need to cleanup all FetchedInput (DiskFetchedInput, LocalDisFetchedInput), lockFile + //As of now relying on job cleanup (when all directories would be cleared) + LOG.info("Thread interrupted. Need to cleanup the local dirs"); + } if (!isShutdown.getAndSet(true)) { // Shut down any pending fetchers LOG.info("Shutting down pending fetchers on source" + srcNameTrimmed + ": " http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 8d20aa7..fbaabff 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -177,6 +177,9 @@ class FetcherOrderedGrouped extends Thread { } } } catch (InterruptedException ie) { + //TODO: might not be respected when fetcher is in progress / server is busy. TEZ-711 + //Set the status back + Thread.currentThread().interrupt(); return; } catch (Throwable t) { shuffle.reportException(t); @@ -191,7 +194,9 @@ class FetcherOrderedGrouped extends Thread { try { join(5000); } catch (InterruptedException ie) { - LOG.warn("Got interrupt while joining " + getName(), ie); + //Reset the status + Thread.currentThread().interrupt(); + LOG.warn("Got interrupt while joining " + getName()); } } http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 2e6ebd9..5a35f2f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -137,6 +138,8 @@ public class MergeManager { private AtomicInteger mergeFileSequenceId = new AtomicInteger(0); + private final boolean cleanup; + /** * Construct the MergeManager. Must call start before it becomes usable. */ @@ -174,6 +177,9 @@ public class MergeManager { this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN); this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ); + this.cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT, + TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT); + this.codec = codec; this.ifileReadAhead = ifileReadAheadEnabled; if (this.ifileReadAhead) { @@ -514,27 +520,61 @@ public class MergeManager { public boolean isMergeComplete() { return finalMergeComplete; } - + public TezRawKeyValueIterator close() throws Throwable { // Wait for on-going merges to complete - if (memToMemMerger != null) { + if (memToMemMerger != null) { memToMemMerger.close(); } inMemoryMerger.close(); onDiskMerger.close(); - - List<MapOutput> memory = + + List<MapOutput> memory = new ArrayList<MapOutput>(inMemoryMergedMapOutputs); inMemoryMergedMapOutputs.clear(); memory.addAll(inMemoryMapOutputs); inMemoryMapOutputs.clear(); List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs); onDiskMapOutputs.clear(); - TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk); - this.finalMergeComplete = true; - return kvIter; + try { + TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk); + this.finalMergeComplete = true; + return kvIter; + } catch(InterruptedException e) { + //Cleanup the disk segments + if (cleanup) { + cleanup(localFS, disk); + cleanup(localFS, onDiskMapOutputs); + } + Thread.currentThread().interrupt(); //reset interrupt status + throw e; + } + } + + + static void cleanup(FileSystem fs, Collection<FileChunk> fileChunkList) { + for (FileChunk fileChunk : fileChunkList) { + cleanup(fs, fileChunk.getPath()); + } } - + + static void cleanup(FileSystem fs, Path path) { + if (path == null) { + return; + } + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting " + path); + } + fs.delete(path, true); + } catch (IOException e) { + LOG.info("Error in deleting " + path); + } + } + + + void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer) throws IOException, InterruptedException { combiner.combine(kvIter, writer); @@ -555,7 +595,7 @@ public class MergeManager { } @Override - public void merge(List<MapOutput> inputs) throws IOException { + public void merge(List<MapOutput> inputs) throws IOException, InterruptedException { if (inputs == null || inputs.size() == 0) { return; } @@ -597,13 +637,28 @@ public class MergeManager { // Note the output of the merge closeInMemoryMergedFile(mergedMapOutputs); } + + @Override + public void cleanup(List<MapOutput> inputs, boolean deleteData) throws IOException, + InterruptedException { + //No OP + } } /** * Merges multiple in-memory segment to a disk segment */ private class InMemoryMerger extends MergeThread<MapOutput> { - + + @VisibleForTesting + volatile InputAttemptIdentifier srcTaskIdentifier; + + @VisibleForTesting + volatile Path outputPath; + + @VisibleForTesting + volatile Path tmpDir; + public InMemoryMerger(MergeManager manager) { super(manager, Integer.MAX_VALUE, exceptionReporter); setName("MemtoDiskMerger [" + TezUtilsInternal @@ -628,7 +683,7 @@ public class MergeManager { //in the merge method) //figure out the mapId - InputAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier(); + srcTaskIdentifier = inputs.get(0).getAttemptIdentifier(); List<Segment> inMemorySegments = new ArrayList<Segment>(); long mergeOutputSize = @@ -639,7 +694,7 @@ public class MergeManager { // All disk writes done by this merge are overhead - due to the lac of // adequate memory to keep all segments in memory. - Path outputPath = mapOutputFile.getInputFileForWrite( + outputPath = mapOutputFile.getInputFileForWrite( srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(), mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX); LOG.info("Patch..InMemoryMerger outputPath: " + outputPath); @@ -657,13 +712,13 @@ public class MergeManager { LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments..."); + tmpDir = new Path(inputContext.getUniqueIdentifier()); // Nothing actually materialized to disk - controlled by setting sort-factor to #segments. rIter = TezMerger.merge(conf, rfs, (Class)ConfigUtils.getIntermediateInputKeyClass(conf), (Class)ConfigUtils.getIntermediateInputValueClass(conf), inMemorySegments, inMemorySegments.size(), - new Path(inputContext.getUniqueIdentifier()), - (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), + tmpDir, (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), nullProgressable, spilledRecordsCounter, null, additionalBytesRead, null); // spilledRecordsCounter is tracking the number of keys that will be // read from each of the segments being merged - which is essentially @@ -700,6 +755,18 @@ public class MergeManager { closeOnDiskFile(new FileChunk(outputPath, 0, outFileLen)); } + @Override + public void cleanup(List<MapOutput> inputs, boolean deleteData) + throws IOException, InterruptedException { + if (deleteData) { + //Additional check at task level + if (cleanup) { + LOG.info("Try deleting stale data"); + MergeManager.cleanup(localFS, outputPath); + MergeManager.cleanup(localFS, tmpDir); + } + } + } } /** @@ -708,6 +775,11 @@ public class MergeManager { @VisibleForTesting class OnDiskMerger extends MergeThread<FileChunk> { + @VisibleForTesting + volatile Path outputPath; + @VisibleForTesting + volatile Path tmpDir; + public OnDiskMerger(MergeManager manager) { super(manager, ioSortFactor, exceptionReporter); setName("DiskToDiskMerger [" + TezUtilsInternal @@ -716,7 +788,7 @@ public class MergeManager { } @Override - public void merge(List<FileChunk> inputs) throws IOException { + public void merge(List<FileChunk> inputs) throws IOException, InterruptedException { // sanity check if (inputs == null || inputs.isEmpty()) { LOG.info("No ondisk files to merge..."); @@ -768,7 +840,7 @@ public class MergeManager { // namePart includes the suffix of the file. We need to remove it. namePart = FilenameUtils.removeExtension(namePart); - Path outputPath = localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, conf); + outputPath = localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, conf); outputPath = outputPath.suffix(Constants.MERGED_OUTPUT_PREFIX + mergeFileSequenceId.getAndIncrement()); Writer writer = @@ -776,7 +848,7 @@ public class MergeManager { (Class)ConfigUtils.getIntermediateInputKeyClass(conf), (Class)ConfigUtils.getIntermediateInputValueClass(conf), codec, null, null); - Path tmpDir = new Path(inputContext.getUniqueIdentifier()); + tmpDir = new Path(inputContext.getUniqueIdentifier()); try { TezRawKeyValueIterator iter = TezMerger.merge(conf, rfs, (Class)ConfigUtils.getIntermediateInputKeyClass(conf), @@ -808,6 +880,20 @@ public class MergeManager { " Local output file is " + outputPath + " of size " + outputLen); } + + @Override + public void cleanup(List<FileChunk> inputs, boolean deleteData) throws IOException, + InterruptedException { + if (deleteData) { + //Additional check at task level + if (cleanup) { + LOG.info("Try deleting stale data"); + MergeManager.cleanup(localFS, inputs); + MergeManager.cleanup(localFS, outputPath); + MergeManager.cleanup(localFS, tmpDir); + } + } + } } private long createInMemorySegments(List<MapOutput> inMemoryMapOutputs, @@ -821,7 +907,7 @@ public class MergeManager { for (MapOutput mo : inMemoryMapOutputs) { fullSize += mo.getMemory().length; } - while(fullSize > leaveBytes) { + while((fullSize > leaveBytes) && !Thread.currentThread().isInterrupted()) { MapOutput mo = inMemoryMapOutputs.remove(0); byte[] data = mo.getMemory(); long size = data.length; @@ -878,7 +964,7 @@ public class MergeManager { private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs, List<MapOutput> inMemoryMapOutputs, List<FileChunk> onDiskMapOutputs - ) throws IOException { + ) throws IOException, InterruptedException { LOG.info("finalMerge called with " + inMemoryMapOutputs.size() + " in-memory map-outputs and " + onDiskMapOutputs.size() + " on-disk map-outputs"); http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java index d4faf51..52b4c5b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java @@ -46,8 +46,18 @@ abstract class MergeThread<T> extends Thread { public synchronized void close() throws InterruptedException { closed = true; - waitForMerge(); - interrupt(); + if (!Thread.currentThread().isInterrupted()) { + waitForMerge(); + interrupt(); + } else { + try { + interrupt(); + cleanup(inputs, Thread.currentThread().isInterrupted()); + } catch (IOException e) { + //ignore + LOG.warn("Error cleaning up", e); + } + } } public synchronized boolean isInProgress() { @@ -89,6 +99,7 @@ abstract class MergeThread<T> extends Thread { merge(inputs); } catch (InterruptedException ie) { // Meant to handle a shutdown of the entire fetch/merge process + Thread.currentThread().interrupt(); return; } catch(Throwable t) { reporter.reportException(t); @@ -106,4 +117,7 @@ abstract class MergeThread<T> extends Thread { public abstract void merge(List<T> inputs) throws IOException, InterruptedException; + + public abstract void cleanup(List<T> inputs, boolean deleteData) + throws IOException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index f98aa3a..442f032 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -357,6 +357,7 @@ public class Shuffle implements ExceptionReporter { shufflePhaseTime.setValue(System.currentTimeMillis() - startTime); // Stop the map-output fetcher threads + LOG.info("Cleaning up fetchers"); cleanupFetchers(false); // stop the scheduler @@ -393,8 +394,7 @@ public class Shuffle implements ExceptionReporter { for (FetcherOrderedGrouped fetcher : fetchers) { try { fetcher.shutDown(); - LOG.info("Shutdown.." + fetcher.getName() + ", status:" + fetcher.isAlive() + ", " - + "isInterrupted:" + fetcher.isInterrupted()); + LOG.info("Shutdown.." + fetcher.getName()); } catch (InterruptedException e) { if (ignoreErrors) { LOG.info("Interrupted while shutting down fetchers. Ignoring."); @@ -425,6 +425,8 @@ public class Shuffle implements ExceptionReporter { scheduler.close(); } catch (InterruptedException e) { if (ignoreErrors) { + //Reset the status + Thread.currentThread().interrupt(); LOG.info("Interrupted while attempting to close the scheduler during cleanup. Ignoring"); } else { throw e; @@ -437,6 +439,14 @@ public class Shuffle implements ExceptionReporter { if (!mergerClosed.getAndSet(true)) { try { merger.close(); + } catch (InterruptedException e) { + if (ignoreErrors) { + //Reset the status + Thread.currentThread().interrupt(); + LOG.info("Interrupted while attempting to close the merger during cleanup. Ignoring"); + } else { + throw e; + } } catch (Throwable e) { if (ignoreErrors) { LOG.info("Exception while trying to shutdown merger, Ignoring", e); @@ -493,7 +503,7 @@ public class Shuffle implements ExceptionReporter { @Override public void onFailure(Throwable t) { if (isShutDown.get()) { - LOG.info("Already shutdown. Ignoring error: ", t); + LOG.info("Already shutdown. Ignoring error"); } else { LOG.error("ShuffleRunner failed with error", t); inputContext.fatalError(t, "Shuffle Runner Failed"); http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index a3d79ae..c54b005 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -765,6 +765,7 @@ class ShuffleScheduler { } } } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); // This handles shutdown of the entire fetch / merge process. return; } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index c0445c9..ca4d889 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -115,6 +115,8 @@ public abstract class ExternalSorter { protected Path finalIndexFile; protected int numSpills; + protected final boolean cleanup; + // Counters // MR compatilbity layer needs to rename counters back to what MR requries. @@ -148,6 +150,9 @@ public abstract class ExternalSorter { this.conf = conf; this.partitions = numOutputs; + cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT, + TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT); + rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw(); LOG.info("Initial Mem : " + initialMemoryAvailable + ", assignedMb=" + ((initialMemoryAvailable >> 20))); @@ -261,6 +266,7 @@ public abstract class ExternalSorter { try { combiner.combine(kvIter, writer); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException(e); } } @@ -314,4 +320,36 @@ public abstract class ExternalSorter { public int getNumSpills() { return numSpills; } + + protected synchronized void cleanup() throws IOException { + if (!cleanup) { + return; + } + cleanup(spillFilePaths); + cleanup(spillFileIndexPaths); + //TODO: What if when same volume rename happens (have to rely on job completion cleanup) + cleanup(finalOutputFile); + cleanup(finalIndexFile); + } + + protected synchronized void cleanup(Path path) { + if (path == null || !cleanup) { + return; + } + try { + LOG.info("Deleting " + path); + rfs.delete(path, true); + } catch(IOException ioe) { + LOG.warn("Error in deleting " + path); + } + } + + protected synchronized void cleanup(Map<Integer, Path> spillMap) { + if (!cleanup) { + return; + } + for(Map.Entry<Integer, Path> entry : spillMap.entrySet()) { + cleanup(entry.getValue()); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 661f54c..030440e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -33,7 +33,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; @@ -356,6 +355,9 @@ public class PipelinedSorter extends ExternalSorter { merger.ready(); // wait for all the future results from sort threads LOG.info("Spilling to " + filename.toString()); for (int i = 0; i < partitions; ++i) { + if (isThreadInterrupted()) { + return; + } TezRawKeyValueIterator kvIter = merger.filter(i); //write merged output to disk long segmentStart = out.getPos(); @@ -391,147 +393,182 @@ public class PipelinedSorter extends ExternalSorter { ++numSpills; } catch(InterruptedException ie) { // TODO:the combiner has been interrupted + Thread.currentThread().interrupt(); } finally { out.close(); } } + + + + + private boolean isThreadInterrupted() throws IOException { + if (Thread.currentThread().isInterrupted()) { + if (cleanup) { + cleanup(); + } + sortmaster.shutdownNow(); + LOG.info("Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster + .isShutdown() + ", terminated=" + sortmaster.isTerminated()); + return true; + } + return false; + } + @Override public void flush() throws IOException { final String uniqueIdentifier = outputContext.getUniqueIdentifier(); - LOG.info("Starting flush of map output"); - span.end(); - merger.add(span.sort(sorter)); - spill(); - sortmaster.shutdown(); + /** + * Possible that the thread got interrupted when flush was happening or when the flush was + * never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close() + * on all I/O. At that time, this is safe to cleanup + */ + if (isThreadInterrupted()) { + return; + } + + try { + LOG.info("Starting flush of map output"); + span.end(); + merger.add(span.sort(sorter)); + spill(); + sortmaster.shutdown(); - //safe to clean up - bufferList.clear(); + //safe to clean up + bufferList.clear(); - numAdditionalSpills.increment(numSpills - 1); + numAdditionalSpills.increment(numSpills - 1); - if (!finalMergeEnabled) { - //Generate events for all spills - List<Event> events = Lists.newLinkedList(); + if (!finalMergeEnabled) { + //Generate events for all spills + List<Event> events = Lists.newLinkedList(); - //For pipelined shuffle, previous events are already sent. Just generate the last event alone - int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0; - int endIndex = numSpills; + //For pipelined shuffle, previous events are already sent. Just generate the last event alone + int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0; + int endIndex = numSpills; - for (int i = startIndex; i < endIndex; i++) { - boolean isLastEvent = (i == numSpills - 1); + for (int i = startIndex; i < endIndex; i++) { + boolean isLastEvent = (i == numSpills - 1); - String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i); - ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, - outputContext, i, indexCacheList.get(i), partitions, - sendEmptyPartitionDetails, pathComponent); - LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); + String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i); + ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, + outputContext, i, indexCacheList.get(i), partitions, + sendEmptyPartitionDetails, pathComponent); + LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); + } + outputContext.sendEvents(events); + //No need to generate final merge + return; } - outputContext.sendEvents(events); - //No need to generate final merge - return; - } - //In case final merge is required, the following code path is executed. - if(numSpills == 1) { - // someday be able to pass this directly to shuffle - // without writing to disk - final Path filename = spillFilePaths.get(0); - final Path indexFilename = spillFileIndexPaths.get(0); - finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename); - finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename); - - sameVolRename(filename, finalOutputFile); - sameVolRename(indexFilename, finalIndexFile); - if (LOG.isInfoEnabled()) { - LOG.info("numSpills=" + numSpills + ", finalOutputFile=" + finalOutputFile + ", " - + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" + - indexFilename); + //In case final merge is required, the following code path is executed. + if (numSpills == 1) { + // someday be able to pass this directly to shuffle + // without writing to disk + final Path filename = spillFilePaths.get(0); + final Path indexFilename = spillFileIndexPaths.get(0); + finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename); + finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename); + + sameVolRename(filename, finalOutputFile); + sameVolRename(indexFilename, finalIndexFile); + if (LOG.isInfoEnabled()) { + LOG.info("numSpills=" + numSpills + ", finalOutputFile=" + finalOutputFile + ", " + + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" + + indexFilename); + } + return; } - return; - } - finalOutputFile = - mapOutputFile.getOutputFileForWrite(0); //TODO - finalIndexFile = - mapOutputFile.getOutputIndexFileForWrite(0); //TODO + finalOutputFile = + mapOutputFile.getOutputFileForWrite(0); //TODO + finalIndexFile = + mapOutputFile.getOutputIndexFileForWrite(0); //TODO - if (LOG.isDebugEnabled()) { - LOG.debug("numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:" - + finalIndexFile); - } + if (LOG.isDebugEnabled()) { + LOG.debug( + "numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:" + + finalIndexFile); + } - //The output stream for the final single output file - FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); + //The output stream for the final single output file + FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); - final TezSpillRecord spillRec = new TezSpillRecord(partitions); + final TezSpillRecord spillRec = new TezSpillRecord(partitions); + for (int parts = 0; parts < partitions; parts++) { + //create the segments to be merged + List<Segment> segmentList = + new ArrayList<Segment>(numSpills); + for (int i = 0; i < numSpills; i++) { + Path spillFilename = spillFilePaths.get(i); + TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); - for (int parts = 0; parts < partitions; parts++) { - //create the segments to be merged - List<Segment> segmentList = - new ArrayList<Segment>(numSpills); - for(int i = 0; i < numSpills; i++) { - Path spillFilename = spillFilePaths.get(i); - TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); + Segment s = + new Segment(rfs, spillFilename, indexRecord.getStartOffset(), + indexRecord.getPartLength(), codec, ifileReadAhead, + ifileReadAheadLength, ifileBufferSize, true); + segmentList.add(i, s); + } - Segment s = - new Segment(rfs, spillFilename, indexRecord.getStartOffset(), - indexRecord.getPartLength(), codec, ifileReadAhead, - ifileReadAheadLength, ifileBufferSize, true); - segmentList.add(i, s); - } + int mergeFactor = + this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT); + // sort the segments only if there are intermediate merges + boolean sortSegments = segmentList.size() > mergeFactor; + //merge + TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs, + keyClass, valClass, codec, + segmentList, mergeFactor, + new Path(uniqueIdentifier), + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf), + nullProgressable, sortSegments, true, + null, spilledRecordsCounter, null, + null); // Not using any Progress in TezMerger. Should just work. - int mergeFactor = - this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, - TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT); - // sort the segments only if there are intermediate merges - boolean sortSegments = segmentList.size() > mergeFactor; - //merge - TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs, - keyClass, valClass, codec, - segmentList, mergeFactor, - new Path(uniqueIdentifier), - (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf), - nullProgressable, sortSegments, true, - null, spilledRecordsCounter, null, - null); // Not using any Progress in TezMerger. Should just work. - - //write merged output to disk - long segmentStart = finalOut.getPos(); - Writer writer = - new Writer(conf, finalOut, keyClass, valClass, codec, - spilledRecordsCounter, null, merger.needsRLE()); - if (combiner == null || numSpills < minSpillsForCombine) { - TezMerger.writeFile(kvIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); - } else { - runCombineProcessor(kvIter, writer); - } + //write merged output to disk + long segmentStart = finalOut.getPos(); + Writer writer = + new Writer(conf, finalOut, keyClass, valClass, codec, + spilledRecordsCounter, null, merger.needsRLE()); + if (combiner == null || numSpills < minSpillsForCombine) { + TezMerger.writeFile(kvIter, writer, nullProgressable, + TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); + } else { + runCombineProcessor(kvIter, writer); + } - //close - writer.close(); + //close + writer.close(); - // record offsets - final TezIndexRecord rec = - new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); - spillRec.putIndex(rec, parts); - } + // record offsets + final TezIndexRecord rec = + new TezIndexRecord( + segmentStart, + writer.getRawLength(), + writer.getCompressedLength()); + spillRec.putIndex(rec, parts); + } - spillRec.writeToFile(finalIndexFile, conf); - finalOut.close(); - for(int i = 0; i < numSpills; i++) { - Path indexFilename = spillFileIndexPaths.get(i); - Path spillFilename = spillFilePaths.get(i); - rfs.delete(indexFilename,true); - rfs.delete(spillFilename,true); - } + spillRec.writeToFile(finalIndexFile, conf); + finalOut.close(); + for (int i = 0; i < numSpills; i++) { + Path indexFilename = spillFileIndexPaths.get(i); + Path spillFilename = spillFilePaths.get(i); + rfs.delete(indexFilename, true); + rfs.delete(spillFilename, true); + } - spillFileIndexPaths.clear(); - spillFilePaths.clear(); + spillFileIndexPaths.clear(); + spillFilePaths.clear(); + } catch(InterruptedException ie) { + if (cleanup) { + cleanup(); + } + Thread.currentThread().interrupt(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java index 758e9c7..3b7bf05 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java @@ -76,7 +76,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { return new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize, false, comparator, @@ -101,7 +101,7 @@ public class TezMerger { TezCounter mergedMapOutputsCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { return new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize, false, comparator, @@ -124,7 +124,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { // Get rid of this ? return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir, comparator, reporter, false, readsCounter, writesCounter, bytesReadCounter, @@ -142,7 +142,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { return new MergeQueue(conf, fs, segments, comparator, reporter, sortSegments, false).merge(keyClass, valueClass, mergeFactor, tmpDir, @@ -163,7 +163,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { return new MergeQueue(conf, fs, segments, comparator, reporter, sortSegments, codec, considerFinalMergeForProgress). merge(keyClass, valueClass, @@ -185,7 +185,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { return new MergeQueue(conf, fs, segments, comparator, reporter, sortSegments, codec, false).merge(keyClass, valueClass, mergeFactor, inMemSegments, @@ -196,9 +196,9 @@ public class TezMerger { } public static <K extends Object, V extends Object> - void writeFile(TezRawKeyValueIterator records, Writer writer, - Progressable progressable, long recordsBeforeProgress) - throws IOException { + void writeFile(TezRawKeyValueIterator records, Writer writer, + Progressable progressable, long recordsBeforeProgress) + throws IOException, InterruptedException { long recordCtr = 0; long count = 0; while(records.next()) { @@ -211,6 +211,15 @@ public class TezMerger { if (((recordCtr++) % recordsBeforeProgress) == 0) { progressable.progress(); + if (Thread.currentThread().isInterrupted()) { + /** + * Takes care DefaultSorter.mergeParts, MergeManager's merger threads, + * PipelinedSorter's flush(). This is not expensive check as it is carried out every + * 10000 records or so. + */ + throw new InterruptedException("Current thread=" + Thread.currentThread().getName() + " got " + + "interrupted"); + } } } if ((count > 0) && LOG.isDebugEnabled()) { @@ -614,7 +623,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { return merge(keyClass, valueClass, factor, 0, tmpDir, readsCounter, writesCounter, bytesReadCounter, mergePhase); } @@ -625,7 +634,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { LOG.info("Merging " + segments.size() + " sorted segments"); if (segments.size() == 0) { LOG.info("Nothing to merge. Returning an empty iterator"); http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 2cbb70a..9783c79 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -193,6 +193,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { spillDone.await(); } } catch (InterruptedException e) { + //interrupt spill thread + spillThread.interrupt(); + Thread.currentThread().interrupt(); throw new IOException("Spill thread failed to initialize", e); } finally { spillLock.unlock(); @@ -603,6 +606,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { spillDone.await(); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException( "Buffer interrupted while waiting for the writer", e); } @@ -625,9 +629,45 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } } + void interruptSpillThread() throws IOException { + assert !spillLock.isHeldByCurrentThread(); + // shut down spill thread and wait for it to exit. Since the preceding + // ensures that it is finished with its work (and sortAndSpill did not + // throw), we elect to use an interrupt instead of setting a flag. + // Spilling simultaneously from this thread while the spill thread + // finishes its work might be both a useful way to extend this and also + // sufficient motivation for the latter approach. + try { + spillThread.interrupt(); + spillThread.join(); + } catch (InterruptedException e) { + LOG.info("Spill thread interrupted"); + //Reset status + Thread.currentThread().interrupt(); + throw new IOException("Spill failed", e); + } + } + @Override public void flush() throws IOException { LOG.info("Starting flush of map output"); + if (Thread.currentThread().isInterrupted()) { + /** + * Possible that the thread got interrupted when flush was happening or when the flush was + * never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close() + * on all I/O. At that time, this is safe to cleanup + */ + if (cleanup) { + cleanup(); + } + try { + interruptSpillThread(); + } catch(IOException e) { + //safe to ignore + } + return; + } + spillLock.lock(); try { while (spillInProgress) { @@ -656,28 +696,25 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { sortAndSpill(); } } catch (InterruptedException e) { + //Reset status + Thread.currentThread().interrupt(); + interruptSpillThread(); throw new IOException("Interrupted while waiting for the writer", e); } finally { spillLock.unlock(); } - assert !spillLock.isHeldByCurrentThread(); - // shut down spill thread and wait for it to exit. Since the preceding - // ensures that it is finished with its work (and sortAndSpill did not - // throw), we elect to use an interrupt instead of setting a flag. - // Spilling simultaneously from this thread while the spill thread - // finishes its work might be both a useful way to extend this and also - // sufficient motivation for the latter approach. - try { - spillThread.interrupt(); - spillThread.join(); - } catch (InterruptedException e) { - throw new IOException("Spill failed", e); - } - // release sort buffer before the merge + + interruptSpillThread(); + // release sort buffer before the mergecl //FIXME //kvbuffer = null; - mergeParts(); + try { + mergeParts(); + } catch (InterruptedException e) { + cleanup(); + Thread.currentThread().interrupt(); + } if (finalMergeEnabled) { fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen()); } @@ -715,6 +752,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } } } catch (InterruptedException e) { + LOG.info("Spill thread interrupted"); Thread.currentThread().interrupt(); } finally { spillLock.unlock(); @@ -1085,7 +1123,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { outputContext.sendEvents(events); } - private void mergeParts() throws IOException { + private void mergeParts() throws IOException, InterruptedException { // get the approximate size of the final output/index files long finalOutFileSize = 0; long finalIndexFileSize = 0; http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index d784fcd..49cf102 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -351,6 +351,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index 62fa9a5..7fc9317 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -267,6 +267,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 6227fb9..53abc17 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -248,6 +248,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index 08e6ec0..b50f17d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -171,6 +171,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index 38450ee..7498627 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -144,6 +144,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java index edb9b15..f62179a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java @@ -169,28 +169,28 @@ public class TestValuesIterator { } @Test(timeout = 20000) - public void testIteratorWithInMemoryReader() throws IOException { + public void testIteratorWithInMemoryReader() throws IOException, InterruptedException { ValuesIterator iterator = createIterator(true); verifyIteratorData(iterator); } @Test(timeout = 20000) - public void testIteratorWithIFileReader() throws IOException { + public void testIteratorWithIFileReader() throws IOException, InterruptedException { ValuesIterator iterator = createIterator(false); verifyIteratorData(iterator); } @Test(timeout = 20000) - public void testCountedIteratorWithInmemoryReader() throws IOException { + public void testCountedIteratorWithInmemoryReader() throws IOException, InterruptedException { verifyCountedIteratorReader(true); } @Test(timeout = 20000) - public void testCountedIteratorWithIFileReader() throws IOException { + public void testCountedIteratorWithIFileReader() throws IOException, InterruptedException { verifyCountedIteratorReader(false); } - private void verifyCountedIteratorReader(boolean inMemory) throws IOException { + private void verifyCountedIteratorReader(boolean inMemory) throws IOException, InterruptedException { TezCounter keyCounter = new GenericCounter("inputKeyCounter", "y3"); TezCounter tupleCounter = new GenericCounter("inputValuesCounter", "y4"); ValuesIterator iterator = createCountedIterator(inMemory, keyCounter, @@ -207,7 +207,7 @@ public class TestValuesIterator { } @Test(timeout = 20000) - public void testIteratorWithIFileReaderEmptyPartitions() throws IOException { + public void testIteratorWithIFileReaderEmptyPartitions() throws IOException, InterruptedException { ValuesIterator iterator = createEmptyIterator(false); assertTrue(iterator.moveToNext() == false); @@ -224,7 +224,8 @@ public class TestValuesIterator { } } - private ValuesIterator createEmptyIterator(boolean inMemory) throws IOException { + private ValuesIterator createEmptyIterator(boolean inMemory) + throws IOException, InterruptedException { if (!inMemory) { streamPaths = new Path[0]; //This will return EmptyIterator @@ -323,7 +324,7 @@ public class TestValuesIterator { * @return ValuesIterator * @throws IOException */ - private ValuesIterator createIterator(boolean inMemory) throws IOException { + private ValuesIterator createIterator(boolean inMemory) throws IOException, InterruptedException { if (!inMemory) { streamPaths = createFiles(); //Merge all files to get KeyValueIterator @@ -353,7 +354,8 @@ public class TestValuesIterator { * @return ValuesIterator * @throws IOException */ - private ValuesIterator createCountedIterator(boolean inMemory, TezCounter keyCounter, TezCounter tupleCounter) throws IOException { + private ValuesIterator createCountedIterator(boolean inMemory, TezCounter keyCounter, TezCounter tupleCounter) + throws IOException, InterruptedException { if (!inMemory) { streamPaths = createFiles(); //Merge all files to get KeyValueIterator http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index 094237a..0faa22a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -28,6 +28,7 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -166,9 +167,32 @@ public class TestMergeManager { Assert.assertTrue(mergeManager.postMergeMemLimit == initialMemoryAvailable); } + class InterruptingThread implements Runnable { + + MergeManager.OnDiskMerger mergeThread; + + public InterruptingThread(MergeManager.OnDiskMerger mergeThread) { + this.mergeThread = mergeThread; + } + + @Override public void run() { + while(this.mergeThread.tmpDir == null) { + //this is tight loop + } + + this.mergeThread.interrupt(); + } + } + @Test(timeout = 10000) - public void testLocalDiskMergeMultipleTasks() throws IOException { + public void testLocalDiskMergeMultipleTasks() throws IOException, InterruptedException { + testLocalDiskMergeMultipleTasks(false); + testLocalDiskMergeMultipleTasks(true); + } + + void testLocalDiskMergeMultipleTasks(boolean interruptInMiddle) + throws IOException, InterruptedException { Configuration conf = new TezConfiguration(defaultConf); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName()); @@ -194,6 +218,7 @@ public class TestMergeManager { new MergeManager(conf, localFs, localDirAllocator, t0inputContext, null, null, null, null, t0exceptionReporter, 2000000, null, false, -1); MergeManager t0mergeManager = spy(t0mergeManagerReal); + t0mergeManager.configureAndStart(); MergeManager t1mergeManagerReal = new MergeManager(conf, localFs, localDirAllocator, t1inputContext, null, null, null, null, @@ -249,30 +274,48 @@ public class TestMergeManager { List<FileChunk> t0MergeFiles = new LinkedList<FileChunk>(); t0MergeFiles.addAll(t0mergeManager.onDiskMapOutputs); t0mergeManager.onDiskMapOutputs.clear(); - t0mergeManager.onDiskMerger.merge(t0MergeFiles); - Assert.assertEquals(1, t0mergeManager.onDiskMapOutputs.size()); - - - t1MapOutput0.commit(); - t1MapOutput1.commit(); - verify(t1mergeManager).closeOnDiskFile(t1MapOutput0.getOutputPath()); - verify(t1mergeManager).closeOnDiskFile(t1MapOutput1.getOutputPath()); - // Run the OnDiskMerge via MergeManager - // Simulate the thread invocation - remove files, and invoke merge - List<FileChunk> t1MergeFiles = new LinkedList<FileChunk>(); - t1MergeFiles.addAll(t1mergeManager.onDiskMapOutputs); - t1mergeManager.onDiskMapOutputs.clear(); - t1mergeManager.onDiskMerger.merge(t1MergeFiles); - Assert.assertEquals(1, t1mergeManager.onDiskMapOutputs.size()); - Assert.assertNotEquals(t0mergeManager.onDiskMapOutputs.iterator().next().getPath(), - t1mergeManager.onDiskMapOutputs.iterator().next().getPath()); + if (!interruptInMiddle) { + t0mergeManager.onDiskMerger.merge(t0MergeFiles); + Assert.assertEquals(1, t0mergeManager.onDiskMapOutputs.size()); + } else { + + //Start Interrupting thread + Thread interruptingThread = new Thread(new InterruptingThread(t0mergeManager.onDiskMerger)); + interruptingThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } - Assert.assertTrue(t0mergeManager.onDiskMapOutputs.iterator().next().getPath().toString() - .contains(t0inputContext.getUniqueIdentifier())); - Assert.assertTrue(t1mergeManager.onDiskMapOutputs.iterator().next().getPath().toString() - .contains(t1inputContext.getUniqueIdentifier())); + //Will be interrupted in the middle by interruptingThread. + t0mergeManager.onDiskMerger.startMerge(Sets.newHashSet(t0MergeFiles)); + t0mergeManager.onDiskMerger.waitForMerge(); + Assert.assertNotEquals(1, t0mergeManager.onDiskMapOutputs.size()); + } + if (!interruptInMiddle) { + t1MapOutput0.commit(); + t1MapOutput1.commit(); + verify(t1mergeManager).closeOnDiskFile(t1MapOutput0.getOutputPath()); + verify(t1mergeManager).closeOnDiskFile(t1MapOutput1.getOutputPath()); + // Run the OnDiskMerge via MergeManager + // Simulate the thread invocation - remove files, and invoke merge + List<FileChunk> t1MergeFiles = new LinkedList<FileChunk>(); + t1MergeFiles.addAll(t1mergeManager.onDiskMapOutputs); + t1mergeManager.onDiskMapOutputs.clear(); + t1mergeManager.onDiskMerger.merge(t1MergeFiles); + Assert.assertEquals(1, t1mergeManager.onDiskMapOutputs.size()); + + Assert.assertNotEquals(t0mergeManager.onDiskMapOutputs.iterator().next().getPath(), + t1mergeManager.onDiskMapOutputs.iterator().next().getPath()); + + Assert.assertTrue(t0mergeManager.onDiskMapOutputs.iterator().next().getPath().toString() + .contains(t0inputContext.getUniqueIdentifier())); + Assert.assertTrue(t1mergeManager.onDiskMapOutputs.iterator().next().getPath().toString() + .contains(t1inputContext.getUniqueIdentifier())); + } } private InputContext createMockInputContext(String uniqueId) { http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java index bb932f2..b86d054 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java @@ -557,7 +557,8 @@ public class TestTezMerger { * @return * @throws IOException */ - private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc) throws IOException { + private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc) + throws IOException, InterruptedException { TezMerger merger = new TezMerger(); TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class, LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
