TEZ-1094. Support pipelined data transfer for Unordered Output (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e0e19122 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e0e19122 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e0e19122 Branch: refs/heads/TEZ-2003 Commit: e0e19122f6b0371eb3bb85e1236dd139778198eb Parents: 59529ab Author: Rajesh Balamohan <[email protected]> Authored: Thu Mar 12 14:37:09 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Thu Mar 12 14:37:49 2015 +0530 ---------------------------------------------------------------------- .../library/api/TezRuntimeConfiguration.java | 24 +- .../runtime/library/common/shuffle/Fetcher.java | 6 +- .../impl/ShuffleInputEventHandlerImpl.java | 38 ++- .../common/shuffle/impl/ShuffleManager.java | 234 +++++++++++-- .../orderedgrouped/ShuffleScheduler.java | 8 +- .../common/sort/impl/PipelinedSorter.java | 5 +- .../common/sort/impl/dflt/DefaultSorter.java | 4 +- .../writers/UnorderedPartitionedKVWriter.java | 341 ++++++++++++++----- .../output/OrderedPartitionedKVOutput.java | 8 +- .../output/UnorderedPartitionedKVOutput.java | 1 + .../common/sort/impl/TestPipelinedSorter.java | 3 +- .../sort/impl/dflt/TestDefaultSorter.java | 7 +- .../TestUnorderedPartitionedKVWriter.java | 259 +++++++++++++- .../TestUnorderedPartitionedKVOutputConfig.java | 9 + .../library/output/TestOnFileSortedOutput.java | 6 +- 15 files changed, 777 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index f2ab382..565b47a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -280,25 +280,25 @@ public class TezRuntimeConfiguration { */ public static final boolean TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT = false; - //TODO: Change description when we start supporting pipelined shuffle in unordered cases /** - * Enable pipelined shuffle in ordered producer/consumer. Expert knob. - * Works only with PipelinedSorter. Set tez.runtime.sort.threads > 2 for enabling - * PipelinedSorter. Ensure to set tez.runtime.disable.final-merge.in.sorter=true. + * Expert level setting. Enable pipelined shuffle in ordered outputs and in unordered + * partitioned outputs. In ordered cases, it works with PipelinedSorter. + * set tez.runtime.sort.threads to greater than 1 to enable pipelinedsorter. + * Ensure to set tez.runtime.enable.final-merge.in.output=false. * Speculative execution needs to be turned off when using this parameter. //TODO: TEZ-2132 */ public static final String TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED = - TEZ_RUNTIME_PREFIX + - "pipelined-shuffle.enabled"; + TEZ_RUNTIME_PREFIX + "pipelined-shuffle.enabled"; public static final boolean TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT = false; /** - * final merge in defaultsorter/pipelinedsorter. - * speculative execution needs to be turned off when disabling this parameter. //TODO: TEZ-2132 + * Expert level setting. Enable final merge in ordered (defaultsorter/pipelinedsorter) outputs. + * Speculative execution needs to be turned off when disabling this parameter. //TODO: TEZ-2132 */ - public static final String TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER = - TEZ_RUNTIME_PREFIX + "enable.final-merge.in.sorter"; - public static final boolean TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER_DEFAULT = true; + public static final String TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT = + TEZ_RUNTIME_PREFIX + "enable.final-merge.in.output"; + public static final boolean TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT = true; + /** * Share data fetched between tasks running on the same host if applicable @@ -368,7 +368,7 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS); tezRuntimeKeys.add(TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED); tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED); - tezRuntimeKeys.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER); + tezRuntimeKeys.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT); tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED); tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE); tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS); http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 30dad46..3661361 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -30,7 +30,6 @@ import java.nio.channels.OverlappingFileLockException; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -104,7 +103,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // Maps from the pathComponents (unique per srcTaskId) to the specific taskId private final Map<String, InputAttemptIdentifier> pathToAttemptMap; - private LinkedHashSet<InputAttemptIdentifier> remaining; + private List<InputAttemptIdentifier> remaining; private URL url; private volatile DataInputStream input; @@ -181,7 +180,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> { + "- partition is non-zero (%d)", partition); } - remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts); + //Similar to TEZ-2172 (remove can be expensive with list) + remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts); HostFetchResult hostFetchResult; http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index ffcd128..c4d6ce3 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -32,10 +32,12 @@ import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput; import org.apache.tez.runtime.library.common.shuffle.FetchedInput; import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; @@ -112,8 +114,8 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { .getEmptyPartitions()); BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); if (emptyPartionsBitSet.get(srcIndex)) { - InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(), - dme.getVersion()); + InputAttemptIdentifier srcAttemptIdentifier = + constructInputAttemptIdentifier(dme, shufflePayload, false); if (LOG.isDebugEnabled()) { LOG.debug("Source partition: " + srcIndex + " did not generate any data. SrcAttempt: [" + srcAttemptIdentifier + "]. Not fetching."); @@ -123,9 +125,8 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { } } - InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier( - dme.getTargetIndex(), dme.getVersion(), - shufflePayload.getPathComponent(), (useSharedInputs && srcIndex == 0)); + InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme, + shufflePayload, (useSharedInputs && srcIndex == 0)); if (shufflePayload.hasData()) { DataProto dataProto = shufflePayload.getData(); @@ -166,5 +167,32 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { shuffleManager.obsoleteKnownInput(srcAttemptIdentifier); } + /** + * Helper method to create InputAttemptIdentifier + * + * @param dmEvent + * @param shufflePayload + * @return InputAttemptIdentifier + */ + private InputAttemptIdentifier constructInputAttemptIdentifier(DataMovementEvent dmEvent, + DataMovementEventPayloadProto shufflePayload, boolean isShared) { + String pathComponent = (shufflePayload.hasPathComponent()) ? shufflePayload.getPathComponent() : null; + InputAttemptIdentifier srcAttemptIdentifier = null; + if (shufflePayload.hasSpillId()) { + int spillEventId = shufflePayload.getSpillId(); + boolean lastEvent = shufflePayload.getLastEvent(); + InputAttemptIdentifier.SPILL_INFO spillInfo = (lastEvent) ? InputAttemptIdentifier.SPILL_INFO + .FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE; + srcAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(dmEvent.getTargetIndex()), dmEvent + .getVersion(), pathComponent, isShared, spillInfo, spillEventId); + } else { + srcAttemptIdentifier = + new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), + pathComponent, isShared); + } + return srcAttemptIdentifier; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index fc42e3d..d2e9682 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -23,16 +23,18 @@ import java.io.InputStream; import java.io.OutputStream; import java.text.DecimalFormat; import java.util.Arrays; +import java.util.BitSet; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -41,6 +43,8 @@ import java.util.concurrent.locks.ReentrantLock; import javax.crypto.SecretKey; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -115,7 +119,8 @@ public class ShuffleManager implements FetcherCallback { private Set<Fetcher> runningFetchers; private final AtomicInteger numCompletedInputs = new AtomicInteger(0); - + private final AtomicInteger numFetchedSpills = new AtomicInteger(0); + private final long startTime; private long lastProgressTime; private long totalBytesShuffledTillNow; @@ -161,6 +166,10 @@ public class ShuffleManager implements FetcherCallback { private final TezCounter firstEventReceived; private final TezCounter lastEventReceived; + //To track shuffleInfo events when finalMerge is disabled OR pipelined shuffle is enabled in source. + @VisibleForTesting + final Map<InputAttemptIdentifier, ShuffleEventInfo> shuffleInfoEventsMap = Maps.newHashMap(); + // TODO More counters - FetchErrors, speed? public ShuffleManager(InputContext inputContext, Configuration conf, int numInputs, @@ -194,7 +203,11 @@ public class ShuffleManager implements FetcherCallback { this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()); completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs)); - completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs); + /** + * In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt. + * We do not know upfront the number of spills from source. + */ + completedInputs = new LinkedBlockingDeque<FetchedInput>(); knownSrcHosts = new ConcurrentHashMap<String, InputHost>(); pendingHosts = new LinkedBlockingQueue<InputHost>(); obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>()); @@ -370,6 +383,17 @@ public class ShuffleManager implements FetcherCallback { for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost .iterator(); inputIter.hasNext();) { InputAttemptIdentifier input = inputIter.next(); + + //For pipelined shuffle. + //TODO: TEZ-2132 for error handling. As of now, fail fast if there is a different attempt + if (input.canRetrieveInputInChunks() && input.getAttemptNumber() > 0) { + //speculative attempts or failure attempts. Fail fast here. + reportFatalError(new IOException(), input + " already exists. " + + "Previous attempt's data could have been already merged " + + "to memory/disk outputs. Failing the fetch early instead of adding to fetcher"); + continue; + } + // Avoid adding attempts which have already completed. if (completedInputSet.contains(input.getInputIdentifier())) { inputIter.remove(); @@ -406,6 +430,21 @@ public class ShuffleManager implements FetcherCallback { if (LOG.isDebugEnabled()) { LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host); } + + if (srcAttemptIdentifier.canRetrieveInputInChunks()) { + //TODO: need to check for speculative tasks later. TEZ-2132 + if (srcAttemptIdentifier.getAttemptNumber() > 0) { + //speculative attempts or failure attempts. Fail fast here. + reportFatalError(new IOException(), srcAttemptIdentifier + " already exists. " + + "Previous attempt's data could have been already merged " + + "to memory/disk outputs. Failing the fetch early instead of adding to addKnownInput"); + return; + } + if (shuffleInfoEventsMap.get(srcAttemptIdentifier) == null) { + shuffleInfoEventsMap.put(srcAttemptIdentifier, new ShuffleEventInfo(srcAttemptIdentifier)); + } + } + host.addKnownInput(srcAttemptIdentifier); lock.lock(); try { @@ -429,7 +468,12 @@ public class ShuffleManager implements FetcherCallback { if (!completedInputSet.contains(inputIdentifier)) { synchronized (completedInputSet) { if (!completedInputSet.contains(inputIdentifier)) { - registerCompletedInput(new NullFetchedInput(srcAttemptIdentifier)); + NullFetchedInput fetchedInput = new NullFetchedInput(srcAttemptIdentifier); + if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { + registerCompletedInput(fetchedInput); + } else { + registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput); + } } } } @@ -474,7 +518,11 @@ public class ShuffleManager implements FetcherCallback { if (!completedInputSet.contains(inputIdentifier)) { fetchedInput.commit(); committed = true; - registerCompletedInput(fetchedInput); + if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { + registerCompletedInput(fetchedInput); + } else { + registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput); + } } } } @@ -499,7 +547,47 @@ public class ShuffleManager implements FetcherCallback { /////////////////// End of Methods for InputEventHandler /////////////////// Methods from FetcherCallbackHandler - + + /** + * Placeholder for tracking shuffle events in case we get multiple spills info for the same + * attempt. + */ + static class ShuffleEventInfo { + BitSet eventsProcessed; + int finalEventId = -1; //0 indexed + String id; + + + ShuffleEventInfo(InputAttemptIdentifier input) { + this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber(); + this.eventsProcessed = new BitSet(); + } + + void spillProcessed(int spillId) { + if (finalEventId != -1) { + Preconditions.checkState(eventsProcessed.cardinality() <= (finalEventId + 1), + "Wrong state. eventsProcessed cardinality=" + eventsProcessed.cardinality() + " " + + "finalEventId=" + finalEventId + ", spillId=" + spillId + ", " + toString()); + } + eventsProcessed.set(spillId); + } + + void setFinalEventId(int spillId) { + finalEventId = spillId; + } + + boolean isDone() { + LOG.info("finalEventId=" + finalEventId + ", eventsProcessed cardinality=" + + eventsProcessed.cardinality()); + return ((finalEventId != -1) && (finalEventId + 1) == eventsProcessed.cardinality()); + } + + public String toString() { + return "[eventsProcessed=" + eventsProcessed + ", finalEventId=" + finalEventId + + ", id=" + id + "]"; + } + } + @Override public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration) @@ -537,7 +625,12 @@ public class ShuffleManager implements FetcherCallback { } decompressedDataSizeCounter.increment(decompressedLength); - registerCompletedInput(fetchedInput); + if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { + registerCompletedInput(fetchedInput); + } else { + registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput); + } + lock.lock(); try { totalBytesShuffledTillNow += fetchedBytes; @@ -562,6 +655,104 @@ public class ShuffleManager implements FetcherCallback { // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue. } + private void registerCompletedInput(FetchedInput fetchedInput) { + lock.lock(); + try { + maybeInformInputReady(fetchedInput); + adjustCompletedInputs(fetchedInput); + numFetchedSpills.getAndIncrement(); + } finally { + lock.unlock(); + } + } + + private void maybeInformInputReady(FetchedInput fetchedInput) { + lock.lock(); + try { + completedInputs.add(fetchedInput); + if (!inputReadyNotificationSent.getAndSet(true)) { + // TODO Should eventually be controlled by Inputs which are processing the data. + inputContext.inputIsReady(); + } + } finally { + lock.unlock(); + } + } + + private void adjustCompletedInputs(FetchedInput fetchedInput) { + lock.lock(); + try { + completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier()); + + int numComplete = numCompletedInputs.incrementAndGet(); + if (numComplete == numInputs) { + LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName()); + } + } finally { + lock.unlock(); + } + } + + private void registerCompletedInputForPipelinedShuffle(InputAttemptIdentifier + srcAttemptIdentifier, FetchedInput fetchedInput) { + /** + * For pipelinedshuffle it is possible to get multiple spills. Claim success only when + * all spills pertaining to an attempt are done. + */ + ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(srcAttemptIdentifier); + + //TODO: need to check for speculative tasks later. TEZ-2132 + if (srcAttemptIdentifier.getAttemptNumber() > 0) { + //speculative attempts or failure attempts. Fail fast here. + reportFatalError(new IOException(), "Previous event already got scheduled for " + + srcAttemptIdentifier + ". Previous attempt's data could have been already merged " + + "to memory/disk outputs. Failing the fetch early."); + return; + } + + //for empty partition case + if (eventInfo == null && fetchedInput instanceof NullFetchedInput) { + eventInfo = new ShuffleEventInfo(srcAttemptIdentifier); + shuffleInfoEventsMap.put(srcAttemptIdentifier, eventInfo); + } + + assert(eventInfo != null); + eventInfo.spillProcessed(srcAttemptIdentifier.getSpillEventId()); + numFetchedSpills.getAndIncrement(); + + if (srcAttemptIdentifier.getFetchTypeInfo() == InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE) { + eventInfo.setFinalEventId(srcAttemptIdentifier.getSpillEventId()); + } + + lock.lock(); + try { + /** + * When fetch is complete for a spill, add it to completedInputs to ensure that it is + * available for downstream processing. Final success will be claimed only when all + * spills are downloaded from the source. + */ + maybeInformInputReady(fetchedInput); + + + //check if we downloaded all spills pertaining to this InputAttemptIdentifier + if (eventInfo.isDone()) { + adjustCompletedInputs(fetchedInput); + shuffleInfoEventsMap.remove(srcAttemptIdentifier); + } + } finally { + lock.unlock(); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("eventInfo " + eventInfo.toString()); + } + } + + private void reportFatalError(Throwable exception, String message) { + LOG.fatal(message); + inputContext.fatalError(exception, message); + } + @Override public void fetchFailed(String host, InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) { @@ -572,9 +763,7 @@ public class ShuffleManager implements FetcherCallback { + connectFailed); failedShufflesCounter.increment(1); if (srcAttemptIdentifier == null) { - String message = "Received fetchFailure for an unknown src (null)"; - LOG.fatal(message); - inputContext.fatalError(null, message); + reportFatalError(null, "Received fetchFailure for an unknown src (null)"); } else { InputReadErrorEvent readError = InputReadErrorEvent.create( "Fetch failure while fetching from " @@ -584,7 +773,7 @@ public class ShuffleManager implements FetcherCallback { srcAttemptIdentifier.getAttemptNumber()), srcAttemptIdentifier.getInputIdentifier().getInputIndex(), srcAttemptIdentifier.getAttemptNumber()); - + List<Event> failedEvents = Lists.newArrayListWithCapacity(1); failedEvents.add(readError); inputContext.sendEvents(failedEvents); @@ -616,24 +805,6 @@ public class ShuffleManager implements FetcherCallback { } } - private void registerCompletedInput(FetchedInput fetchedInput) { - lock.lock(); - try { - completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier()); - completedInputs.add(fetchedInput); - if (!inputReadyNotificationSent.getAndSet(true)) { - // TODO Should eventually be controlled by Inputs which are processing the data. - inputContext.inputIsReady(); - } - int numComplete = numCompletedInputs.incrementAndGet(); - if (numComplete == numInputs) { - LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName()); - } - } finally { - lock.unlock(); - } - } - /////////////////// Methods for walking the available inputs /** @@ -723,11 +894,12 @@ public class ShuffleManager implements FetcherCallback { private void logProgress() { double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024); - int inputsDone = numInputs - numCompletedInputs.get(); + int inputsDone = numCompletedInputs.get(); long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1; double transferRate = mbs / secsSinceStart; - LOG.info("copy(" + inputsDone + " of " + numInputs + + LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills.get() + ") of " + + numInputs + ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) " + mbpsFormat.format(transferRate) + " MB/s)"); } http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index f09a20c..d3ee161 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -71,6 +71,7 @@ class ShuffleScheduler { private boolean[] finishedMaps; private final int numInputs; private int remainingMaps; + private int numFetchedSpills; private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>(); //TODO Clean this and other maps at some point private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>(); @@ -200,7 +201,8 @@ class ShuffleScheduler { void spillProcessed(int spillId) { if (finalEventId != -1) { Preconditions.checkState(eventsProcessed.cardinality() <= (finalEventId + 1), - "Wrong state " + toString()); + "Wrong state. eventsProcessed cardinality=" + eventsProcessed.cardinality() + " " + + "finalEventId=" + finalEventId + ", spillId=" + spillId + ", " + toString()); } eventsProcessed.set(spillId); } @@ -260,6 +262,7 @@ class ShuffleScheduler { if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { remainingMaps = remainingMaps - 1; setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex()); + numFetchedSpills++; } else { ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(srcAttemptIdentifier); @@ -280,6 +283,7 @@ class ShuffleScheduler { assert(eventInfo != null); eventInfo.spillProcessed(srcAttemptIdentifier.getSpillEventId()); + numFetchedSpills++; if (srcAttemptIdentifier.getFetchTypeInfo() == InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE) { eventInfo.setFinalEventId(srcAttemptIdentifier.getSpillEventId()); @@ -338,7 +342,7 @@ class ShuffleScheduler { long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1; double transferRate = mbs / secsSinceStart; - LOG.info("copy(" + inputsDone + " of " + numInputs + + LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills + ") of " + numInputs + ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) " + mbpsFormat.format(transferRate) + " MB/s)"); } http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index d36053c..34a7e3b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -29,7 +29,6 @@ import java.util.LinkedList; import java.util.List; import java.util.ListIterator; import java.util.PriorityQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -128,8 +127,8 @@ public class PipelinedSorter extends ExternalSorter { partitionBits = bitcount(partitions)+1; finalMergeEnabled = conf.getBoolean( - TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, - TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER_DEFAULT); + TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, + TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT); boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index f44e176..a56249d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -143,8 +143,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT); finalMergeEnabled = conf.getBoolean( - TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, - TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER_DEFAULT); + TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, + TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT); boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index be128a9..8b5f196 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -28,7 +28,6 @@ import java.util.BitSet; import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -37,8 +36,10 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -135,6 +136,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private final ReentrantLock spillLock = new ReentrantLock(); private final Condition spillInProgress = spillLock.newCondition(); + private final boolean pipelinedShuffle; + + private final long indexFileSizeEstimate; + public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf, int numOutputs, long availableMemoryBytes) throws IOException { super(outputContext, conf, numOutputs); @@ -173,6 +178,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit outputLargeRecordsCounter = outputContext.getCounters().findCounter( TaskCounter.OUTPUT_LARGE_RECORDS); + + //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in + // this case. Add it later if needed. + pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT); + + + indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH; + LOG.info("pipelinedShuffle=" + pipelinedShuffle); } private void computeNumBuffersAndSize(int bufferLimit) { @@ -219,7 +234,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit if (metaStart == 0) { // Started writing at the start of the buffer. Write Key to disk. // Key too large for any buffer. Write entire record to disk. currentBuffer.reset(); - writeLargeRecord(key, value, partition, numSpills.incrementAndGet()); + writeLargeRecord(key, value, partition); return; } else { // Exceeded length on current buffer. // Try resetting the buffer to the next one, if this was not the start of a buffer, @@ -237,7 +252,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit if (metaStart == 0) { // Key + Value too large for a single buffer. currentBuffer.reset(); - writeLargeRecord(key, value, partition, numSpills.incrementAndGet()); + writeLargeRecord(key, value, partition); return; } else { // Exceeded length on current buffer. // Try writing key+value to a new buffer - will fall back to disk if that fails. @@ -276,9 +291,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit pendingSpillCount.incrementAndGet(); - ListenableFuture<SpillResult> future = spillExecutor.submit(new SpillCallable(currentBuffer, - numSpills.incrementAndGet(), codec, spilledRecordsCounter, false)); - Futures.addCallback(future, new SpillCallback(numSpills.get())); + SpillPathDetails spillPathDetails = getSpillPathDetails(false, -1); + + ListenableFuture<SpillResult> future = spillExecutor.submit( + new SpillCallable(currentBuffer, codec, spilledRecordsCounter, spillPathDetails)); + Futures.addCallback(future, new SpillCallback(spillPathDetails.spillIndex)); WrappedBuffer wb = getNextAvailableBuffer(); currentBuffer = wb; @@ -317,16 +334,18 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private final WrappedBuffer wrappedBuffer; private final CompressionCodec codec; private final TezCounter numRecordsCounter; - private final int spillNumber; - private final boolean isFinalSpill; + private final int spillIndex; + private final SpillPathDetails spillPathDetails; - public SpillCallable(WrappedBuffer wrappedBuffer, int spillNumber, CompressionCodec codec, - TezCounter numRecordsCounter, boolean isFinal) { + public SpillCallable(WrappedBuffer wrappedBuffer, CompressionCodec codec, + TezCounter numRecordsCounter, SpillPathDetails spillPathDetails) { this.wrappedBuffer = wrappedBuffer; this.codec = codec; this.numRecordsCounter = numRecordsCounter; - this.spillNumber = spillNumber; - this.isFinalSpill = isFinal; + this.spillIndex = spillPathDetails.spillIndex; + Preconditions.checkArgument(spillPathDetails.outputFilePath != null, "Spill output file " + + "path can not be null"); + this.spillPathDetails = spillPathDetails; } @Override @@ -336,18 +355,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit // Number of parallel spills determined by number of threads. // Last spill synchronization handled separately. SpillResult spillResult = null; - long spillSize = wrappedBuffer.nextPosition + numPartitions * APPROX_HEADER_LENGTH; - Path outPath = null; - if (isFinalSpill) { - outPath = outputFileHandler.getOutputFileForWrite(spillSize); - finalOutPath = outPath; - } else { - outPath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize); - } - FSDataOutputStream out = rfs.create(outPath); + FSDataOutputStream out = rfs.create(spillPathDetails.outputFilePath); TezSpillRecord spillRecord = new TezSpillRecord(numPartitions); DataInputBuffer key = new DataInputBuffer(); DataInputBuffer val = new DataInputBuffer(); + long compressedLength = 0; for (int i = 0; i < numPartitions; i++) { IFile.Writer writer = null; try { @@ -359,12 +371,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit writer = new Writer(conf, out, keyClass, valClass, codec, numRecordsCounter, null); writePartition(wrappedBuffer.partitionPositions[i], wrappedBuffer, writer, key, val); writer.close(); - if (isFinalSpill) { - fileOutputBytesCounter.increment(writer.getCompressedLength()); - } else { - additionalSpillBytesWritternCounter.increment(writer.getCompressedLength()); - } - spillResult = new SpillResult(writer.getCompressedLength(), this.wrappedBuffer); + compressedLength += writer.getCompressedLength(); TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength()); spillRecord.putIndex(indexRecord, i); @@ -375,17 +382,14 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } } - if (isFinalSpill) { - long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH; - finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate); - spillRecord.writeToFile(finalIndexPath, conf); - fileOutputBytesCounter.increment(indexFileSizeEstimate); - LOG.info("Finished final and only spill"); - } else { - SpillInfo spillInfo = new SpillInfo(spillRecord, outPath); - spillInfoList.add(spillInfo); - numAdditionalSpillsCounter.increment(1); - LOG.info("Finished spill " + spillNumber); + spillResult = new SpillResult(compressedLength, this.wrappedBuffer); + + handleSpillIndex(spillPathDetails, spillRecord); + LOG.info("Finished spill " + spillIndex); + + if (LOG.isDebugEnabled()) { + LOG.debug("Spill=" + spillIndex + ", indexPath=" + + spillPathDetails.indexFilePath + ", outputPath=" + spillPathDetails.outputFilePath); } return spillResult; } @@ -406,7 +410,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) { - int initialMemRequestMb = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, + int initialMemRequestMb = conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB_DEFAULT); Preconditions.checkArgument(initialMemRequestMb != 0, TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB + " should be larger than 0"); @@ -443,80 +448,160 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit LOG.info("All spills complete"); // Assuming close will be called on the same thread as the write cleanup(); - if (numSpills.get() > 0) { - mergeAll(); - } else { - finalSpill(); + + List<Event> events = Lists.newLinkedList(); + if (!pipelinedShuffle) { + //Regular code path. + if (numSpills.get() > 0) { + mergeAll(); + } else { + finalSpill(); + } + cleanupCurrentBuffer(); + return Collections.singletonList(generateDMEvent()); } - currentBuffer.cleanup(); - currentBuffer = null; + //For pipelined case, send out an event in case finalspill generated a spill file. + if (finalSpill()) { + sendPipelinedEventForSpill(currentBuffer.recordsPerPartition, numSpills.get() - 1, true); + } + cleanupCurrentBuffer(); + return events; } - - return Collections.singletonList(generateEvent()); } - private void cleanup() { - if (spillExecutor != null) { - spillExecutor.shutdownNow(); - } - for (int i = 0; i < buffers.length; i++) { - if (buffers[i] != null && buffers[i] != currentBuffer) { - buffers[i].cleanup(); - buffers[i] = null; + private BitSet getEmptyPartitions(int[] recordsPerPartition) { + Preconditions.checkArgument(recordsPerPartition != null, "records per partition can not be null"); + BitSet emptyPartitions = new BitSet(); + for (int i = 0; i < numPartitions; i++) { + if (recordsPerPartition[i] == 0 ) { + emptyPartitions.set(i); } } - availableBuffers.clear(); + return emptyPartitions; + } + + private Event generateDMEvent() throws IOException { + BitSet emptyPartitions = getEmptyPartitions(numRecordsPerPartition); + return generateDMEvent(false, -1, false, outputContext.getUniqueIdentifier(), emptyPartitions); } - private Event generateEvent() throws IOException { - DataMovementEventPayloadProto.Builder payloadBuidler = DataMovementEventPayloadProto + private Event generateDMEvent(boolean addSpillDetails, int spillId, + boolean isLastSpill, String pathComponent, BitSet emptyPartitions) + throws IOException { + + DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto .newBuilder(); String host = getHost(); - int shufflePort = getShufflePort(); - - BitSet emptyPartitions = new BitSet(); - for (int i = 0; i < numPartitions; i++) { - if (numRecordsPerPartition[i] == 0) { - emptyPartitions.set(i); - } - } if (emptyPartitions.cardinality() != 0) { // Empty partitions exist - ByteString emptyPartitionsByteString = TezCommonUtils.compressByteArrayToByteString( - TezUtilsInternal - .toByteArray(emptyPartitions)); - payloadBuidler.setEmptyPartitions(emptyPartitionsByteString); + ByteString emptyPartitionsByteString = + TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions)); + payloadBuilder.setEmptyPartitions(emptyPartitionsByteString); } + if (emptyPartitions.cardinality() != numPartitions) { // Populate payload only if at least 1 partition has data - payloadBuidler.setHost(host); - payloadBuidler.setPort(shufflePort); - payloadBuidler.setPathComponent(outputContext.getUniqueIdentifier()); + payloadBuilder.setHost(host); + payloadBuilder.setPort(getShufflePort()); + payloadBuilder.setPathComponent(pathComponent); } - CompositeDataMovementEvent cDme = CompositeDataMovementEvent.create(0, numPartitions, - payloadBuidler.build().toByteString().asReadOnlyByteBuffer()); - return cDme; + if (addSpillDetails) { + payloadBuilder.setSpillId(spillId); + payloadBuilder.setLastEvent(isLastSpill); + } + + ByteBuffer payload = payloadBuilder.build().toByteString().asReadOnlyByteBuffer(); + return CompositeDataMovementEvent.create(0, numPartitions, payload); + } + + private void cleanupCurrentBuffer() { + currentBuffer.cleanup(); + currentBuffer = null; + } + + private void cleanup() { + if (spillExecutor != null) { + spillExecutor.shutdownNow(); + } + for (int i = 0; i < buffers.length; i++) { + if (buffers[i] != null && buffers[i] != currentBuffer) { + buffers[i].cleanup(); + buffers[i] = null; + } + } + availableBuffers.clear(); } - private void finalSpill() throws IOException { + private boolean finalSpill() throws IOException { if (currentBuffer.nextPosition == 0) { - return; + if (pipelinedShuffle) { + //Send final event with all empty partitions and null path component. + BitSet emptyPartitions = new BitSet(numPartitions); + emptyPartitions.flip(0, numPartitions); + + outputContext.sendEvents( + Collections.singletonList(generateDMEvent(true, numSpills.get(), true, null, emptyPartitions))); + } + return false; } else { updateGlobalStats(currentBuffer); - SpillCallable spillCallable = new SpillCallable(currentBuffer, 0, codec, null, true); + + //setup output file and index file + SpillPathDetails spillPathDetails = getSpillPathDetails(true, -1); + SpillCallable spillCallable = new SpillCallable(currentBuffer, codec, null, spillPathDetails); try { - spillCallable.call(); + SpillResult spillResult = spillCallable.call(); + + fileOutputBytesCounter.increment(spillResult.spillSize); + fileOutputBytesCounter.increment(indexFileSizeEstimate); } catch (Exception ex) { throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex); } - return; + return true; } } + /** + * Set up spill output file, index file details. + * + * @param isFinalSpill + * @param expectedSpillSize + * @return SpillPathDetails + * @throws IOException + */ + private SpillPathDetails getSpillPathDetails(boolean isFinalSpill, long expectedSpillSize) throws + IOException { + int spillNumber = numSpills.getAndIncrement(); + long spillSize = (expectedSpillSize < 0) ? + (currentBuffer.nextPosition + numPartitions * APPROX_HEADER_LENGTH) : expectedSpillSize; + + Path outputFilePath = null; + Path indexFilePath = null; + + if (!pipelinedShuffle) { + if (isFinalSpill) { + outputFilePath = outputFileHandler.getOutputFileForWrite(spillSize); + indexFilePath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate); + + //Setting this for tests + finalOutPath = outputFilePath; + finalIndexPath = indexFilePath; + } else { + outputFilePath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize); + } + } else { + outputFilePath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize); + indexFilePath = outputFileHandler.getSpillIndexFileForWrite(spillNumber, indexFileSizeEstimate); + } + + SpillPathDetails spillDetails = new SpillPathDetails(outputFilePath, indexFilePath, spillNumber); + return spillDetails; + } + private void mergeAll() throws IOException { long expectedSize = spilledSize; if (currentBuffer.nextPosition != 0) { @@ -526,9 +611,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit updateGlobalStats(currentBuffer); } - long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH; - finalOutPath = outputFileHandler.getOutputFileForWrite(expectedSize); - finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate); + SpillPathDetails spillPathDetails = getSpillPathDetails(true, expectedSize); + finalIndexPath = spillPathDetails.indexFilePath; + finalOutPath = spillPathDetails.outputFilePath; TezSpillRecord finalSpillRecord = new TezSpillRecord(numPartitions); @@ -601,17 +686,23 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit LOG.info("Finished final spill after merging : " + numSpills.get() + " spills"); } - private void writeLargeRecord(final Object key, final Object value, final int partition, - final int spillNumber) throws IOException { + private void writeLargeRecord(final Object key, final Object value, final int partition) + throws IOException { numAdditionalSpillsCounter.increment(1); long size = sizePerBuffer - (currentBuffer.numRecords * META_SIZE) - currentBuffer.skipSize + numPartitions * APPROX_HEADER_LENGTH; + SpillPathDetails spillPathDetails = getSpillPathDetails(false, size); + int spillIndex = spillPathDetails.spillIndex; FSDataOutputStream out = null; long outSize = 0; try { final TezSpillRecord spillRecord = new TezSpillRecord(numPartitions); - final Path outPath = outputFileHandler.getSpillFileForWrite(spillNumber, size); + final Path outPath = spillPathDetails.outputFilePath; out = rfs.create(outPath); + BitSet emptyPartitions = null; + if (pipelinedShuffle) { + emptyPartitions = new BitSet(numPartitions); + } for (int i = 0; i < numPartitions; i++) { final long recordStart = out.getPos(); if (i == partition) { @@ -634,11 +725,22 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit writer.close(); } } + } else { + if (emptyPartitions != null) { + emptyPartitions.set(i); + } } } - SpillInfo spillInfo = new SpillInfo(spillRecord, outPath); - spillInfoList.add(spillInfo); - LOG.info("Finished writing large record of size " + outSize + " to spill file " + spillNumber); + handleSpillIndex(spillPathDetails, spillRecord); + + sendPipelinedEventForSpill(emptyPartitions, spillIndex, false); + + LOG.info("Finished writing large record of size " + outSize + " to spill file " + spillIndex); + if (LOG.isDebugEnabled()) { + LOG.debug("LargeRecord Spill=" + spillIndex + ", indexPath=" + + spillPathDetails.indexFilePath + ", outputPath=" + + spillPathDetails.outputFilePath); + } } finally { if (out != null) { out.close(); @@ -646,6 +748,19 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } + private void handleSpillIndex(SpillPathDetails spillPathDetails, TezSpillRecord spillRecord) + throws IOException { + if (spillPathDetails.indexFilePath != null) { + //write the index record + spillRecord.writeToFile(spillPathDetails.indexFilePath, conf); + } else { + //add to cache + SpillInfo spillInfo = new SpillInfo(spillRecord, spillPathDetails.outputFilePath); + spillInfoList.add(spillInfo); + numAdditionalSpillsCounter.increment(1); + } + } + private class ByteArrayOutputStream extends OutputStream { private final byte[] scratch = new byte[1]; @@ -721,6 +836,29 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private static final long serialVersionUID = 1L; } + private void sendPipelinedEventForSpill(BitSet emptyPartitions, int spillNumber, boolean isFinalUpdate) { + if (!pipelinedShuffle) { + return; + } + //Send out an event for consuming. + try { + String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber); + Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate, + pathComponent, emptyPartitions); + + LOG.info("Adding spill event for spill (final update=" + isFinalUpdate + "), spillId=" + spillNumber); + outputContext.sendEvents(Collections.singletonList(compEvent)); + } catch (IOException e) { + LOG.fatal("Error in sending pipelined events", e); + outputContext.fatalError(e, "Error in sending pipelined events"); + } + } + + private void sendPipelinedEventForSpill(int[] recordsPerPartition, int spillNumber, boolean isFinalUpdate) { + BitSet emptyPartitions = getEmptyPartitions(recordsPerPartition); + sendPipelinedEventForSpill(emptyPartitions, spillNumber, isFinalUpdate); + } + private class SpillCallback implements FutureCallback<SpillResult> { private final int spillNumber; @@ -733,6 +871,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit public void onSuccess(SpillResult result) { LOG.info("Spill# " + spillNumber + " complete."); spilledSize += result.spillSize; + + sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition, spillNumber, false); + try { result.wrappedBuffer.reset(); availableBuffers.add(result.wrappedBuffer); @@ -742,6 +883,13 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit outputContext.fatalError(e, "Failure while attempting to reset buffer after spill"); } + if (!pipelinedShuffle) { + additionalSpillBytesWritternCounter.increment(result.spillSize); + } else { + fileOutputBytesCounter.increment(indexFileSizeEstimate); + fileOutputBytesCounter.increment(result.spillSize); + } + spillLock.lock(); try { if (pendingSpillCount.decrementAndGet() == 0) { @@ -800,4 +948,17 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata); return shufflePort; } + + @InterfaceAudience.Private + static class SpillPathDetails { + final Path indexFilePath; + final Path outputFilePath; + final int spillIndex; + + SpillPathDetails(Path outputFilePath, Path indexFilePath, int spillIndex) { + this.outputFilePath = outputFilePath; + this.indexFilePath = indexFilePath; + this.spillIndex = spillIndex; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 0e859ba..90147ca 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -104,8 +104,8 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS_DEFAULT); finalMergeEnabled = conf.getBoolean( - TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, - TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER_DEFAULT); + TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, + TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT); pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration @@ -121,7 +121,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { if (pipelinedShuffle) { Preconditions.checkArgument(!finalMergeEnabled, TezRuntimeConfiguration - .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER + " has to be set to false for pipelined " + .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT + " has to be set to false for pipelined " + "shuffle to work properly."); //TODO: Enable it for pipelinedsorter only and not for DefaultSorter @@ -203,7 +203,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED); - confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX); confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index 1e39535..9053507 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -120,6 +120,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX); confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index d0187bd..073d956 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -33,7 +33,6 @@ import java.util.UUID; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyListOf; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -137,7 +136,7 @@ public class TestPipelinedSorter { this.numOutputs = 1; this.initialAvailableMem = 5 *1024 * 1024; conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); - conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, initialAvailableMem, 1<<20); http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index 875f23a..c9c215d 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -49,7 +49,6 @@ import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; -import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; @@ -180,7 +179,7 @@ public class TestDefaultSorter { public void testWithEmptyDataWithFinalMergeDisabled() throws IOException { OutputContext context = createTezOutputContext(); - conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1); MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, @@ -204,7 +203,7 @@ public class TestDefaultSorter { public void testWithSingleSpillWithFinalMergeDisabled() throws IOException { OutputContext context = createTezOutputContext(); - conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4); MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, @@ -233,7 +232,7 @@ public class TestDefaultSorter { public void testWithMultipleSpillsWithFinalMergeDisabled() throws IOException { OutputContext context = createTezOutputContext(); - conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1); MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index cb385ea..995eee6 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -22,10 +22,15 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; @@ -85,6 +90,9 @@ import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; @RunWith(value = Parameterized.class) public class TestUnorderedPartitionedKVWriter { @@ -127,7 +135,7 @@ public class TestUnorderedPartitionedKVWriter { @Test(timeout = 10000) public void testBufferSizing() throws IOException { - ApplicationId appId = ApplicationId.newInstance(10000, 1); + ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId); @@ -190,34 +198,34 @@ public class TestUnorderedPartitionedKVWriter { @Test(timeout = 10000) public void testRandomText() throws IOException, InterruptedException { - textTest(100, 10, 2048, 0, 0, 0); + textTest(100, 10, 2048, 0, 0, 0, false); } @Test(timeout = 10000) public void testLargeKeys() throws IOException, InterruptedException { - textTest(0, 10, 2048, 10, 0, 0); + textTest(0, 10, 2048, 10, 0, 0, false); } @Test(timeout = 10000) public void testLargevalues() throws IOException, InterruptedException { - textTest(0, 10, 2048, 0, 10, 0); + textTest(0, 10, 2048, 0, 10, 0, false); } @Test(timeout = 10000) public void testLargeKvPairs() throws IOException, InterruptedException { - textTest(0, 10, 2048, 0, 0, 10); + textTest(0, 10, 2048, 0, 0, 10, false); } @Test(timeout = 10000) public void testTextMixedRecords() throws IOException, InterruptedException { - textTest(100, 10, 2048, 10, 10, 10); + textTest(100, 10, 2048, 10, 10, 10, false); } public void textTest(int numRegularRecords, int numPartitions, long availableMemory, - int numLargeKeys, int numLargevalues, int numLargeKvPairs) throws IOException, + int numLargeKeys, int numLargevalues, int numLargeKvPairs, boolean pipeliningEnabled) throws IOException, InterruptedException { Partitioner partitioner = new HashPartitioner(); - ApplicationId appId = ApplicationId.newInstance(10000, 1); + ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId); @@ -225,6 +233,10 @@ public class TestUnorderedPartitionedKVWriter { Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress, -1, HashPartitioner.class); + if (pipeliningEnabled) { + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); + } CompressionCodec codec = null; if (shouldCompress) { codec = new DefaultCodec(); @@ -270,6 +282,9 @@ public class TestUnorderedPartitionedKVWriter { kvWriter.write(keyText, valText); numRecordsWritten++; } + if (pipeliningEnabled) { + verify(outputContext, times(numLargeKeys)).sendEvents(anyListOf(Event.class)); + } // Write Large val records for (int i = 0; i < numLargevalues; i++) { @@ -283,6 +298,9 @@ public class TestUnorderedPartitionedKVWriter { kvWriter.write(keyText, valText); numRecordsWritten++; } + if (pipeliningEnabled) { + verify(outputContext, times(numLargevalues + numLargeKeys)).sendEvents(anyListOf(Event.class)); + } // Write records where key + val are large (but both can fit in the buffer individually) for (int i = 0; i < numLargeKvPairs; i++) { @@ -296,6 +314,10 @@ public class TestUnorderedPartitionedKVWriter { kvWriter.write(keyText, valText); numRecordsWritten++; } + if (pipeliningEnabled) { + verify(outputContext, times(numLargevalues + numLargeKeys + numLargeKvPairs)) + .sendEvents(anyListOf(Event.class)); + } List<Event> events = kvWriter.close(); verify(outputContext, never()).fatalError(any(Throwable.class), any(String.class)); @@ -304,6 +326,10 @@ public class TestUnorderedPartitionedKVWriter { assertEquals(numLargeKeys + numLargevalues + numLargeKvPairs, outputLargeRecordsCounter.getValue()); + if (pipeliningEnabled) { + return; + } + // Validate the event assertEquals(1, events.size()); assertTrue(events.get(0) instanceof CompositeDataMovementEvent); @@ -373,10 +399,207 @@ public class TestUnorderedPartitionedKVWriter { assertEquals(0, expectedValues.size()); } + @Test(timeout = 10000) + public void testNoSpill_WithPipelinedShuffle() throws IOException, InterruptedException { + baseTestWithPipelinedTransfer(10, 10, null, shouldCompress); + } + + @Test(timeout = 10000) + public void testSingleSpill_WithPipelinedShuffle() throws IOException, InterruptedException { + baseTestWithPipelinedTransfer(50, 10, null, shouldCompress); + } + + @Test(timeout = 10000) + public void testMultipleSpills_WithPipelinedShuffle() throws IOException, InterruptedException { + baseTestWithPipelinedTransfer(200, 10, null, shouldCompress); + } + + @Test(timeout = 10000) + public void testNoRecords_WithPipelinedShuffle() throws IOException, InterruptedException { + baseTestWithPipelinedTransfer(0, 10, null, shouldCompress); + } + + @Test(timeout = 10000) + public void testSkippedPartitions_WithPipelinedShuffle() throws IOException, InterruptedException { + baseTestWithPipelinedTransfer(200, 10, Sets.newHashSet(2, 5), shouldCompress); + } + + @Test(timeout = 10000) + public void testLargeKvPairs_WithPipelinedShuffle() throws IOException, InterruptedException { + textTest(0, 10, 2048, 10, 20, 50, true); + } + + + @SuppressWarnings("unchecked") + private void baseTestWithPipelinedTransfer(int numRecords, int numPartitions, Set<Integer> + skippedPartitions, boolean shouldCompress) throws IOException, InterruptedException { + + PartitionerForTest partitioner = new PartitionerForTest(); + ApplicationId appId = ApplicationId.newInstance(10000000, 1); + TezCounters counters = new TezCounters(); + String uniqueId = UUID.randomUUID().toString(); + OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId); + + Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, + shouldCompress, -1); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true); + + CompressionCodec codec = null; + if (shouldCompress) { + codec = new DefaultCodec(); + ((Configurable) codec).setConf(conf); + } + + int numOutputs = numPartitions; + long availableMemory = 2048; + int numRecordsWritten = 0; + + UnorderedPartitionedKVWriter kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, + conf, numOutputs, availableMemory); + + int sizePerBuffer = kvWriter.sizePerBuffer; + int sizePerRecord = 4 + 8; // IntW + LongW + int sizePerRecordWithOverhead = sizePerRecord + 12; // Record + META_OVERHEAD + + IntWritable intWritable = new IntWritable(); + LongWritable longWritable = new LongWritable(); + for (int i = 0; i < numRecords; i++) { + intWritable.set(i); + longWritable.set(i); + int partition = partitioner.getPartition(intWritable, longWritable, numOutputs); + if (skippedPartitions != null && skippedPartitions.contains(partition)) { + continue; + } + kvWriter.write(intWritable, longWritable); + numRecordsWritten++; + } + + int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead; + int numExpectedSpills = numRecordsWritten / recordsPerBuffer; + + ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class); + List<Event> events = kvWriter.close(); + assertTrue(events.size() == 0); //no events are sent to kvWriter upon close with pipelining + + verify(outputContext, atLeast(numExpectedSpills)).sendEvents(eventCaptor.capture()); + events = eventCaptor.getValue(); + + assertTrue(events.size() == 1); //the last event which was sent out + + verify(outputContext, never()).fatalError(any(Throwable.class), any(String.class)); + + // Verify the status of the buffers + if (numExpectedSpills == 0) { + assertEquals(1, kvWriter.numInitializedBuffers); + } else { + assertTrue(kvWriter.numInitializedBuffers > 1); + } + assertNull(kvWriter.currentBuffer); + assertEquals(0, kvWriter.availableBuffers.size()); + + // Verify the counters + TezCounter outputRecordBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES); + TezCounter outputRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_RECORDS); + TezCounter outputBytesWithOverheadCounter = counters + .findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD); + TezCounter fileOutputBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL); + TezCounter spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS); + TezCounter additionalSpillBytesWritternCounter = counters + .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN); + TezCounter additionalSpillBytesReadCounter = counters + .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ); + TezCounter numAdditionalSpillsCounter = counters + .findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT); + assertEquals(numRecordsWritten * sizePerRecord, outputRecordBytesCounter.getValue()); + assertEquals(numRecordsWritten, outputRecordsCounter.getValue()); + assertEquals(numRecordsWritten * sizePerRecordWithOverhead, + outputBytesWithOverheadCounter.getValue()); + long fileOutputBytes = fileOutputBytesCounter.getValue(); + if (numRecordsWritten > 0) { + assertTrue(fileOutputBytes > 0); + if (!shouldCompress) { + assertTrue(fileOutputBytes > outputRecordBytesCounter.getValue()); + } + } else { + assertEquals(0, fileOutputBytes); + } + assertEquals(recordsPerBuffer * numExpectedSpills, spilledRecordsCounter.getValue()); + long additionalSpillBytesWritten = additionalSpillBytesWritternCounter.getValue(); + long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue(); + + //No additional spill bytes written when final merge is disabled. + assertEquals(additionalSpillBytesWritten, 0); + + //No additional spills when final merge is disabled. + assertTrue(additionalSpillBytesWritten == additionalSpillBytesRead); + + //No additional spills when final merge is disabled. + assertEquals(numAdditionalSpillsCounter.getValue(), 0); + + BitSet emptyPartitionBits = null; + assertTrue(events.size() > 0); + //Get the last event + int index = events.size() - 1; + assertTrue(events.get(index) instanceof CompositeDataMovementEvent); + CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(index); + assertEquals(0, cdme.getSourceIndexStart()); + assertEquals(numOutputs, cdme.getCount()); + DataMovementEventPayloadProto eventProto = + DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload())); + assertFalse(eventProto.hasData()); + //Ensure that this is the last event + assertTrue(eventProto.getLastEvent()); + if (eventProto.hasEmptyPartitions()) { + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto + .getEmptyPartitions()); + emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions); + if (numRecordsWritten == 0) { + assertEquals(numPartitions, emptyPartitionBits.cardinality()); + } else { + if (skippedPartitions != null) { + for (Integer e : skippedPartitions) { + assertTrue(emptyPartitionBits.get(e)); + } + assertEquals(skippedPartitions.size(), emptyPartitionBits.cardinality()); + } + } + if (emptyPartitionBits.cardinality() != numPartitions) { + assertEquals(HOST_STRING, eventProto.getHost()); + assertEquals(SHUFFLE_PORT, eventProto.getPort()); + assertTrue(eventProto.hasPathComponent()); + } else { + assertFalse(eventProto.hasHost()); + assertFalse(eventProto.hasPort()); + assertFalse(eventProto.hasPathComponent()); + } + } else { + assertEquals(HOST_STRING, eventProto.getHost()); + assertEquals(SHUFFLE_PORT, eventProto.getPort()); + assertTrue(eventProto.hasPathComponent()); + } + + // Verify if all spill files are available. + TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId); + + if (numRecordsWritten > 0) { + int numSpills = kvWriter.numSpills.get(); + for (int i = 0; i < numSpills; i++) { + assertTrue(localFs.exists(taskOutput.getSpillFileForWrite(i, 10))); + assertTrue(localFs.exists(taskOutput.getSpillIndexFileForWrite(i, 10))); + } + } else { + return; + } + } + + private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions, boolean shouldCompress) throws IOException, InterruptedException { PartitionerForTest partitioner = new PartitionerForTest(); - ApplicationId appId = ApplicationId.newInstance(10000, 1); + ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId); @@ -582,12 +805,18 @@ public class TestUnorderedPartitionedKVWriter { doReturn(1).when(outputContext).getTaskVertexIndex(); doReturn("vertexName").when(outputContext).getTaskVertexName(); doReturn(uniqueId).when(outputContext).getUniqueIdentifier(); - ByteBuffer portBuffer = ByteBuffer.allocate(4); - portBuffer.mark(); - portBuffer.putInt(SHUFFLE_PORT); - portBuffer.reset(); - doReturn(portBuffer).when(outputContext).getServiceProviderMetaData( - eq(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID)); + + doAnswer(new Answer<ByteBuffer>() { + @Override + public ByteBuffer answer(InvocationOnMock invocation) throws Throwable { + ByteBuffer portBuffer = ByteBuffer.allocate(4); + portBuffer.mark(); + portBuffer.putInt(SHUFFLE_PORT); + portBuffer.reset(); + return portBuffer; + } + }).when(outputContext).getServiceProviderMetaData(eq(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID)); + Path outDirBase = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId); String[] outDirs = new String[] { outDirBase.toString() }; doReturn(outDirs).when(outputContext).getWorkDirs(); http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java index 83ea707..f509acb 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java @@ -80,6 +80,10 @@ public class TestUnorderedPartitionedKVOutputConfig { .setAvailableBufferSize(1111) .setAdditionalConfiguration("fs.shouldExist", "fs") .setAdditionalConfiguration("test.key.1", "key1") + .setAdditionalConfiguration(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, "true") + .setAdditionalConfiguration(TezRuntimeConfiguration + .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, "true") .setAdditionalConfiguration(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, String.valueOf(false)) .setAdditionalConfiguration(additionalConf) @@ -94,6 +98,11 @@ public class TestUnorderedPartitionedKVOutputConfig { Configuration conf = rebuilt.conf; // Verify programmatic API usage + assertEquals(true, conf.getBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, false)); + //unorderedpartitioned writer ignores this value. + assertEquals(false, conf.getBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false)); assertEquals(1111, conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 0)); assertEquals("KEY", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, "")); assertEquals("VALUE", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, "")); http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java index e0b75b8..721673b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -166,7 +166,7 @@ public class TestOnFileSortedOutput { conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2); - conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true); OutputContext context = createTezOutputContext(); UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf); @@ -186,7 +186,7 @@ public class TestOnFileSortedOutput { conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2); - conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, true); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true); OutputContext context = createTezOutputContext(); UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf); @@ -208,7 +208,7 @@ public class TestOnFileSortedOutput { conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3); //negative. with sort threads-1 conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 1); - conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true); OutputContext context = createTezOutputContext();
