Repository: tez Updated Branches: refs/heads/TEZ-2003 faad3a7db -> 437b57152
TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/437b5715 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/437b5715 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/437b5715 Branch: refs/heads/TEZ-2003 Commit: 437b571525a67d48d731b5cee207f4c898d055be Parents: faad3a7 Author: Rajesh Balamohan <[email protected]> Authored: Tue May 5 18:43:59 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Tue May 5 18:43:59 2015 +0530 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../runtime/api/AbstractLogicalIOProcessor.java | 4 + .../api/ProcessorFrameworkInterface.java | 11 +- .../runtime/LogicalIOProcessorRuntimeTask.java | 137 ++++++++++ .../org/apache/tez/runtime/RuntimeTask.java | 5 + .../apache/tez/runtime/task/TezTaskRunner.java | 71 ++++- .../library/api/TezRuntimeConfiguration.java | 11 + .../common/readers/UnorderedKVReader.java | 1 + .../runtime/library/common/shuffle/Fetcher.java | 1 + .../common/shuffle/impl/ShuffleManager.java | 5 + .../orderedgrouped/FetcherOrderedGrouped.java | 7 +- .../shuffle/orderedgrouped/MergeManager.java | 124 +++++++-- .../shuffle/orderedgrouped/MergeThread.java | 18 +- .../common/shuffle/orderedgrouped/Shuffle.java | 16 +- .../orderedgrouped/ShuffleScheduler.java | 1 + .../common/sort/impl/ExternalSorter.java | 38 +++ .../common/sort/impl/PipelinedSorter.java | 261 +++++++++++-------- .../library/common/sort/impl/TezMerger.java | 31 ++- .../common/sort/impl/dflt/DefaultSorter.java | 70 +++-- .../library/input/OrderedGroupedKVInput.java | 1 + .../runtime/library/input/UnorderedKVInput.java | 1 + .../output/OrderedPartitionedKVOutput.java | 1 + .../library/output/UnorderedKVOutput.java | 1 + .../output/UnorderedPartitionedKVOutput.java | 1 + 
.../library/common/TestValuesIterator.java | 11 +- .../orderedgrouped/TestMergeManager.java | 87 +++++-- .../library/common/sort/impl/TestTezMerger.java | 3 +- 27 files changed, 724 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 9fc9ed3..5f38358 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -1,4 +1,5 @@ ALL CHANGES: + TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration. TEZ-2006. Task communication plane needs to be pluggable. TEZ-2090. Add tests for jobs running in external services. http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java index 7714321..5a4cbe8 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java @@ -49,4 +49,8 @@ public abstract class AbstractLogicalIOProcessor implements LogicalIOProcessor, return context; } + @Override + public void abort() { + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java 
b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java index f0ba9c9..89d4e3c 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java @@ -18,9 +18,11 @@ package org.apache.tez.runtime.api; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + import java.util.List; -import org.apache.hadoop.classification.InterfaceAudience.Public; /** * Represents the Tez framework part of an {@link org.apache.tez.runtime.api.Processor}. @@ -56,4 +58,11 @@ public interface ProcessorFrameworkInterface { * if an error occurs */ public void close() throws Exception; + + /** + * Indicates <code>Processor</code> to abort. Cleanup can be done. + * + */ + @Unstable + public void abort(); } http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 80c2717..8c76f88 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -19,10 +19,14 @@ package org.apache.tez.runtime; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import 
java.util.Map; @@ -36,6 +40,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import com.google.common.base.Throwables; import org.apache.commons.lang.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,6 +112,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private final List<GroupInputSpec> groupInputSpecs; private ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap; + private final ConcurrentHashMap<String, LogicalInput> initializedInputs; + private final ConcurrentHashMap<String, LogicalOutput> initializedOutputs; + private boolean processorClosed; + private final ProcessorDescriptor processorDescriptor; private AbstractLogicalIOProcessor processor; private ProcessorContext processorContext; @@ -158,6 +167,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.runInputMap = new LinkedHashMap<String, LogicalInput>(); this.runOutputMap = new LinkedHashMap<String, LogicalOutput>(); + this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>(); + this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>(); + this.processorDescriptor = taskSpec.getProcessorDescriptor(); this.serviceConsumerMetadata = serviceConsumerMetadata; this.envMap = envMap; @@ -336,11 +348,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.state.set(State.CLOSED); // Close the Processor. + processorClosed = true; processor.close(); // Close the Inputs. 
for (InputSpec inputSpec : inputSpecs) { String srcVertexName = inputSpec.getSourceVertexName(); + initializedInputs.remove(srcVertexName); List<Event> closeInputEvents = ((InputFrameworkInterface)inputsMap.get(srcVertexName)).close(); sendTaskGeneratedEvents(closeInputEvents, EventProducerConsumerType.INPUT, taskSpec.getVertexName(), @@ -350,6 +364,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { // Close the Outputs. for (OutputSpec outputSpec : outputSpecs) { String destVertexName = outputSpec.getDestinationVertexName(); + initializedOutputs.remove(destVertexName); List<Event> closeOutputEvents = ((LogicalOutputFrameworkInterface)outputsMap.get(destVertexName)).close(); sendTaskGeneratedEvents(closeOutputEvents, EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), @@ -389,6 +404,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { inputContext.getTaskVertexName(), inputContext.getSourceVertexName(), taskSpec.getTaskAttemptID()); LOG.info("Initialized Input with src edge: " + edgeName); + initializedInputs.put(edgeName, input); return null; } } @@ -437,6 +453,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { outputContext.getTaskVertexName(), outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID()); LOG.info("Initialized Output with dest edge: " + edgeName); + initializedOutputs.put(edgeName, output); return null; } } @@ -662,6 +679,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { eventsToBeProcessed.addAll(events); } + @Override + public synchronized void abortTask() throws Exception { + if (processor != null) { + processor.abort(); + } + } + private void startRouterThread() { eventRouterThread = new Thread(new RunnableWithNdc() { public void runInternal() { @@ -681,6 +705,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { if (!isTaskDone()) { LOG.warn("Event Router thread interrupted. 
Returning."); } + Thread.currentThread().interrupt(); return; } } @@ -692,13 +717,125 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { eventRouterThread.start(); } + private void maybeResetInterruptStatus() { + if (!Thread.currentThread().isInterrupted()) { + Thread.currentThread().interrupt(); + } + } + public synchronized void cleanup() { + + /** + * Cleanup IPO that are not closed. In case, regular close() has happened in IPO, they + * would not be available in the IPOs to be cleaned. So this is safe. + * + * e.g whenever input gets closed() in normal way, it automatically removes it from + * initializedInputs map. + * + * In case any exception happens in processor close or IO close, it wouldn't be removed from + * the initialized IO data structures and here is the chance to close them and release + * resources. + * + */ + if (LOG.isDebugEnabled()) { + LOG.debug("Processor closed={}", processorClosed); + LOG.debug("Num of inputs to be closed={}", initializedInputs.size()); + LOG.debug("Num of outputs to be closed={}", initializedOutputs.size()); + } + if (!processorClosed) { + try { + processorClosed = true; + processor.close(); + + LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}", processor + .getContext().getTaskVertexName(), processor.getContext().getTaskVertexIndex(), + Thread.currentThread().isInterrupted()); + + maybeResetInterruptStatus(); + } catch (InterruptedException ie) { + //reset the status + LOG.info("Resetting interrupt for processor"); + Thread.currentThread().interrupt(); + } catch (Throwable e) { + LOG.warn("Exception when closing processor", e); + } + } + + // Close the remaining inited Inputs. 
+ Iterator<String> srcVertexItr = initializedInputs.keySet().iterator(); + while (srcVertexItr.hasNext()) { + String srcVertexName = srcVertexItr.next(); + try { + srcVertexItr.remove(); + + ((InputFrameworkInterface) initializedInputs.get(srcVertexName)).close(); + initializedInputs.remove(srcVertexName); + + maybeResetInterruptStatus(); + } catch (InterruptedException ie) { + //reset the status + LOG.info("Resetting interrupt status for input with srcVertexName={}", srcVertexName); + Thread.currentThread().interrupt(); + } catch (Throwable e) { + LOG.warn("Exception when closing input in cleanup(interrupted)", e); + } finally { + LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor + .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread() + .isInterrupted()); + } + } + + // Close the remaining inited Outputs. + try { + Iterator<String> outVertexItr = initializedOutputs.keySet().iterator(); + while (outVertexItr.hasNext()) { + String destVertexName = outVertexItr.next(); + try { + outVertexItr.remove(); + + ((OutputFrameworkInterface) initializedOutputs.get(destVertexName)).close(); + initializedOutputs.remove(destVertexName); + + maybeResetInterruptStatus(); + } catch (InterruptedException ie) { + //reset the status + LOG.info("Resetting interrupt status for output with destVertexName={}", destVertexName); + Thread.currentThread().interrupt(); + } catch (Throwable e) { + LOG.warn("Exception when closing output in cleanup(interrupted)", e); + } finally { + LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor + .getContext().getTaskVertexName(), destVertexName, Thread.currentThread() + .isInterrupted()); + } + } + } catch (Throwable e) { + LOG.warn(Throwables.getStackTraceAsString(e)); + } + + if (LOG.isDebugEnabled()) { + printThreads(); + } + LOG.info("Final Counters : " + getCounters().toShortString()); setTaskDone(); if (eventRouterThread != null) { 
eventRouterThread.interrupt(); } } + + /** + * Print all threads in JVM (only for debugging) + */ + void printThreads() { + //Print the status of all threads in JVM + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + long[] threadIds = threadMXBean.getAllThreadIds(); + for (Long id : threadIds) { + ThreadInfo threadInfo = threadMXBean.getThreadInfo(id); + LOG.info("ThreadId : " + id + ", name=" + threadInfo.getThreadName()); + } + } @Private @VisibleForTesting http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index f8b8621..162caf0 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -72,6 +72,10 @@ public abstract class RuntimeTask { protected final AtomicReference<State> state = new AtomicReference<State>(); + public boolean isRunning() { + return (state.get() == State.RUNNING); + } + public TezCounters addAndGetTezCounter(String name) { TezCounters counter = new TezCounters(); counterMap.put(name, counter); @@ -143,4 +147,5 @@ public abstract class RuntimeTask { taskDone.set(true); } + public abstract void abortTask() throws Exception; } http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java index 33a7f4a..7238d5e 100644 --- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java @@ -25,8 +25,13 @@ import java.security.PrivilegedExceptionAction; import java.util.Collection; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import com.google.common.base.Throwables; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSError; @@ -35,6 +40,7 @@ import org.apache.tez.common.CallableWithNdc; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; +import org.apache.tez.runtime.RuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.ObjectRegistry; import org.apache.tez.runtime.api.impl.EventMetaData; @@ -61,6 +67,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { private final ListeningExecutorService executor; private volatile ListenableFuture<Void> taskFuture; private volatile Thread waitingThread; + private volatile Thread taskRunner; private volatile Throwable firstException; // Effectively a duplicate check, since hadFatalError does the same thing. 
@@ -96,7 +103,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { taskReporter.registerTask(task, this); TaskRunnerCallable callable = new TaskRunnerCallable(); Throwable failureCause = null; - taskFuture = executor.submit(callable); + if (!Thread.currentThread().isInterrupted()) { + taskFuture = executor.submit(callable); + return isShutdownRequested(); + } try { taskFuture.get(); @@ -158,6 +168,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { } } } + return isShutdownRequested(); + } + + private boolean isShutdownRequested() { if (shutdownRequested.get()) { LOG.info("Shutdown requested... returning"); return false; @@ -173,11 +187,14 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { @Override public Void run() throws Exception { try { + taskRunner = Thread.currentThread(); LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID()); task.initialize(); if (!Thread.currentThread().isInterrupted() && firstException == null) { LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID()); task.run(); + maybeInterruptWaitingThread(); + LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID()); task.close(); task.setFrameworkCounters(); @@ -199,6 +216,12 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { } return null; } catch (Throwable cause) { + if (Thread.currentThread().isInterrupted()) { + LOG.info("TaskRunnerCallable interrupted=" + Thread.currentThread().isInterrupted() + + ", shutdownRequest=" + shutdownRequested.get()); + Thread.currentThread().interrupt(); + return null; + } if (cause instanceof FSError) { // Not immediately fatal, this is an error reported by Hadoop FileSystem maybeRegisterFirstException(cause); @@ -255,6 +278,17 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { taskRunning.set(false); } } + + private void maybeInterruptWaitingThread() { + /** + * Possible that the processor is swallowing 
InterruptedException caused by taskRunner.interrupt(). + * In such a case, interrupt the waitingThread based on the shutdownRequested flag, so that + * the entire task gets cancelled. + */ + if (shutdownRequested.get()) { + waitingThread.interrupt(); + } + } } // should wait until all messages are sent to AM before TezChild shutdown @@ -353,10 +387,43 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { } } + private void abortRunningTask() { + if (!taskRunning.get()) { + LOG.info("Task is not running"); + waitingThread.interrupt(); + return; + } + + if (taskRunning.get()) { + try { + task.abortTask(); + } catch (Exception e) { + LOG.warn("Error when aborting the task", e); + try { + sendFailure(e, "Error when aborting the task"); + } catch (Exception ignored) { + // Ignored. + } + } + } + //Interrupt the relevant threads. TaskRunner should be interrupted preferably. + if (isTaskRunning()) { + LOG.info("Interrupting taskRunner=" + taskRunner.getName()); + taskRunner.interrupt(); + } else { + LOG.info("Interrupting waitingThread=" + waitingThread.getName()); + waitingThread.interrupt(); + } + } + + private boolean isTaskRunning() { + return (taskRunning.get() && task.isRunning()); + } + @Override public void shutdownRequested() { shutdownRequested.set(true); - waitingThread.interrupt(); + abortRunningTask(); } private String getTaskDiagnosticsString(Throwable t, String message) { http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index a818de8..3d9a701 100644 --- 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -311,6 +311,16 @@ public class TezRuntimeConfiguration { */ public static final boolean TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT = false; + /** + * Used only for internal testing. Strictly not recommended to be used elsewhere. This + * parameter could be changed/dropped later. + */ + @Unstable + @Private + public static final String TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT = TEZ_RUNTIME_PREFIX + + "cleanup.files.on.interrupt"; + public static final boolean TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT = false; + // TODO TEZ-1233 - allow this property to be set per vertex // TODO TEZ-1231 - move these properties out since they are not relevant for Inputs / Outputs @@ -374,6 +384,7 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH); tezRuntimeKeys.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT); tezRuntimeKeys.add(TEZ_RUNTIME_SORTER_CLASS); + tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); defaultConf.addResource("core-default.xml"); defaultConf.addResource("core-site.xml"); http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java index 454d0ed..cc6e352 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java @@ -176,6 +176,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { currentFetchedInput = shuffleManager.getNextInput(); } catch 
(InterruptedException e) { LOG.warn("Interrupted while waiting for next available input", e); + Thread.currentThread().interrupt(); throw new IOException(e); } if (currentFetchedInput == null) { http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 01f5bd6..a553210 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -378,6 +378,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // fall back to HTTP fetch below LOG.warn("Double locking detected for " + host); } catch (InterruptedException sleepInterrupted) { + Thread.currentThread().interrupt(); // fall back to HTTP fetch below LOG.warn("Lock was interrupted for " + host); } finally { http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 5075578..a8d3553 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -748,6 +748,11 @@ public class ShuffleManager implements FetcherCallback { /////////////////// End 
of Methods from FetcherCallbackHandler public void shutdown() throws InterruptedException { + if (Thread.currentThread().isInterrupted()) { + //TODO: need to cleanup all FetchedInput (DiskFetchedInput, LocalDisFetchedInput), lockFile + //As of now relying on job cleanup (when all directories would be cleared) + LOG.info("Thread interrupted. Need to cleanup the local dirs"); + } if (!isShutdown.getAndSet(true)) { // Shut down any pending fetchers LOG.info("Shutting down pending fetchers on source" + srcNameTrimmed + ": " http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 8d20aa7..fbaabff 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -177,6 +177,9 @@ class FetcherOrderedGrouped extends Thread { } } } catch (InterruptedException ie) { + //TODO: might not be respected when fetcher is in progress / server is busy. 
TEZ-711 + //Set the status back + Thread.currentThread().interrupt(); return; } catch (Throwable t) { shuffle.reportException(t); @@ -191,7 +194,9 @@ class FetcherOrderedGrouped extends Thread { try { join(5000); } catch (InterruptedException ie) { - LOG.warn("Got interrupt while joining " + getName(), ie); + //Reset the status + Thread.currentThread().interrupt(); + LOG.warn("Got interrupt while joining " + getName()); } } http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 2e6ebd9..5a35f2f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -137,6 +138,8 @@ public class MergeManager { private AtomicInteger mergeFileSequenceId = new AtomicInteger(0); + private final boolean cleanup; + /** * Construct the MergeManager. Must call start before it becomes usable. 
*/ @@ -174,6 +177,9 @@ public class MergeManager { this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN); this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ); + this.cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT, + TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT); + this.codec = codec; this.ifileReadAhead = ifileReadAheadEnabled; if (this.ifileReadAhead) { @@ -514,27 +520,61 @@ public class MergeManager { public boolean isMergeComplete() { return finalMergeComplete; } - + public TezRawKeyValueIterator close() throws Throwable { // Wait for on-going merges to complete - if (memToMemMerger != null) { + if (memToMemMerger != null) { memToMemMerger.close(); } inMemoryMerger.close(); onDiskMerger.close(); - - List<MapOutput> memory = + + List<MapOutput> memory = new ArrayList<MapOutput>(inMemoryMergedMapOutputs); inMemoryMergedMapOutputs.clear(); memory.addAll(inMemoryMapOutputs); inMemoryMapOutputs.clear(); List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs); onDiskMapOutputs.clear(); - TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk); - this.finalMergeComplete = true; - return kvIter; + try { + TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk); + this.finalMergeComplete = true; + return kvIter; + } catch(InterruptedException e) { + //Cleanup the disk segments + if (cleanup) { + cleanup(localFS, disk); + cleanup(localFS, onDiskMapOutputs); + } + Thread.currentThread().interrupt(); //reset interrupt status + throw e; + } + } + + + static void cleanup(FileSystem fs, Collection<FileChunk> fileChunkList) { + for (FileChunk fileChunk : fileChunkList) { + cleanup(fs, fileChunk.getPath()); + } } - + + static void cleanup(FileSystem fs, Path path) { + if (path == null) { + return; + } + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting " 
+ path); + } + fs.delete(path, true); + } catch (IOException e) { + LOG.info("Error in deleting " + path); + } + } + + + void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer) throws IOException, InterruptedException { combiner.combine(kvIter, writer); @@ -555,7 +595,7 @@ public class MergeManager { } @Override - public void merge(List<MapOutput> inputs) throws IOException { + public void merge(List<MapOutput> inputs) throws IOException, InterruptedException { if (inputs == null || inputs.size() == 0) { return; } @@ -597,13 +637,28 @@ public class MergeManager { // Note the output of the merge closeInMemoryMergedFile(mergedMapOutputs); } + + @Override + public void cleanup(List<MapOutput> inputs, boolean deleteData) throws IOException, + InterruptedException { + //No OP + } } /** * Merges multiple in-memory segment to a disk segment */ private class InMemoryMerger extends MergeThread<MapOutput> { - + + @VisibleForTesting + volatile InputAttemptIdentifier srcTaskIdentifier; + + @VisibleForTesting + volatile Path outputPath; + + @VisibleForTesting + volatile Path tmpDir; + public InMemoryMerger(MergeManager manager) { super(manager, Integer.MAX_VALUE, exceptionReporter); setName("MemtoDiskMerger [" + TezUtilsInternal @@ -628,7 +683,7 @@ public class MergeManager { //in the merge method) //figure out the mapId - InputAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier(); + srcTaskIdentifier = inputs.get(0).getAttemptIdentifier(); List<Segment> inMemorySegments = new ArrayList<Segment>(); long mergeOutputSize = @@ -639,7 +694,7 @@ public class MergeManager { // All disk writes done by this merge are overhead - due to the lac of // adequate memory to keep all segments in memory. 
- Path outputPath = mapOutputFile.getInputFileForWrite( + outputPath = mapOutputFile.getInputFileForWrite( srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(), mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX); LOG.info("Patch..InMemoryMerger outputPath: " + outputPath); @@ -657,13 +712,13 @@ public class MergeManager { LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments..."); + tmpDir = new Path(inputContext.getUniqueIdentifier()); // Nothing actually materialized to disk - controlled by setting sort-factor to #segments. rIter = TezMerger.merge(conf, rfs, (Class)ConfigUtils.getIntermediateInputKeyClass(conf), (Class)ConfigUtils.getIntermediateInputValueClass(conf), inMemorySegments, inMemorySegments.size(), - new Path(inputContext.getUniqueIdentifier()), - (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), + tmpDir, (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), nullProgressable, spilledRecordsCounter, null, additionalBytesRead, null); // spilledRecordsCounter is tracking the number of keys that will be // read from each of the segments being merged - which is essentially @@ -700,6 +755,18 @@ public class MergeManager { closeOnDiskFile(new FileChunk(outputPath, 0, outFileLen)); } + @Override + public void cleanup(List<MapOutput> inputs, boolean deleteData) + throws IOException, InterruptedException { + if (deleteData) { + //Additional check at task level + if (cleanup) { + LOG.info("Try deleting stale data"); + MergeManager.cleanup(localFS, outputPath); + MergeManager.cleanup(localFS, tmpDir); + } + } + } } /** @@ -708,6 +775,11 @@ public class MergeManager { @VisibleForTesting class OnDiskMerger extends MergeThread<FileChunk> { + @VisibleForTesting + volatile Path outputPath; + @VisibleForTesting + volatile Path tmpDir; + public OnDiskMerger(MergeManager manager) { super(manager, ioSortFactor, exceptionReporter); setName("DiskToDiskMerger [" + 
TezUtilsInternal @@ -716,7 +788,7 @@ public class MergeManager { } @Override - public void merge(List<FileChunk> inputs) throws IOException { + public void merge(List<FileChunk> inputs) throws IOException, InterruptedException { // sanity check if (inputs == null || inputs.isEmpty()) { LOG.info("No ondisk files to merge..."); @@ -768,7 +840,7 @@ public class MergeManager { // namePart includes the suffix of the file. We need to remove it. namePart = FilenameUtils.removeExtension(namePart); - Path outputPath = localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, conf); + outputPath = localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, conf); outputPath = outputPath.suffix(Constants.MERGED_OUTPUT_PREFIX + mergeFileSequenceId.getAndIncrement()); Writer writer = @@ -776,7 +848,7 @@ public class MergeManager { (Class)ConfigUtils.getIntermediateInputKeyClass(conf), (Class)ConfigUtils.getIntermediateInputValueClass(conf), codec, null, null); - Path tmpDir = new Path(inputContext.getUniqueIdentifier()); + tmpDir = new Path(inputContext.getUniqueIdentifier()); try { TezRawKeyValueIterator iter = TezMerger.merge(conf, rfs, (Class)ConfigUtils.getIntermediateInputKeyClass(conf), @@ -808,6 +880,20 @@ public class MergeManager { " Local output file is " + outputPath + " of size " + outputLen); } + + @Override + public void cleanup(List<FileChunk> inputs, boolean deleteData) throws IOException, + InterruptedException { + if (deleteData) { + //Additional check at task level + if (cleanup) { + LOG.info("Try deleting stale data"); + MergeManager.cleanup(localFS, inputs); + MergeManager.cleanup(localFS, outputPath); + MergeManager.cleanup(localFS, tmpDir); + } + } + } } private long createInMemorySegments(List<MapOutput> inMemoryMapOutputs, @@ -821,7 +907,7 @@ public class MergeManager { for (MapOutput mo : inMemoryMapOutputs) { fullSize += mo.getMemory().length; } - while(fullSize > leaveBytes) { + while((fullSize > leaveBytes) && 
!Thread.currentThread().isInterrupted()) { MapOutput mo = inMemoryMapOutputs.remove(0); byte[] data = mo.getMemory(); long size = data.length; @@ -878,7 +964,7 @@ public class MergeManager { private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs, List<MapOutput> inMemoryMapOutputs, List<FileChunk> onDiskMapOutputs - ) throws IOException { + ) throws IOException, InterruptedException { LOG.info("finalMerge called with " + inMemoryMapOutputs.size() + " in-memory map-outputs and " + onDiskMapOutputs.size() + " on-disk map-outputs"); http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java index d4faf51..52b4c5b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java @@ -46,8 +46,18 @@ abstract class MergeThread<T> extends Thread { public synchronized void close() throws InterruptedException { closed = true; - waitForMerge(); - interrupt(); + if (!Thread.currentThread().isInterrupted()) { + waitForMerge(); + interrupt(); + } else { + try { + interrupt(); + cleanup(inputs, Thread.currentThread().isInterrupted()); + } catch (IOException e) { + //ignore + LOG.warn("Error cleaning up", e); + } + } } public synchronized boolean isInProgress() { @@ -89,6 +99,7 @@ abstract class MergeThread<T> extends Thread { merge(inputs); } catch (InterruptedException ie) { // Meant to handle a shutdown of the entire fetch/merge process + 
Thread.currentThread().interrupt(); return; } catch(Throwable t) { reporter.reportException(t); @@ -106,4 +117,7 @@ abstract class MergeThread<T> extends Thread { public abstract void merge(List<T> inputs) throws IOException, InterruptedException; + + public abstract void cleanup(List<T> inputs, boolean deleteData) + throws IOException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index f98aa3a..442f032 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -357,6 +357,7 @@ public class Shuffle implements ExceptionReporter { shufflePhaseTime.setValue(System.currentTimeMillis() - startTime); // Stop the map-output fetcher threads + LOG.info("Cleaning up fetchers"); cleanupFetchers(false); // stop the scheduler @@ -393,8 +394,7 @@ public class Shuffle implements ExceptionReporter { for (FetcherOrderedGrouped fetcher : fetchers) { try { fetcher.shutDown(); - LOG.info("Shutdown.." + fetcher.getName() + ", status:" + fetcher.isAlive() + ", " - + "isInterrupted:" + fetcher.isInterrupted()); + LOG.info("Shutdown.." + fetcher.getName()); } catch (InterruptedException e) { if (ignoreErrors) { LOG.info("Interrupted while shutting down fetchers. 
Ignoring."); @@ -425,6 +425,8 @@ public class Shuffle implements ExceptionReporter { scheduler.close(); } catch (InterruptedException e) { if (ignoreErrors) { + //Reset the status + Thread.currentThread().interrupt(); LOG.info("Interrupted while attempting to close the scheduler during cleanup. Ignoring"); } else { throw e; @@ -437,6 +439,14 @@ public class Shuffle implements ExceptionReporter { if (!mergerClosed.getAndSet(true)) { try { merger.close(); + } catch (InterruptedException e) { + if (ignoreErrors) { + //Reset the status + Thread.currentThread().interrupt(); + LOG.info("Interrupted while attempting to close the merger during cleanup. Ignoring"); + } else { + throw e; + } } catch (Throwable e) { if (ignoreErrors) { LOG.info("Exception while trying to shutdown merger, Ignoring", e); @@ -493,7 +503,7 @@ public class Shuffle implements ExceptionReporter { @Override public void onFailure(Throwable t) { if (isShutDown.get()) { - LOG.info("Already shutdown. Ignoring error: ", t); + LOG.info("Already shutdown. 
Ignoring error"); } else { LOG.error("ShuffleRunner failed with error", t); inputContext.fatalError(t, "Shuffle Runner Failed"); http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index a3d79ae..c54b005 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -765,6 +765,7 @@ class ShuffleScheduler { } } } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); // This handles shutdown of the entire fetch / merge process. 
return; } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index c0445c9..ca4d889 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -115,6 +115,8 @@ public abstract class ExternalSorter { protected Path finalIndexFile; protected int numSpills; + protected final boolean cleanup; + // Counters // MR compatilbity layer needs to rename counters back to what MR requries. @@ -148,6 +150,9 @@ public abstract class ExternalSorter { this.conf = conf; this.partitions = numOutputs; + cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT, + TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT); + rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw(); LOG.info("Initial Mem : " + initialMemoryAvailable + ", assignedMb=" + ((initialMemoryAvailable >> 20))); @@ -261,6 +266,7 @@ public abstract class ExternalSorter { try { combiner.combine(kvIter, writer); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException(e); } } @@ -314,4 +320,36 @@ public abstract class ExternalSorter { public int getNumSpills() { return numSpills; } + + protected synchronized void cleanup() throws IOException { + if (!cleanup) { + return; + } + cleanup(spillFilePaths); + cleanup(spillFileIndexPaths); + //TODO: What if when same volume rename happens (have to rely on job completion cleanup) + cleanup(finalOutputFile); + 
cleanup(finalIndexFile); + } + + protected synchronized void cleanup(Path path) { + if (path == null || !cleanup) { + return; + } + try { + LOG.info("Deleting " + path); + rfs.delete(path, true); + } catch(IOException ioe) { + LOG.warn("Error in deleting " + path); + } + } + + protected synchronized void cleanup(Map<Integer, Path> spillMap) { + if (!cleanup) { + return; + } + for(Map.Entry<Integer, Path> entry : spillMap.entrySet()) { + cleanup(entry.getValue()); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 65606bf..3fae2f6 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -33,7 +33,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; @@ -356,6 +355,9 @@ public class PipelinedSorter extends ExternalSorter { merger.ready(); // wait for all the future results from sort threads LOG.info("Spilling to " + filename.toString()); for (int i = 0; i < partitions; ++i) { + if (isThreadInterrupted()) { + return; + } TezRawKeyValueIterator kvIter = merger.filter(i); //write merged output to disk long segmentStart = out.getPos(); @@ -391,147 +393,182 @@ public class PipelinedSorter extends ExternalSorter { ++numSpills; } catch(InterruptedException ie) { // TODO:the combiner has 
been interrupted + Thread.currentThread().interrupt(); } finally { out.close(); } } + + + + + private boolean isThreadInterrupted() throws IOException { + if (Thread.currentThread().isInterrupted()) { + if (cleanup) { + cleanup(); + } + sortmaster.shutdownNow(); + LOG.info("Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster + .isShutdown() + ", terminated=" + sortmaster.isTerminated()); + return true; + } + return false; + } + @Override public void flush() throws IOException { final String uniqueIdentifier = outputContext.getUniqueIdentifier(); - LOG.info("Starting flush of map output"); - span.end(); - merger.add(span.sort(sorter)); - spill(); - sortmaster.shutdown(); + /** + * Possible that the thread got interrupted when flush was happening or when the flush was + * never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close() + * on all I/O. At that time, this is safe to cleanup + */ + if (isThreadInterrupted()) { + return; + } + + try { + LOG.info("Starting flush of map output"); + span.end(); + merger.add(span.sort(sorter)); + spill(); + sortmaster.shutdown(); - //safe to clean up - bufferList.clear(); + //safe to clean up + bufferList.clear(); - numAdditionalSpills.increment(numSpills - 1); + numAdditionalSpills.increment(numSpills - 1); - if (!finalMergeEnabled) { - //Generate events for all spills - List<Event> events = Lists.newLinkedList(); + if (!finalMergeEnabled) { + //Generate events for all spills + List<Event> events = Lists.newLinkedList(); - //For pipelined shuffle, previous events are already sent. Just generate the last event alone - int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0; - int endIndex = numSpills; + //For pipelined shuffle, previous events are already sent. Just generate the last event alone + int startIndex = (pipelinedShuffle) ? 
(numSpills - 1) : 0; + int endIndex = numSpills; - for (int i = startIndex; i < endIndex; i++) { - boolean isLastEvent = (i == numSpills - 1); + for (int i = startIndex; i < endIndex; i++) { + boolean isLastEvent = (i == numSpills - 1); - String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i); - ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, - outputContext, i, indexCacheList.get(i), partitions, - sendEmptyPartitionDetails, pathComponent); - LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); + String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i); + ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, + outputContext, i, indexCacheList.get(i), partitions, + sendEmptyPartitionDetails, pathComponent); + LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); + } + outputContext.sendEvents(events); + //No need to generate final merge + return; } - outputContext.sendEvents(events); - //No need to generate final merge - return; - } - //In case final merge is required, the following code path is executed. - if(numSpills == 1) { - // someday be able to pass this directly to shuffle - // without writing to disk - final Path filename = spillFilePaths.get(0); - final Path indexFilename = spillFileIndexPaths.get(0); - finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename); - finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename); - - sameVolRename(filename, finalOutputFile); - sameVolRename(indexFilename, finalIndexFile); - if (LOG.isInfoEnabled()) { - LOG.info("numSpills=" + numSpills + ", finalOutputFile=" + finalOutputFile + ", " - + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" + - indexFilename); + //In case final merge is required, the following code path is executed. 
+ if (numSpills == 1) { + // someday be able to pass this directly to shuffle + // without writing to disk + final Path filename = spillFilePaths.get(0); + final Path indexFilename = spillFileIndexPaths.get(0); + finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename); + finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename); + + sameVolRename(filename, finalOutputFile); + sameVolRename(indexFilename, finalIndexFile); + if (LOG.isInfoEnabled()) { + LOG.info("numSpills=" + numSpills + ", finalOutputFile=" + finalOutputFile + ", " + + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" + + indexFilename); + } + return; } - return; - } - finalOutputFile = - mapOutputFile.getOutputFileForWrite(0); //TODO - finalIndexFile = - mapOutputFile.getOutputIndexFileForWrite(0); //TODO + finalOutputFile = + mapOutputFile.getOutputFileForWrite(0); //TODO + finalIndexFile = + mapOutputFile.getOutputIndexFileForWrite(0); //TODO - if (LOG.isDebugEnabled()) { - LOG.debug("numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:" - + finalIndexFile); - } + if (LOG.isDebugEnabled()) { + LOG.debug( + "numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:" + + finalIndexFile); + } - //The output stream for the final single output file - FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); + //The output stream for the final single output file + FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); - final TezSpillRecord spillRec = new TezSpillRecord(partitions); + final TezSpillRecord spillRec = new TezSpillRecord(partitions); + for (int parts = 0; parts < partitions; parts++) { + //create the segments to be merged + List<Segment> segmentList = + new ArrayList<Segment>(numSpills); + for (int i = 0; i < numSpills; i++) { + Path spillFilename = spillFilePaths.get(i); + TezIndexRecord indexRecord = 
indexCacheList.get(i).getIndex(parts); - for (int parts = 0; parts < partitions; parts++) { - //create the segments to be merged - List<Segment> segmentList = - new ArrayList<Segment>(numSpills); - for(int i = 0; i < numSpills; i++) { - Path spillFilename = spillFilePaths.get(i); - TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); + Segment s = + new Segment(rfs, spillFilename, indexRecord.getStartOffset(), + indexRecord.getPartLength(), codec, ifileReadAhead, + ifileReadAheadLength, ifileBufferSize, true); + segmentList.add(i, s); + } - Segment s = - new Segment(rfs, spillFilename, indexRecord.getStartOffset(), - indexRecord.getPartLength(), codec, ifileReadAhead, - ifileReadAheadLength, ifileBufferSize, true); - segmentList.add(i, s); - } + int mergeFactor = + this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT); + // sort the segments only if there are intermediate merges + boolean sortSegments = segmentList.size() > mergeFactor; + //merge + TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs, + keyClass, valClass, codec, + segmentList, mergeFactor, + new Path(uniqueIdentifier), + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf), + nullProgressable, sortSegments, true, + null, spilledRecordsCounter, null, + null); // Not using any Progress in TezMerger. Should just work. 
- int mergeFactor = - this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, - TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT); - // sort the segments only if there are intermediate merges - boolean sortSegments = segmentList.size() > mergeFactor; - //merge - TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs, - keyClass, valClass, codec, - segmentList, mergeFactor, - new Path(uniqueIdentifier), - (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf), - nullProgressable, sortSegments, true, - null, spilledRecordsCounter, null, - null); // Not using any Progress in TezMerger. Should just work. - - //write merged output to disk - long segmentStart = finalOut.getPos(); - Writer writer = - new Writer(conf, finalOut, keyClass, valClass, codec, - spilledRecordsCounter, null, merger.needsRLE()); - if (combiner == null || numSpills < minSpillsForCombine) { - TezMerger.writeFile(kvIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); - } else { - runCombineProcessor(kvIter, writer); - } + //write merged output to disk + long segmentStart = finalOut.getPos(); + Writer writer = + new Writer(conf, finalOut, keyClass, valClass, codec, + spilledRecordsCounter, null, merger.needsRLE()); + if (combiner == null || numSpills < minSpillsForCombine) { + TezMerger.writeFile(kvIter, writer, nullProgressable, + TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); + } else { + runCombineProcessor(kvIter, writer); + } - //close - writer.close(); + //close + writer.close(); - // record offsets - final TezIndexRecord rec = - new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); - spillRec.putIndex(rec, parts); - } + // record offsets + final TezIndexRecord rec = + new TezIndexRecord( + segmentStart, + writer.getRawLength(), + writer.getCompressedLength()); + spillRec.putIndex(rec, parts); + } - spillRec.writeToFile(finalIndexFile, conf); - 
finalOut.close(); - for(int i = 0; i < numSpills; i++) { - Path indexFilename = spillFileIndexPaths.get(i); - Path spillFilename = spillFilePaths.get(i); - rfs.delete(indexFilename,true); - rfs.delete(spillFilename,true); - } + spillRec.writeToFile(finalIndexFile, conf); + finalOut.close(); + for (int i = 0; i < numSpills; i++) { + Path indexFilename = spillFileIndexPaths.get(i); + Path spillFilename = spillFilePaths.get(i); + rfs.delete(indexFilename, true); + rfs.delete(spillFilename, true); + } - spillFileIndexPaths.clear(); - spillFilePaths.clear(); + spillFileIndexPaths.clear(); + spillFilePaths.clear(); + } catch(InterruptedException ie) { + if (cleanup) { + cleanup(); + } + Thread.currentThread().interrupt(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java index 758e9c7..3b7bf05 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java @@ -76,7 +76,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { return new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize, false, comparator, @@ -101,7 +101,7 @@ public class TezMerger { TezCounter mergedMapOutputsCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { return new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead, 
ifileReadAheadLength, ifileBufferSize, false, comparator, @@ -124,7 +124,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { // Get rid of this ? return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir, comparator, reporter, false, readsCounter, writesCounter, bytesReadCounter, @@ -142,7 +142,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { return new MergeQueue(conf, fs, segments, comparator, reporter, sortSegments, false).merge(keyClass, valueClass, mergeFactor, tmpDir, @@ -163,7 +163,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { return new MergeQueue(conf, fs, segments, comparator, reporter, sortSegments, codec, considerFinalMergeForProgress). 
merge(keyClass, valueClass, @@ -185,7 +185,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { return new MergeQueue(conf, fs, segments, comparator, reporter, sortSegments, codec, false).merge(keyClass, valueClass, mergeFactor, inMemSegments, @@ -196,9 +196,9 @@ public class TezMerger { } public static <K extends Object, V extends Object> - void writeFile(TezRawKeyValueIterator records, Writer writer, - Progressable progressable, long recordsBeforeProgress) - throws IOException { + void writeFile(TezRawKeyValueIterator records, Writer writer, + Progressable progressable, long recordsBeforeProgress) + throws IOException, InterruptedException { long recordCtr = 0; long count = 0; while(records.next()) { @@ -211,6 +211,15 @@ public class TezMerger { if (((recordCtr++) % recordsBeforeProgress) == 0) { progressable.progress(); + if (Thread.currentThread().isInterrupted()) { + /** + * Takes care DefaultSorter.mergeParts, MergeManager's merger threads, + * PipelinedSorter's flush(). This is not expensive check as it is carried out every + * 10000 records or so. + */ + throw new InterruptedException("Current thread=" + Thread.currentThread().getName() + " got " + + "interrupted"); + } } } if ((count > 0) && LOG.isDebugEnabled()) { @@ -614,7 +623,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { return merge(keyClass, valueClass, factor, 0, tmpDir, readsCounter, writesCounter, bytesReadCounter, mergePhase); } @@ -625,7 +634,7 @@ public class TezMerger { TezCounter writesCounter, TezCounter bytesReadCounter, Progress mergePhase) - throws IOException { + throws IOException, InterruptedException { LOG.info("Merging " + segments.size() + " sorted segments"); if (segments.size() == 0) { LOG.info("Nothing to merge. 
Returning an empty iterator"); http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 2cbb70a..9783c79 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -193,6 +193,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { spillDone.await(); } } catch (InterruptedException e) { + //interrupt spill thread + spillThread.interrupt(); + Thread.currentThread().interrupt(); throw new IOException("Spill thread failed to initialize", e); } finally { spillLock.unlock(); @@ -603,6 +606,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { spillDone.await(); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException( "Buffer interrupted while waiting for the writer", e); } @@ -625,9 +629,45 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } } + void interruptSpillThread() throws IOException { + assert !spillLock.isHeldByCurrentThread(); + // shut down spill thread and wait for it to exit. Since the preceding + // ensures that it is finished with its work (and sortAndSpill did not + // throw), we elect to use an interrupt instead of setting a flag. + // Spilling simultaneously from this thread while the spill thread + // finishes its work might be both a useful way to extend this and also + // sufficient motivation for the latter approach. 
+ try { + spillThread.interrupt(); + spillThread.join(); + } catch (InterruptedException e) { + LOG.info("Spill thread interrupted"); + //Reset status + Thread.currentThread().interrupt(); + throw new IOException("Spill failed", e); + } + } + @Override public void flush() throws IOException { LOG.info("Starting flush of map output"); + if (Thread.currentThread().isInterrupted()) { + /** + * Possible that the thread got interrupted when flush was happening or when the flush was + * never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close() + * on all I/O. At that time, this is safe to cleanup + */ + if (cleanup) { + cleanup(); + } + try { + interruptSpillThread(); + } catch(IOException e) { + //safe to ignore + } + return; + } + spillLock.lock(); try { while (spillInProgress) { @@ -656,28 +696,25 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { sortAndSpill(); } } catch (InterruptedException e) { + //Reset status + Thread.currentThread().interrupt(); + interruptSpillThread(); throw new IOException("Interrupted while waiting for the writer", e); } finally { spillLock.unlock(); } - assert !spillLock.isHeldByCurrentThread(); - // shut down spill thread and wait for it to exit. Since the preceding - // ensures that it is finished with its work (and sortAndSpill did not - // throw), we elect to use an interrupt instead of setting a flag. - // Spilling simultaneously from this thread while the spill thread - // finishes its work might be both a useful way to extend this and also - // sufficient motivation for the latter approach. 
- try { - spillThread.interrupt(); - spillThread.join(); - } catch (InterruptedException e) { - throw new IOException("Spill failed", e); - } - // release sort buffer before the merge + + interruptSpillThread(); + // release sort buffer before the mergecl //FIXME //kvbuffer = null; - mergeParts(); + try { + mergeParts(); + } catch (InterruptedException e) { + cleanup(); + Thread.currentThread().interrupt(); + } if (finalMergeEnabled) { fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen()); } @@ -715,6 +752,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } } } catch (InterruptedException e) { + LOG.info("Spill thread interrupted"); Thread.currentThread().interrupt(); } finally { spillLock.unlock(); @@ -1085,7 +1123,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { outputContext.sendEvents(events); } - private void mergeParts() throws IOException { + private void mergeParts() throws IOException, InterruptedException { // get the approximate size of the final output/index files long finalOutFileSize = 0; long finalIndexFileSize = 0; http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index e61dbdc..d905ed7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -349,6 +349,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); 
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index ce27103..12312bf 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -265,6 +265,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 40edc76..cc28d9f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -240,6 +240,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index 2c26374..9bb490d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -162,6 +162,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index 34f2e3e..e274895 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -135,6 +135,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java index c483a81..a4866da 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java @@ -167,19 +167,19 @@ public class TestValuesIterator { } @Test(timeout = 20000) - public void testIteratorWithInMemoryReader() throws IOException { + public void testIteratorWithInMemoryReader() throws IOException, InterruptedException { ValuesIterator iterator = createIterator(true); verifyIteratorData(iterator); } @Test(timeout = 20000) - public void testIteratorWithIFileReader() throws IOException { + public void testIteratorWithIFileReader() throws IOException, InterruptedException { ValuesIterator iterator = createIterator(false); 
verifyIteratorData(iterator); } @Test(timeout = 20000) - public void testIteratorWithIFileReaderEmptyPartitions() throws IOException { + public void testIteratorWithIFileReaderEmptyPartitions() throws IOException, InterruptedException { ValuesIterator iterator = createEmptyIterator(false); assert(iterator.moveToNext() == false); @@ -187,7 +187,8 @@ public class TestValuesIterator { assert(iterator.moveToNext() == false); } - private ValuesIterator createEmptyIterator(boolean inMemory) throws IOException { + private ValuesIterator createEmptyIterator(boolean inMemory) + throws IOException, InterruptedException { if (!inMemory) { streamPaths = new Path[0]; //This will return EmptyIterator @@ -266,7 +267,7 @@ public class TestValuesIterator { * @return ValuesIterator * @throws IOException */ - private ValuesIterator createIterator(boolean inMemory) throws IOException { + private ValuesIterator createIterator(boolean inMemory) throws IOException, InterruptedException { if (!inMemory) { streamPaths = createFiles(); //Merge all files to get KeyValueIterator http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index 094237a..0faa22a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -28,6 +28,7 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; +import com.google.common.collect.Sets; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -166,9 +167,32 @@ public class TestMergeManager { Assert.assertTrue(mergeManager.postMergeMemLimit == initialMemoryAvailable); } + class InterruptingThread implements Runnable { + + MergeManager.OnDiskMerger mergeThread; + + public InterruptingThread(MergeManager.OnDiskMerger mergeThread) { + this.mergeThread = mergeThread; + } + + @Override public void run() { + while(this.mergeThread.tmpDir == null) { + //this is tight loop + } + + this.mergeThread.interrupt(); + } + } + @Test(timeout = 10000) - public void testLocalDiskMergeMultipleTasks() throws IOException { + public void testLocalDiskMergeMultipleTasks() throws IOException, InterruptedException { + testLocalDiskMergeMultipleTasks(false); + testLocalDiskMergeMultipleTasks(true); + } + + void testLocalDiskMergeMultipleTasks(boolean interruptInMiddle) + throws IOException, InterruptedException { Configuration conf = new TezConfiguration(defaultConf); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName()); @@ -194,6 +218,7 @@ public class TestMergeManager { new MergeManager(conf, localFs, localDirAllocator, t0inputContext, null, null, null, null, t0exceptionReporter, 2000000, null, false, -1); MergeManager t0mergeManager = spy(t0mergeManagerReal); + t0mergeManager.configureAndStart(); MergeManager t1mergeManagerReal = new MergeManager(conf, localFs, localDirAllocator, t1inputContext, null, null, null, null, @@ -249,30 +274,48 @@ public class TestMergeManager { List<FileChunk> t0MergeFiles = new LinkedList<FileChunk>(); t0MergeFiles.addAll(t0mergeManager.onDiskMapOutputs); t0mergeManager.onDiskMapOutputs.clear(); - t0mergeManager.onDiskMerger.merge(t0MergeFiles); - Assert.assertEquals(1, t0mergeManager.onDiskMapOutputs.size()); - - - t1MapOutput0.commit(); - t1MapOutput1.commit(); - 
verify(t1mergeManager).closeOnDiskFile(t1MapOutput0.getOutputPath()); - verify(t1mergeManager).closeOnDiskFile(t1MapOutput1.getOutputPath()); - // Run the OnDiskMerge via MergeManager - // Simulate the thread invocation - remove files, and invoke merge - List<FileChunk> t1MergeFiles = new LinkedList<FileChunk>(); - t1MergeFiles.addAll(t1mergeManager.onDiskMapOutputs); - t1mergeManager.onDiskMapOutputs.clear(); - t1mergeManager.onDiskMerger.merge(t1MergeFiles); - Assert.assertEquals(1, t1mergeManager.onDiskMapOutputs.size()); - Assert.assertNotEquals(t0mergeManager.onDiskMapOutputs.iterator().next().getPath(), - t1mergeManager.onDiskMapOutputs.iterator().next().getPath()); + if (!interruptInMiddle) { + t0mergeManager.onDiskMerger.merge(t0MergeFiles); + Assert.assertEquals(1, t0mergeManager.onDiskMapOutputs.size()); + } else { + + //Start Interrupting thread + Thread interruptingThread = new Thread(new InterruptingThread(t0mergeManager.onDiskMerger)); + interruptingThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } - Assert.assertTrue(t0mergeManager.onDiskMapOutputs.iterator().next().getPath().toString() - .contains(t0inputContext.getUniqueIdentifier())); - Assert.assertTrue(t1mergeManager.onDiskMapOutputs.iterator().next().getPath().toString() - .contains(t1inputContext.getUniqueIdentifier())); + //Will be interrupted in the middle by interruptingThread. 
+ t0mergeManager.onDiskMerger.startMerge(Sets.newHashSet(t0MergeFiles)); + t0mergeManager.onDiskMerger.waitForMerge(); + Assert.assertNotEquals(1, t0mergeManager.onDiskMapOutputs.size()); + } + if (!interruptInMiddle) { + t1MapOutput0.commit(); + t1MapOutput1.commit(); + verify(t1mergeManager).closeOnDiskFile(t1MapOutput0.getOutputPath()); + verify(t1mergeManager).closeOnDiskFile(t1MapOutput1.getOutputPath()); + // Run the OnDiskMerge via MergeManager + // Simulate the thread invocation - remove files, and invoke merge + List<FileChunk> t1MergeFiles = new LinkedList<FileChunk>(); + t1MergeFiles.addAll(t1mergeManager.onDiskMapOutputs); + t1mergeManager.onDiskMapOutputs.clear(); + t1mergeManager.onDiskMerger.merge(t1MergeFiles); + Assert.assertEquals(1, t1mergeManager.onDiskMapOutputs.size()); + + Assert.assertNotEquals(t0mergeManager.onDiskMapOutputs.iterator().next().getPath(), + t1mergeManager.onDiskMapOutputs.iterator().next().getPath()); + + Assert.assertTrue(t0mergeManager.onDiskMapOutputs.iterator().next().getPath().toString() + .contains(t0inputContext.getUniqueIdentifier())); + Assert.assertTrue(t1mergeManager.onDiskMapOutputs.iterator().next().getPath().toString() + .contains(t1inputContext.getUniqueIdentifier())); + } } private InputContext createMockInputContext(String uniqueId) { http://git-wip-us.apache.org/repos/asf/tez/blob/437b5715/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java index bb932f2..b86d054 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java +++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java @@ -557,7 +557,8 @@ public class TestTezMerger { * @return * @throws IOException */ - private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc) throws IOException { + private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc) + throws IOException, InterruptedException { TezMerger merger = new TezMerger(); TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class, LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
