Repository: tez Updated Branches: refs/heads/branch-0.7 b80cea95f -> c9ef246a6
TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c9ef246a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c9ef246a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c9ef246a Branch: refs/heads/branch-0.7 Commit: c9ef246a610d5d1745f91b660267cc51c0abf760 Parents: b80cea9 Author: Jonathan Eagles <[email protected]> Authored: Fri Jan 29 13:48:58 2016 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Fri Jan 29 13:48:58 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../library/common/InputAttemptIdentifier.java | 28 ++--- .../common/shuffle/DiskFetchedInput.java | 2 +- .../library/common/shuffle/ShuffleUtils.java | 2 +- .../impl/ShuffleInputEventHandlerImpl.java | 3 +- .../common/shuffle/impl/ShuffleManager.java | 24 ++--- .../shuffle/orderedgrouped/InMemoryReader.java | 102 ++++++++++++++++++- .../shuffle/orderedgrouped/MapOutput.java | 32 +++--- .../shuffle/orderedgrouped/MergeManager.java | 12 ++- .../ShuffleInputEventHandlerOrderedGrouped.java | 3 +- .../orderedgrouped/ShuffleScheduler.java | 29 +++--- .../runtime/library/common/sort/impl/IFile.java | 6 +- .../impl/TestShuffleInputEventHandlerImpl.java | 11 +- ...tShuffleInputEventHandlerOrderedGrouped.java | 17 ++-- 14 files changed, 175 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 014a852..f97bf5c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES + TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs TEZ-3066. TaskAttemptFinishedEvent ConcurrentModificationException in recovery or history logging services. TEZ-3036. Tez AM can hang on startup with no indication of error TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java index d70942c..cc9c6ea 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java @@ -27,7 +27,7 @@ import org.apache.tez.dag.api.TezUncheckedException; @Private public class InputAttemptIdentifier { - private final InputIdentifier inputIdentifier; + private final int inputIdentifier; private final int attemptNumber; private final String pathComponent; private final boolean shared; @@ -49,18 +49,18 @@ public class InputAttemptIdentifier { private final int spillEventId; public InputAttemptIdentifier(int inputIndex, int attemptNumber) { - this(new InputIdentifier(inputIndex), attemptNumber, null); + this(inputIndex, attemptNumber, null); } - public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) { + public InputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent) { this(inputIdentifier, attemptNumber, pathComponent, false); } - public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent, boolean shared) { + public InputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent, boolean shared) { this(inputIdentifier, attemptNumber, pathComponent, shared, SPILL_INFO.FINAL_MERGE_ENABLED, -1); } - public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent, + public InputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent, boolean shared, SPILL_INFO fetchTypeInfo, int spillEventId) { this.inputIdentifier = inputIdentifier; this.attemptNumber = attemptNumber; @@ -74,15 +74,7 @@ public class InputAttemptIdentifier { } } - public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) { - this(new InputIdentifier(taskIndex), attemptNumber, pathComponent); - } - - public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent, boolean shared) { - this(new InputIdentifier(taskIndex), attemptNumber, pathComponent, shared); - } - - public InputIdentifier getInputIdentifier() { + public int getInputIdentifier() { return this.inputIdentifier; } @@ -117,8 +109,7 @@ public class InputAttemptIdentifier { final int prime = 31; int result = 1; result = prime * result + attemptNumber; - result = prime * result - + ((inputIdentifier == null) ? 0 : inputIdentifier.hashCode()); + result = prime * result + inputIdentifier; return result; } @@ -133,10 +124,7 @@ public class InputAttemptIdentifier { InputAttemptIdentifier other = (InputAttemptIdentifier) obj; if (attemptNumber != other.attemptNumber) return false; - if (inputIdentifier == null) { - if (other.inputIdentifier != null) - return false; - } else if (!inputIdentifier.equals(other.inputIdentifier)) + if (inputIdentifier != other.inputIdentifier) return false; // do not compare pathComponent as they may not always be present return true; http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java index 6432b55..c03d263 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java @@ -50,7 +50,7 @@ public class DiskFetchedInput extends FetchedInput { this.localFS = FileSystem.getLocal(conf); this.outputPath = filenameAllocator.getInputFileForWrite( - this.inputAttemptIdentifier.getInputIdentifier().getInputIndex(), this + this.inputAttemptIdentifier.getInputIdentifier(), this .inputAttemptIdentifier.getSpillEventId(), actualSize); // Files are not clobbered due to the id being appended to the outputPath in the tmpPath, // otherwise fetches for the same task but from different attempts would clobber each other. http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 48d7eb2..5c9ff77 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -520,7 +520,7 @@ public class ShuffleUtils { private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) { StringBuilder sb = new StringBuilder(); sb.append("{"); - sb.append(inputAttemptIdentifier.getInputIdentifier().getInputIndex()); + sb.append(inputAttemptIdentifier.getInputIdentifier()); sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber()); sb.append(", ").append(inputAttemptIdentifier.getPathComponent()); if (inputAttemptIdentifier.getFetchTypeInfo() http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index 8fb1568..adc3432 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -37,7 +37,6 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; @@ -176,7 +175,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { InputAttemptIdentifier.SPILL_INFO spillInfo = (lastEvent) ? InputAttemptIdentifier.SPILL_INFO .FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE; srcAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(dmEvent.getTargetIndex()), dmEvent + new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent .getVersion(), pathComponent, isShared, spillInfo, spillEventId); } else { srcAttemptIdentifier = http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 151cac4..e5ecf06 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -26,7 +26,6 @@ import java.text.DecimalFormat; import java.util.Arrays; import java.util.BitSet; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -69,7 +68,6 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.FetchResult; import org.apache.tez.runtime.library.common.shuffle.FetchedInput; @@ -114,7 +112,7 @@ public class ShuffleManager implements FetcherCallback { private final BlockingQueue<FetchedInput> completedInputs; private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false); - private final Set<InputIdentifier> completedInputSet; + private final Set<Integer> completedInputSet; private final ConcurrentMap<String, InputHost> knownSrcHosts; private final BlockingQueue<InputHost> pendingHosts; private final Set<InputAttemptIdentifier> obsoletedInputs; @@ -173,7 +171,7 @@ public class ShuffleManager implements FetcherCallback { //To track shuffleInfo events when finalMerge is disabled OR pipelined shuffle is enabled in source. @VisibleForTesting - final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap; + final Map<Integer, ShuffleEventInfo> shuffleInfoEventsMap; // TODO More counters - FetchErrors, speed? @@ -207,7 +205,7 @@ public class ShuffleManager implements FetcherCallback { this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()); - completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs)); + completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>(numInputs)); /** * In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt. * We do not know upfront the number of spills from source. @@ -268,7 +266,7 @@ public class ShuffleManager implements FetcherCallback { Arrays.sort(this.localDisks); - shuffleInfoEventsMap = new ConcurrentHashMap<InputIdentifier, ShuffleEventInfo>(); + shuffleInfoEventsMap = new ConcurrentHashMap<Integer, ShuffleEventInfo>(); LOG.info(srcNameTrimmed + ": numInputs=" + numInputs + ", compressionCodec=" + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers=" @@ -479,7 +477,7 @@ public class ShuffleManager implements FetcherCallback { return; } - InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + int inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); if (shuffleInfoEventsMap.get(inputIdentifier) == null) { shuffleInfoEventsMap.put(inputIdentifier, new ShuffleEventInfo(srcAttemptIdentifier)); } @@ -501,7 +499,7 @@ public class ShuffleManager implements FetcherCallback { public void addCompletedInputWithNoData( InputAttemptIdentifier srcAttemptIdentifier) { - InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + int inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); if (LOG.isDebugEnabled()) { LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete."); } @@ -558,7 +556,7 @@ public class ShuffleManager implements FetcherCallback { ShuffleEventInfo(InputAttemptIdentifier input) { - this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber(); + this.id = input.getInputIdentifier() + "_" + input.getAttemptNumber(); this.eventsProcessed = new BitSet(); this.attemptNum = input.getAttemptNumber(); } @@ -594,7 +592,7 @@ public class ShuffleManager implements FetcherCallback { public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration) throws IOException { - InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + int inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); // Count irrespective of whether this is a copy of an already fetched input lock.lock(); @@ -706,7 +704,7 @@ public class ShuffleManager implements FetcherCallback { return; } - InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + int inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(inputIdentifier); //for empty partition case @@ -769,9 +767,9 @@ public class ShuffleManager implements FetcherCallback { "Fetch failure while fetching from " + TezRuntimeUtils.getTaskAttemptIdentifier( inputContext.getSourceVertexName(), - srcAttemptIdentifier.getInputIdentifier().getInputIndex(), + srcAttemptIdentifier.getInputIdentifier(), srcAttemptIdentifier.getAttemptNumber()), - srcAttemptIdentifier.getInputIdentifier().getInputIndex(), + srcAttemptIdentifier.getInputIdentifier(), srcAttemptIdentifier.getAttemptNumber()); List<Event> failedEvents = Lists.newArrayListWithCapacity(1); http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java index 75c552e..7860377 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; +import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.File; import java.io.FileOutputStream; @@ -37,9 +38,103 @@ import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader; @InterfaceStability.Unstable public class InMemoryReader extends Reader { + private static class ByteArrayDataInput extends ByteArrayInputStream implements DataInput { + + public ByteArrayDataInput(byte buf[], int offset, int length) { + super(buf, offset, length); + } + + public void reset(byte[] input, int start, int length) { + this.buf = input; + this.count = start+length; + this.mark = start; + this.pos = start; + } + + public byte[] getData() { return buf; } + public int getPosition() { return pos; } + public int getLength() { return count; } + public int getMark() { return mark; } + + @Override + public void readFully(byte[] b) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int skipBytes(int n) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean readBoolean() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public byte readByte() throws IOException { + return (byte)read(); + } + + @Override + public int readUnsignedByte() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public short readShort() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedShort() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public char readChar() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int readInt() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long readLong() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public float readFloat() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public double readDouble() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public String readLine() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public String readUTF() throws IOException { + throw new UnsupportedOperationException(); + } + } + private final InputAttemptIdentifier taskAttemptId; private final MergeManager merger; - DataInputBuffer memDataIn = new DataInputBuffer(); + ByteArrayDataInput memDataIn; private int start; private int length; private int originalKeyPos; @@ -49,12 +144,12 @@ public class InMemoryReader extends Reader { int length) throws IOException { super(null, length - start, null, null, null, false, 0, -1); - this.merger = merger; this.taskAttemptId = taskAttemptId; + this.merger = merger; buffer = data; bufferSize = (int) length; - memDataIn.reset(buffer, start, length); + memDataIn = new ByteArrayDataInput(buffer, start, length); this.start = start; this.length = length; } @@ -160,7 +255,6 @@ public class InMemoryReader extends Reader { public void close() { // Release - dataIn = null; buffer = null; // Inform the MergeManager if (merger != null) { http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java index 55c80aa..682ba75 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java @@ -47,17 +47,14 @@ class MapOutput { private final int id; private final Type type; private InputAttemptIdentifier attemptIdentifier; - private final long size; private final boolean primaryMapOutput; private final MergeManager merger; // MEMORY - private final byte[] memory; private BoundedByteArrayOutputStream byteStream; // DISK - private final FileSystem localFS; private final Path tmpOutputPath; private final FileChunk outputPath; private OutputStream disk; @@ -71,18 +68,13 @@ class MapOutput { this.merger = merger; this.primaryMapOutput = primaryMapOutput; - this.localFS = fs; - this.size = size; - // Other type specific values if (type == Type.MEMORY) { // since we are passing an int from createMemoryMapOutput, its safe to cast to int this.byteStream = new BoundedByteArrayOutputStream((int)size); - this.memory = byteStream.getBuffer(); } else { this.byteStream = null; - this.memory = null; } this.tmpOutputPath = tmpOutputPath; @@ -97,7 +89,6 @@ class MapOutput { } else { this.outputPath = null; } - } public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier, @@ -107,7 +98,7 @@ class MapOutput { IOException { FileSystem fs = FileSystem.getLocal(conf); Path outputpath = mapOutputFile.getInputFileForWrite( - attemptIdentifier.getInputIdentifier().getInputIndex(), attemptIdentifier.getSpillEventId(), size); + attemptIdentifier.getInputIdentifier(), attemptIdentifier.getSpillEventId(), size); // Files are not clobbered due to the id being appended to the outputPath in the tmpPath, // otherwise fetches for the same task but from different attempts would clobber each other. Path tmpOuputPath = outputpath.suffix(String.valueOf(fetcher)); @@ -115,7 +106,7 @@ class MapOutput { MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, merger, size, outputpath, offset, primaryMapOutput, fs, tmpOuputPath); - mapOutput.disk = mapOutput.localFS.create(tmpOuputPath); + mapOutput.disk = fs.create(tmpOuputPath); return mapOutput; } @@ -160,7 +151,7 @@ class MapOutput { } public byte[] getMemory() { - return memory; + return byteStream.getBuffer(); } public BoundedByteArrayOutputStream getArrayStream() { @@ -180,14 +171,19 @@ class MapOutput { } public long getSize() { - return size; + if (type == Type.MEMORY) { + return byteStream.getLimit(); + } else if (type == Type.DISK || type == Type.DISK_DIRECT) { + return outputPath.getLength(); + } + return -1; } public void commit() throws IOException { if (type == Type.MEMORY) { merger.closeInMemoryFile(this); } else if (type == Type.DISK) { - localFS.rename(tmpOutputPath, outputPath.getPath()); + merger.getLocalFileSystem().rename(tmpOutputPath, outputPath.getPath()); merger.closeOnDiskFile(outputPath); } else if (type == Type.DISK_DIRECT) { merger.closeOnDiskFile(outputPath); @@ -198,10 +194,10 @@ class MapOutput { public void abort() { if (type == Type.MEMORY) { - merger.unreserve(memory.length); + merger.unreserve(byteStream.getBuffer().length); } else if (type == Type.DISK) { try { - localFS.delete(tmpOutputPath, false); + merger.getLocalFileSystem().delete(tmpOutputPath, true); } catch (IOException ie) { LOG.info("failure to clean up " + tmpOutputPath, ie); } @@ -223,9 +219,9 @@ class MapOutput { return 0; } - if (o1.size < o2.size) { + if (o1.getSize() < o2.getSize()) { return -1; - } else if (o1.size > o2.size) { + } else if (o1.getSize() > o2.getSize()) { return 1; } http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 9f5287d..f5e95cb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -489,7 +489,11 @@ public class MergeManager { ", inMemoryMergedMapOutputs.size() -> " + inMemoryMergedMapOutputs.size()); } - + + public FileSystem getLocalFileSystem() { + return localFS; + } + public synchronized void closeOnDiskFile(FileChunk file) { //including only path & offset for valdiations. for (FileChunk fileChunk : onDiskMapOutputs) { @@ -654,7 +658,7 @@ public class MergeManager { // All disk writes done by this merge are overhead - due to the lack of // adequate memory to keep all segments in memory. Path outputPath = mapOutputFile.getInputFileForWrite( - srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(), + srcTaskIdentifier.getInputIdentifier(), srcTaskIdentifier.getSpillEventId(), mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX); Writer writer = null; @@ -774,7 +778,7 @@ public class MergeManager { if (file0.isLocalFile()) { // This is setup the same way a type DISK MapOutput is setup when fetching. namePart = mapOutputFile.getSpillFileName( - file0.getInputAttemptIdentifier().getInputIdentifier().getInputIndex(), + file0.getInputAttemptIdentifier().getInputIdentifier(), file0.getInputAttemptIdentifier().getSpillEventId()); } else { namePart = file0.getPath().getName().toString(); @@ -929,7 +933,7 @@ public class MergeManager { long inMemToDiskBytes = 0; boolean mergePhaseFinished = false; if (inMemoryMapOutputs.size() > 0) { - int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex(); + int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier(); inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, memDiskSegments, this.postMergeMemLimit); http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index 2755358..7c4eb98 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -37,7 +37,6 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; @@ -170,7 +169,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl InputAttemptIdentifier.SPILL_INFO info = (lastEvent) ? InputAttemptIdentifier.SPILL_INFO .FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE; srcAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(dmEvent.getTargetIndex()), dmEvent + new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent .getVersion(), pathComponent, false, info, spillEventId); } else { srcAttemptIdentifier = http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index dc83eae..b8b6cf2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -52,7 +52,6 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type; @@ -82,7 +81,7 @@ class ShuffleScheduler { //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is // enabled in source. @VisibleForTesting - final Map<InputIdentifier, ShuffleEventInfo> pipelinedShuffleInfoEventsMap; + final Map<Integer, ShuffleEventInfo> pipelinedShuffleInfoEventsMap; @VisibleForTesting final Set<MapHost> pendingHosts = new HashSet<MapHost>(); @@ -236,7 +235,7 @@ class ShuffleScheduler { this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED); - pipelinedShuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>(); + pipelinedShuffleInfoEventsMap = new HashMap<Integer, ShuffleEventInfo>(); LOG.info("ShuffleScheduler running for sourceVertex: " + inputContext.getSourceVertexName() + " with configuration: " + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting @@ -274,7 +273,7 @@ class ShuffleScheduler { ShuffleEventInfo(InputAttemptIdentifier input) { - this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber(); + this.id = input.getInputIdentifier() + "_" + input.getAttemptNumber(); this.eventsProcessed = new BitSet(); this.attemptNum = input.getAttemptNumber(); } @@ -312,7 +311,7 @@ class ShuffleScheduler { ) throws IOException { inputContext.notifyProgress(); - if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) { + if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier())) { if (!isLocalFetch) { /** * Reset it only when it is a non-local-disk copy. @@ -350,10 +349,10 @@ class ShuffleScheduler { */ if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { remainingMaps.decrementAndGet(); - setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex()); + setInputFinished(srcAttemptIdentifier.getInputIdentifier()); numFetchedSpills++; } else { - InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + int inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); //Allow only one task attempt to proceed. if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) { return; @@ -378,7 +377,7 @@ class ShuffleScheduler { //check if we downloaded all spills pertaining to this InputAttemptIdentifier if (eventInfo.isDone()) { remainingMaps.decrementAndGet(); - setInputFinished(inputIdentifier.getInputIndex()); + setInputFinished(inputIdentifier); pipelinedShuffleInfoEventsMap.remove(inputIdentifier); if (LOG.isTraceEnabled()) { LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " + @@ -405,7 +404,7 @@ class ShuffleScheduler { if (LOG.isDebugEnabled()) { LOG.debug("src task: " + TezRuntimeUtils.getTaskAttemptIdentifier( - inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getInputIndex(), + inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier(), srcAttemptIdentifier.getAttemptNumber()) + " done"); } } else { @@ -524,7 +523,7 @@ class ShuffleScheduler { String errorMsg = "Failed " + attemptFailures + " times trying to " + "download from " + TezRuntimeUtils.getTaskAttemptIdentifier( inputContext.getSourceVertexName(), - srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber()) + ". threshold=" + abortFailureLimit; IOException ioe = new IOException(errorMsg); // Shuffle knows how to deal with failures post shutdown via the onFailure hook @@ -583,15 +582,15 @@ class ShuffleScheduler { srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: " + srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils .getTaskAttemptIdentifier(inputContext.getSourceVertexName(), - srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber()) + " to AM."); List<Event> failedEvents = Lists.newArrayListWithCapacity(1); failedEvents.add(InputReadErrorEvent.create( "Fetch failure for " + TezRuntimeUtils .getTaskAttemptIdentifier(inputContext.getSourceVertexName(), - srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber()) + " to jobtracker.", - srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber())); inputContext.sendEvents(failedEvents); @@ -855,7 +854,7 @@ class ShuffleScheduler { private boolean inputShouldBeConsumed(InputAttemptIdentifier id) { return (!obsoleteInputs.contains(id) && - !isInputFinished(id.getInputIdentifier().getInputIndex())); + !isInputFinished(id.getInputIdentifier())); } public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) { @@ -870,7 +869,7 @@ class ShuffleScheduler { // This may be removed after TEZ-914 InputAttemptIdentifier id = listItr.next(); if (inputShouldBeConsumed(id)) { - Integer inputNumber = Integer.valueOf(id.getInputIdentifier().getInputIndex()); + Integer inputNumber = Integer.valueOf(id.getInputIdentifier()); List<InputAttemptIdentifier> oldIdList = dedupedList.get(inputNumber); if (oldIdList == null || oldIdList.isEmpty()) { http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index 20f44dd..a99eb5e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -496,7 +496,7 @@ public class IFile { protected byte[] buffer = null; protected int bufferSize = DEFAULT_BUFFER_SIZE; - protected DataInputStream dataIn; + protected DataInputStream dataIn = null; protected int recNo = 1; protected int originalKeyLength; @@ -583,7 +583,9 @@ public class IFile { this.in = null; } - this.dataIn = new DataInputStream(this.in); + if (in != null) { + this.dataIn = new DataInputStream(this.in); + } this.readRecordsCounter = readsCounter; this.bytesReadCounter = bytesReadCounter; this.fileLength = length; http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java index c452898..5bbf0fb 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java @@ -53,7 +53,6 @@ import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; @@ -215,7 +214,7 @@ public class TestShuffleInputEventHandlerImpl { Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expectedId1 = new InputAttemptIdentifier(new InputIdentifier(1), 0, + InputAttemptIdentifier expectedId1 = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId1), eq(0)); @@ -223,7 +222,7 @@ public class TestShuffleInputEventHandlerImpl { dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(new InputIdentifier(1), 0, + InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); verify(shuffleManager, times(2)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId2), eq(0)); @@ -252,7 +251,7 @@ public class TestShuffleInputEventHandlerImpl { Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expected = new InputAttemptIdentifier(new InputIdentifier(1), 1, + InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 1, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expected), eq(0)); @@ -283,14 +282,14 @@ public class TestShuffleInputEventHandlerImpl { Event dme = createDataMovementEvent(true, 0, 1, 0, false, bitSet, 4, 0); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expected = new InputAttemptIdentifier(new InputIdentifier(1), 0, + InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); verify(shuffleManager, times(1)).addCompletedInputWithNoData(expected); //0--> 1 with spill id 1 (attemptNum 0) handler.handleEvents(Collections.singletonList(dme)); dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0); - expected = new InputAttemptIdentifier(new InputIdentifier(1), 0, + expected = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); verify(shuffleManager, times(2)).addCompletedInputWithNoData(expected); http://git-wip-us.apache.org/repos/asf/tez/blob/c9ef246a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java index 2c6b37f..90fbf2f 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java @@ -14,7 +14,6 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.junit.Before; import org.junit.Test; @@ -169,7 +168,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { int inputIdx = 0; Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0); InputAttemptIdentifier id1 = - new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum, + new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); handler.handleEvents(Collections.singletonList(dme1)); String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); @@ -180,7 +179,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { //Send final_update event. Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1); InputAttemptIdentifier id2 = - new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum, + new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1); handler.handleEvents(Collections.singletonList(dme2)); baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); @@ -206,14 +205,14 @@ public class TestShuffleInputEventHandlerOrderedGrouped { inputIdx = 1; Event dme3 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 1); - InputAttemptIdentifier id3 = new InputAttemptIdentifier(new InputIdentifier(inputIdx), + InputAttemptIdentifier id3 = new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); handler.handleEvents(Collections.singletonList(dme3)); //Send final_update event (empty partition directly invoking copySucceeded). - InputAttemptIdentifier id4 = new InputAttemptIdentifier(new InputIdentifier(inputIdx), + InputAttemptIdentifier id4 = new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1); - assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier().getInputIndex())); + assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier())); scheduler.copySucceeded(id4, null, 0, 0, 0, null, false); assertTrue(!scheduler.isDone()); //we haven't downloaded another id yet //Let the incremental event pass @@ -233,7 +232,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(Collections.singletonList(dme1)); InputAttemptIdentifier id1 = - new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum, + new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1)); @@ -247,7 +246,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(Collections.singletonList(dme2)); InputAttemptIdentifier id2 = - new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum, + new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); verify(scheduler, times(1)).reportExceptionForInput(any(IOException.class)); } @@ -333,4 +332,4 @@ public class TestShuffleInputEventHandlerOrderedGrouped { } return TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(bitSet)); } -} \ No newline at end of file +}
