Repository: tez Updated Branches: refs/heads/master 7e636a5e9 -> 3ff360aa1
TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3ff360aa Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3ff360aa Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3ff360aa Branch: refs/heads/master Commit: 3ff360aa18373f2b4aa03648de905c690ce5a180 Parents: 7e636a5 Author: Jonathan Eagles <[email protected]> Authored: Fri Jan 29 13:43:59 2016 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Fri Jan 29 13:43:59 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../tez/dag/app/dag/impl/TestDAGRecovery.java | 1 - .../library/common/InputAttemptIdentifier.java | 28 ++--- .../common/shuffle/DiskFetchedInput.java | 2 +- .../library/common/shuffle/ShuffleUtils.java | 2 +- .../impl/ShuffleInputEventHandlerImpl.java | 3 +- .../common/shuffle/impl/ShuffleManager.java | 23 ++--- .../FetchedInputAllocatorOrderedGrouped.java | 3 + .../shuffle/orderedgrouped/InMemoryReader.java | 102 ++++++++++++++++++- .../shuffle/orderedgrouped/MapOutput.java | 32 +++--- .../shuffle/orderedgrouped/MergeManager.java | 11 +- .../ShuffleInputEventHandlerOrderedGrouped.java | 3 +- .../orderedgrouped/ShuffleScheduler.java | 29 +++--- .../runtime/library/common/sort/impl/IFile.java | 6 +- .../library/common/shuffle/TestFetcher.java | 27 +++-- .../impl/TestShuffleInputEventHandlerImpl.java | 11 +- .../shuffle/orderedgrouped/TestFetcher.java | 29 +++--- ...tShuffleInputEventHandlerOrderedGrouped.java | 17 ++-- .../orderedgrouped/TestShuffleScheduler.java | 65 ++++++------ 19 files changed, 238 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d69390c..6570f8b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.8.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs TEZ-3079. Fix tez-tfile parser documentation. TEZ-3066. TaskAttemptFinishedEvent ConcurrentModificationException in recovery or history logging services. TEZ-3036. Tez AM can hang on startup with no indication of error @@ -322,6 +323,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES + TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs TEZ-3066. TaskAttemptFinishedEvent ConcurrentModificationException in recovery or history logging services. TEZ-3036. Tez AM can hang on startup with no indication of error TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java index 3a602bc..6be682d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java @@ -132,7 +132,6 @@ import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.junit.After; import org.junit.Assert; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java index d70942c..cc9c6ea 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java @@ -27,7 +27,7 @@ import org.apache.tez.dag.api.TezUncheckedException; @Private public class InputAttemptIdentifier { - private final InputIdentifier inputIdentifier; + private final int inputIdentifier; private final int attemptNumber; private final String pathComponent; private final boolean shared; @@ -49,18 +49,18 @@ public class InputAttemptIdentifier { private final int spillEventId; public InputAttemptIdentifier(int inputIndex, int attemptNumber) { - this(new InputIdentifier(inputIndex), attemptNumber, null); + this(inputIndex, attemptNumber, null); } - public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) { + public InputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent) { this(inputIdentifier, attemptNumber, pathComponent, false); } - public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent, boolean shared) { + public InputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent, boolean shared) { this(inputIdentifier, attemptNumber, pathComponent, shared, SPILL_INFO.FINAL_MERGE_ENABLED, -1); } - public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent, + public InputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent, boolean shared, SPILL_INFO fetchTypeInfo, int spillEventId) { this.inputIdentifier = inputIdentifier; this.attemptNumber = attemptNumber; @@ -74,15 +74,7 @@ public class InputAttemptIdentifier { } } - public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) { - this(new InputIdentifier(taskIndex), attemptNumber, pathComponent); - } - - public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent, boolean shared) { - this(new InputIdentifier(taskIndex), attemptNumber, pathComponent, shared); - } - - public InputIdentifier getInputIdentifier() { + public int getInputIdentifier() { return this.inputIdentifier; } @@ -117,8 +109,7 @@ public class InputAttemptIdentifier { final int prime = 31; int result = 1; result = prime * result + attemptNumber; - result = prime * result - + ((inputIdentifier == null) ? 0 : inputIdentifier.hashCode()); + result = prime * result + inputIdentifier; return result; } @@ -133,10 +124,7 @@ public class InputAttemptIdentifier { InputAttemptIdentifier other = (InputAttemptIdentifier) obj; if (attemptNumber != other.attemptNumber) return false; - if (inputIdentifier == null) { - if (other.inputIdentifier != null) - return false; - } else if (!inputIdentifier.equals(other.inputIdentifier)) + if (inputIdentifier != other.inputIdentifier) return false; // do not compare pathComponent as they may not always be present return true; http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java index dfad39d..c873af7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java @@ -50,7 +50,7 @@ public class DiskFetchedInput extends FetchedInput { this.localFS = FileSystem.getLocal(conf).getRaw(); this.outputPath = filenameAllocator.getInputFileForWrite( - this.inputAttemptIdentifier.getInputIdentifier().getInputIndex(), this + this.inputAttemptIdentifier.getInputIdentifier(), this .inputAttemptIdentifier.getSpillEventId(), actualSize); // Files are not clobbered due to the id being appended to the outputPath in the tmpPath, // otherwise fetches for the same task but from different attempts would clobber each other. http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 431ba38..e8bf6ae 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -498,7 +498,7 @@ public class ShuffleUtils { private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) { StringBuilder sb = new StringBuilder(); sb.append("{"); - sb.append(inputAttemptIdentifier.getInputIdentifier().getInputIndex()); + sb.append(inputAttemptIdentifier.getInputIdentifier()); sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber()); sb.append(", ").append(inputAttemptIdentifier.getPathComponent()); if (inputAttemptIdentifier.getFetchTypeInfo() http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index 8fb1568..adc3432 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -37,7 +37,6 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; @@ -176,7 +175,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { InputAttemptIdentifier.SPILL_INFO spillInfo = (lastEvent) ? InputAttemptIdentifier.SPILL_INFO .FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE; srcAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(dmEvent.getTargetIndex()), dmEvent + new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent .getVersion(), pathComponent, isShared, spillInfo, spillEventId); } else { srcAttemptIdentifier = http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index b3e050a..7f2054b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -67,7 +67,6 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.FetchResult; import org.apache.tez.runtime.library.common.shuffle.FetchedInput; @@ -111,7 +110,7 @@ public class ShuffleManager implements FetcherCallback { private final BlockingQueue<FetchedInput> completedInputs; private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false); - private final Set<InputIdentifier> completedInputSet; + private final Set<Integer> completedInputSet; private final ConcurrentMap<String, InputHost> knownSrcHosts; private final BlockingQueue<InputHost> pendingHosts; private final Set<InputAttemptIdentifier> obsoletedInputs; @@ -171,7 +170,7 @@ public class ShuffleManager implements FetcherCallback { //To track shuffleInfo events when finalMerge is disabled OR pipelined shuffle is enabled in source. @VisibleForTesting - final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap; + final Map<Integer, ShuffleEventInfo> shuffleInfoEventsMap; // TODO More counters - FetchErrors, speed? @@ -205,7 +204,7 @@ public class ShuffleManager implements FetcherCallback { this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()); - completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs)); + completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>(numInputs)); /** * In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt. * We do not know upfront the number of spills from source. @@ -266,7 +265,7 @@ public class ShuffleManager implements FetcherCallback { Arrays.sort(this.localDisks); - shuffleInfoEventsMap = new ConcurrentHashMap<InputIdentifier, ShuffleEventInfo>(); + shuffleInfoEventsMap = new ConcurrentHashMap<Integer, ShuffleEventInfo>(); LOG.info(srcNameTrimmed + ": numInputs=" + numInputs + ", compressionCodec=" + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers=" @@ -479,7 +478,7 @@ public class ShuffleManager implements FetcherCallback { return; } - InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + int inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); if (shuffleInfoEventsMap.get(inputIdentifier) == null) { shuffleInfoEventsMap.put(inputIdentifier, new ShuffleEventInfo(srcAttemptIdentifier)); } @@ -501,7 +500,7 @@ public class ShuffleManager implements FetcherCallback { public void addCompletedInputWithNoData( InputAttemptIdentifier srcAttemptIdentifier) { - InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + int inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); if (LOG.isDebugEnabled()) { LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete."); } @@ -558,7 +557,7 @@ public class ShuffleManager implements FetcherCallback { ShuffleEventInfo(InputAttemptIdentifier input) { - this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber(); + this.id = input.getInputIdentifier() + "_" + input.getAttemptNumber(); this.eventsProcessed = new BitSet(); this.attemptNum = input.getAttemptNumber(); } @@ -594,7 +593,7 @@ public class ShuffleManager implements FetcherCallback { public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration) throws IOException { - InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + int inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); // Count irrespective of whether this is a copy of an already fetched input lock.lock(); @@ -706,7 +705,7 @@ public class ShuffleManager implements FetcherCallback { return; } - InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + int inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(inputIdentifier); //for empty partition case @@ -769,9 +768,9 @@ public class ShuffleManager implements FetcherCallback { "Fetch failure while fetching from " + TezRuntimeUtils.getTaskAttemptIdentifier( inputContext.getSourceVertexName(), - srcAttemptIdentifier.getInputIdentifier().getInputIndex(), + srcAttemptIdentifier.getInputIdentifier(), srcAttemptIdentifier.getAttemptNumber()), - srcAttemptIdentifier.getInputIdentifier().getInputIndex(), + srcAttemptIdentifier.getInputIdentifier(), srcAttemptIdentifier.getAttemptNumber()); List<Event> failedEvents = Lists.newArrayListWithCapacity(1); http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java index ec1f8eb..7276f74 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java @@ -16,6 +16,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; import java.io.IOException; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.FileChunk; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; @@ -29,6 +30,8 @@ public interface FetchedInputAllocatorOrderedGrouped { void closeInMemoryFile(MapOutput mapOutput); + FileSystem getLocalFileSystem(); + void closeOnDiskFile(FileChunk file); void unreserve(long bytes); http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java index 75c552e..7860377 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; +import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.File; import java.io.FileOutputStream; @@ -37,9 +38,103 @@ import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader; @InterfaceStability.Unstable public class InMemoryReader extends Reader { + private static class ByteArrayDataInput extends ByteArrayInputStream implements DataInput { + + public ByteArrayDataInput(byte buf[], int offset, int length) { + super(buf, offset, length); + } + + public void reset(byte[] input, int start, int length) { + this.buf = input; + this.count = start+length; + this.mark = start; + this.pos = start; + } + + public byte[] getData() { return buf; } + public int getPosition() { return pos; } + public int getLength() { return count; } + public int getMark() { return mark; } + + @Override + public void readFully(byte[] b) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int skipBytes(int n) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean readBoolean() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public byte readByte() throws IOException { + return (byte)read(); + } + + @Override + public int readUnsignedByte() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public short readShort() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedShort() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public char readChar() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int readInt() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long readLong() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public float readFloat() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public double readDouble() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public String readLine() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public String readUTF() throws IOException { + throw new UnsupportedOperationException(); + } + } + private final InputAttemptIdentifier taskAttemptId; private final MergeManager merger; - DataInputBuffer memDataIn = new DataInputBuffer(); + ByteArrayDataInput memDataIn; private int start; private int length; private int originalKeyPos; @@ -49,12 +144,12 @@ public class InMemoryReader extends Reader { int length) throws IOException { super(null, length - start, null, null, null, false, 0, -1); - this.merger = merger; this.taskAttemptId = taskAttemptId; + this.merger = merger; buffer = data; bufferSize = (int) length; - memDataIn.reset(buffer, start, length); + memDataIn = new ByteArrayDataInput(buffer, start, length); this.start = start; this.length = length; } @@ -160,7 +255,6 @@ public class InMemoryReader extends Reader { public void close() { // Release - dataIn = null; buffer = null; // Inform the MergeManager if (merger != null) { http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java index f19cd55..7e3d983 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java @@ -47,17 +47,14 @@ class MapOutput { private final int id; private final Type type; private InputAttemptIdentifier attemptIdentifier; - private final long size; private final boolean primaryMapOutput; private final FetchedInputAllocatorOrderedGrouped callback; // MEMORY - private final byte[] memory; private BoundedByteArrayOutputStream byteStream; // DISK - private final FileSystem localFS; private final Path tmpOutputPath; private final FileChunk outputPath; private OutputStream disk; @@ -71,18 +68,13 @@ class MapOutput { this.callback = callback; this.primaryMapOutput = primaryMapOutput; - this.localFS = fs; - this.size = size; - // Other type specific values if (type == Type.MEMORY) { // since we are passing an int from createMemoryMapOutput, its safe to cast to int this.byteStream = new BoundedByteArrayOutputStream((int)size); - this.memory = byteStream.getBuffer(); } else { this.byteStream = null; - this.memory = null; } this.tmpOutputPath = tmpOutputPath; @@ -97,7 +89,6 @@ class MapOutput { } else { this.outputPath = null; } - } public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier, @@ -107,7 +98,7 @@ class MapOutput { IOException { FileSystem fs = FileSystem.getLocal(conf).getRaw(); Path outputpath = mapOutputFile.getInputFileForWrite( - attemptIdentifier.getInputIdentifier().getInputIndex(), attemptIdentifier.getSpillEventId(), size); + attemptIdentifier.getInputIdentifier(), attemptIdentifier.getSpillEventId(), size); // Files are not clobbered due to the id being appended to the outputPath in the tmpPath, // otherwise fetches for the same task but from different attempts would clobber each other. Path tmpOuputPath = outputpath.suffix(String.valueOf(fetcher)); @@ -115,7 +106,7 @@ class MapOutput { MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, callback, size, outputpath, offset, primaryMapOutput, fs, tmpOuputPath); - mapOutput.disk = mapOutput.localFS.create(tmpOuputPath); + mapOutput.disk = fs.create(tmpOuputPath); return mapOutput; } @@ -160,7 +151,7 @@ class MapOutput { } public byte[] getMemory() { - return memory; + return byteStream.getBuffer(); } public BoundedByteArrayOutputStream getArrayStream() { @@ -180,14 +171,19 @@ class MapOutput { } public long getSize() { - return size; + if (type == Type.MEMORY) { + return byteStream.getLimit(); + } else if (type == Type.DISK || type == Type.DISK_DIRECT) { + return outputPath.getLength(); + } + return -1; } public void commit() throws IOException { if (type == Type.MEMORY) { callback.closeInMemoryFile(this); } else if (type == Type.DISK) { - localFS.rename(tmpOutputPath, outputPath.getPath()); + callback.getLocalFileSystem().rename(tmpOutputPath, outputPath.getPath()); callback.closeOnDiskFile(outputPath); } else if (type == Type.DISK_DIRECT) { callback.closeOnDiskFile(outputPath); @@ -198,10 +194,10 @@ class MapOutput { public void abort() { if (type == Type.MEMORY) { - callback.unreserve(memory.length); + callback.unreserve(byteStream.getBuffer().length); } else if (type == Type.DISK) { try { - localFS.delete(tmpOutputPath, false); + callback.getLocalFileSystem().delete(tmpOutputPath, true); } catch (IOException ie) { LOG.info("failure to clean up " + tmpOutputPath, ie); } @@ -223,9 +219,9 @@ class MapOutput { return 0; } - if (o1.size < o2.size) { + if (o1.getSize() < o2.getSize()) { return -1; - } else if (o1.size > o2.size) { + } else if (o1.getSize() > o2.getSize()) { return 1; } http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 61ff338..dfa509f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -503,6 +503,11 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { } @Override + public FileSystem getLocalFileSystem() { + return localFS; + } + + @Override public synchronized void closeOnDiskFile(FileChunk file) { //including only path & offset for valdiations. for (FileChunk fileChunk : onDiskMapOutputs) { @@ -726,7 +731,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { // All disk writes done by this merge are overhead - due to the lack of // adequate memory to keep all segments in memory. outputPath = mapOutputFile.getInputFileForWrite( - srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(), + srcTaskIdentifier.getInputIdentifier(), srcTaskIdentifier.getSpillEventId(), mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX); Writer writer = null; @@ -863,7 +868,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { if (file0.isLocalFile()) { // This is setup the same way a type DISK MapOutput is setup when fetching. namePart = mapOutputFile.getSpillFileName( - file0.getInputAttemptIdentifier().getInputIdentifier().getInputIndex(), + file0.getInputAttemptIdentifier().getInputIdentifier(), file0.getInputAttemptIdentifier().getSpillEventId()); } else { namePart = file0.getPath().getName().toString(); @@ -1032,7 +1037,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { long inMemToDiskBytes = 0; boolean mergePhaseFinished = false; if (inMemoryMapOutputs.size() > 0) { - int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex(); + int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier(); inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, memDiskSegments, this.postMergeMemLimit); http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index f8c9553..6e6d967 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -37,7 +37,6 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; @@ -170,7 +169,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl InputAttemptIdentifier.SPILL_INFO info = (lastEvent) ? InputAttemptIdentifier.SPILL_INFO .FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE; srcAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(dmEvent.getTargetIndex()), dmEvent + new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent .getVersion(), pathComponent, false, info, spillEventId); } else { srcAttemptIdentifier = http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index dcfb274..8cba2a6 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -71,7 +71,6 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type; @@ -111,7 +110,7 @@ class ShuffleScheduler { //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is // enabled in source. @VisibleForTesting - final Map<InputIdentifier, ShuffleEventInfo> pipelinedShuffleInfoEventsMap; + final Map<Integer, ShuffleEventInfo> pipelinedShuffleInfoEventsMap; @VisibleForTesting final Set<MapHost> pendingHosts = new HashSet<MapHost>(); @@ -349,7 +348,7 @@ class ShuffleScheduler { this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED); - pipelinedShuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>(); + pipelinedShuffleInfoEventsMap = new HashMap<Integer, ShuffleEventInfo>(); LOG.info("ShuffleScheduler running for sourceVertex: " + inputContext.getSourceVertexName() + " with configuration: " + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting @@ -429,7 +428,7 @@ class ShuffleScheduler { ShuffleEventInfo(InputAttemptIdentifier input) { - this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber(); + this.id = input.getInputIdentifier() + "_" + input.getAttemptNumber(); this.eventsProcessed = new BitSet(); this.attemptNum = input.getAttemptNumber(); } @@ -467,7 +466,7 @@ class ShuffleScheduler { ) throws IOException { inputContext.notifyProgress(); - if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) { + if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier())) { if (!isLocalFetch) { /** * Reset it only when it is a non-local-disk copy. @@ -505,10 +504,10 @@ class ShuffleScheduler { */ if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { remainingMaps.decrementAndGet(); - setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex()); + setInputFinished(srcAttemptIdentifier.getInputIdentifier()); numFetchedSpills++; } else { - InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + int inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); //Allow only one task attempt to proceed. if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) { return; @@ -533,7 +532,7 @@ class ShuffleScheduler { //check if we downloaded all spills pertaining to this InputAttemptIdentifier if (eventInfo.isDone()) { remainingMaps.decrementAndGet(); - setInputFinished(inputIdentifier.getInputIndex()); + setInputFinished(inputIdentifier); pipelinedShuffleInfoEventsMap.remove(inputIdentifier); if (LOG.isTraceEnabled()) { LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " + @@ -560,7 +559,7 @@ class ShuffleScheduler { if (LOG.isDebugEnabled()) { LOG.debug("src task: " + TezRuntimeUtils.getTaskAttemptIdentifier( - inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getInputIndex(), + inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier(), srcAttemptIdentifier.getAttemptNumber()) + " done"); } } else { @@ -679,7 +678,7 @@ class ShuffleScheduler { String errorMsg = "Failed " + attemptFailures + " times trying to " + "download from " + TezRuntimeUtils.getTaskAttemptIdentifier( inputContext.getSourceVertexName(), - srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber()) + ". threshold=" + abortFailureLimit; IOException ioe = new IOException(errorMsg); // Shuffle knows how to deal with failures post shutdown via the onFailure hook @@ -738,15 +737,15 @@ class ShuffleScheduler { srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: " + srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils .getTaskAttemptIdentifier(inputContext.getSourceVertexName(), - srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber()) + " to AM."); List<Event> failedEvents = Lists.newArrayListWithCapacity(1); failedEvents.add(InputReadErrorEvent.create( "Fetch failure for " + TezRuntimeUtils .getTaskAttemptIdentifier(inputContext.getSourceVertexName(), - srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber()) + " to jobtracker.", - srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber())); inputContext.sendEvents(failedEvents); @@ -1014,7 +1013,7 @@ class ShuffleScheduler { private boolean inputShouldBeConsumed(InputAttemptIdentifier id) { return (!obsoleteInputs.contains(id) && - !isInputFinished(id.getInputIdentifier().getInputIndex())); + !isInputFinished(id.getInputIdentifier())); } public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) { @@ -1029,7 +1028,7 @@ class ShuffleScheduler { // This may be removed after TEZ-914 InputAttemptIdentifier id = listItr.next(); if (inputShouldBeConsumed(id)) { - Integer inputNumber = Integer.valueOf(id.getInputIdentifier().getInputIndex()); + Integer inputNumber = Integer.valueOf(id.getInputIdentifier()); List<InputAttemptIdentifier> oldIdList = dedupedList.get(inputNumber); if (oldIdList == null || oldIdList.isEmpty()) { http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index 20f44dd..a99eb5e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -496,7 +496,7 @@ public class IFile { protected byte[] buffer = null; protected int bufferSize = DEFAULT_BUFFER_SIZE; - protected DataInputStream dataIn; + protected DataInputStream dataIn = null; protected int recNo = 1; protected int originalKeyLength; @@ -583,7 +583,9 @@ public class IFile { this.in = null; } - this.dataIn = new DataInputStream(this.in); + if (in != null) { + this.dataIn = new DataInputStream(this.in); + } this.readRecordsCounter = readsCounter; this.bytesReadCounter = bytesReadCounter; this.fileLength = length; http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java index 917dbcb..0aa112e 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java @@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; import org.junit.Assert; import org.junit.Test; @@ -236,36 +235,36 @@ public class TestFetcher { @Test(timeout=5000) public void testInputAttemptIdentifierMap() { InputAttemptIdentifier[] srcAttempts = { - new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", + new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), //duplicate entry - new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", + new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), // pipeline shuffle based identifiers, with multiple attempts - new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), - new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), - new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", + new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1), - new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2), - new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0) }; InputAttemptIdentifier[] expectedSrcAttempts = { - new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", + new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), // pipeline shuffle based identifiers - new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), - new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), - new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", + new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1), - new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2), - new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0) }; TezConfiguration conf = new TezConfiguration(); http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java index c452898..5bbf0fb 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java @@ -53,7 +53,6 @@ import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; @@ -215,7 +214,7 @@ public class TestShuffleInputEventHandlerImpl { Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expectedId1 = new InputAttemptIdentifier(new InputIdentifier(1), 0, + InputAttemptIdentifier expectedId1 = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId1), eq(0)); @@ -223,7 +222,7 @@ public class TestShuffleInputEventHandlerImpl { dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(new InputIdentifier(1), 0, + InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); verify(shuffleManager, times(2)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId2), eq(0)); @@ -252,7 +251,7 @@ public class TestShuffleInputEventHandlerImpl { Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expected = new InputAttemptIdentifier(new InputIdentifier(1), 1, + InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 1, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expected), eq(0)); @@ -283,14 +282,14 @@ public class TestShuffleInputEventHandlerImpl { Event dme = createDataMovementEvent(true, 0, 1, 0, false, bitSet, 4, 0); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expected = new InputAttemptIdentifier(new InputIdentifier(1), 0, + InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); verify(shuffleManager, times(1)).addCompletedInputWithNoData(expected); //0--> 1 with spill id 1 (attemptNum 0) handler.handleEvents(Collections.singletonList(dme)); dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0); - expected = new InputAttemptIdentifier(new InputIdentifier(1), 0, + expected = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); verify(shuffleManager, times(2)).addCompletedInputWithNoData(expected); http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index faa2d31..20fb9a9 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -53,7 +53,6 @@ import com.google.common.collect.Lists; import org.apache.tez.http.HttpConnection; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.common.counters.TezCounter; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -116,7 +115,7 @@ public class TestFetcher { doReturn("src vertex").when(inputContext).getSourceVertexName(); MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl"); - InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(0), 0, "attempt"); + InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt"); mapHost.addKnownMap(inputAttemptIdentifier); List<InputAttemptIdentifier> mapsForHost = Lists.newArrayList(inputAttemptIdentifier); doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost); @@ -484,36 +483,36 @@ public class TestFetcher { @Test(timeout = 5000) public void testInputAttemptIdentifierMap() { InputAttemptIdentifier[] srcAttempts = { - new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", + new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), //duplicate entry - new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", + new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), // pipeline shuffle based identifiers, with multiple attempts - new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), - new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), - new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", + new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1), - new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2), - new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0) }; InputAttemptIdentifier[] expectedSrcAttempts = { - new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", + new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), // pipeline shuffle based identifiers - new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), - new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), - new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", + new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1), - new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2), - new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0) }; http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java index 88a1d20..de066fe 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java @@ -17,7 +17,6 @@ import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.junit.Before; import org.junit.Test; @@ -165,7 +164,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { int inputIdx = 0; Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0); InputAttemptIdentifier id1 = - new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum, + new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); handler.handleEvents(Collections.singletonList(dme1)); String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); @@ -176,7 +175,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { //Send final_update event. Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1); InputAttemptIdentifier id2 = - new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum, + new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1); handler.handleEvents(Collections.singletonList(dme2)); baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); @@ -202,14 +201,14 @@ public class TestShuffleInputEventHandlerOrderedGrouped { inputIdx = 1; Event dme3 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 1); - InputAttemptIdentifier id3 = new InputAttemptIdentifier(new InputIdentifier(inputIdx), + InputAttemptIdentifier id3 = new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); handler.handleEvents(Collections.singletonList(dme3)); //Send final_update event (empty partition directly invoking copySucceeded). - InputAttemptIdentifier id4 = new InputAttemptIdentifier(new InputIdentifier(inputIdx), + InputAttemptIdentifier id4 = new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1); - assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier().getInputIndex())); + assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier())); scheduler.copySucceeded(id4, null, 0, 0, 0, null, false); assertTrue(!scheduler.isDone()); //we haven't downloaded another id yet //Let the incremental event pass @@ -229,7 +228,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(Collections.singletonList(dme1)); InputAttemptIdentifier id1 = - new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum, + new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1)); @@ -243,7 +242,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(Collections.singletonList(dme2)); InputAttemptIdentifier id2 = - new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum, + new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); verify(scheduler, times(1)).reportExceptionForInput(any(IOException.class)); } @@ -329,4 +328,4 @@ public class TestShuffleInputEventHandlerOrderedGrouped { } return TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(bitSet)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java index 1a6c3be..f7ef309 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java @@ -50,7 +50,6 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.InputIdentifier; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -89,7 +88,7 @@ public class TestShuffleScheduler { // Schedule all copies. for (int i = 0; i < numInputs; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier); identifiers[i] = inputAttemptIdentifier; } @@ -134,7 +133,7 @@ public class TestShuffleScheduler { for (int i = 0; i < numInputs; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier); identifiers[i] = inputAttemptIdentifier; } @@ -191,7 +190,7 @@ public class TestShuffleScheduler { //Generate 320 events for (int i = 0; i < 320; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, "hostUrl", inputAttemptIdentifier); } @@ -199,7 +198,7 @@ public class TestShuffleScheduler { //100 succeeds for (int i = 0; i < 100; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); @@ -210,14 +209,14 @@ public class TestShuffleScheduler { //99 fails for (int i = 100; i < 199; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), false, true, false); } InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(200), 0, "attempt_"); + new InputAttemptIdentifier(200, 0, "attempt_"); //Should fail here and report exception as reducer is not healthy scheduler.copyFailed(inputAttemptIdentifier, new MapHost(200, "host" + (200 % @@ -260,7 +259,7 @@ public class TestShuffleScheduler { //Generate 0-200 events for (int i = 0; i < 200; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, "hostUrl", inputAttemptIdentifier); } @@ -269,7 +268,7 @@ public class TestShuffleScheduler { //Generate 200-320 events with empty partitions for (int i = 200; i < 320; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copySucceeded(inputAttemptIdentifier, null, 0, 0, 0, null, true); } //120 are successful. so remaining is 200 @@ -279,7 +278,7 @@ public class TestShuffleScheduler { //200 pending to be downloaded. Download 190. for (int i = 0; i < 190; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); @@ -292,7 +291,7 @@ public class TestShuffleScheduler { //10 fails for (int i = 190; i < 200; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), false, true, false); } @@ -304,7 +303,7 @@ public class TestShuffleScheduler { scheduler.lastProgressTime = System.currentTimeMillis() - 250000; InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(190), 0, "attempt_"); + new InputAttemptIdentifier(190, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(190, "host" + (190 % totalProducerNodes) + ":" + 10000, ""), false, true, false); @@ -317,7 +316,7 @@ public class TestShuffleScheduler { //fail to download 50 more times across attempts for (int i = 190; i < 200; i++) { inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) @@ -338,7 +337,7 @@ public class TestShuffleScheduler { //fail another 30 for (int i = 110; i < 120; i++) { inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) @@ -376,7 +375,7 @@ public class TestShuffleScheduler { //Generate 320 events for (int i = 0; i < 320; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, "hostUrl", inputAttemptIdentifier); } @@ -384,7 +383,7 @@ public class TestShuffleScheduler { //319 succeeds for (int i = 0; i < 319; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); @@ -394,7 +393,7 @@ public class TestShuffleScheduler { //1 fails (last fetch) InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_"); + new InputAttemptIdentifier(319, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes) + ":" + 10000, ""), false, true, false); @@ -441,7 +440,7 @@ public class TestShuffleScheduler { //Generate 320 events for (int i = 0; i < 320; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, "hostUrl", inputAttemptIdentifier); } @@ -449,7 +448,7 @@ public class TestShuffleScheduler { //Tasks fail in 20% of nodes 3 times, but are able to proceed further for (int i = 0; i < 64; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), false, true, false); @@ -470,7 +469,7 @@ public class TestShuffleScheduler { //319 succeeds for (int i = 64; i < 319; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); @@ -480,7 +479,7 @@ public class TestShuffleScheduler { //1 fails (last fetch) InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_"); + new InputAttemptIdentifier(319, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes) + ":" + 10000, ""), false, true, false); @@ -536,7 +535,7 @@ public class TestShuffleScheduler { //Generate 319 events (last event has not arrived) for (int i = 0; i < 319; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, "hostUrl", inputAttemptIdentifier); } @@ -544,7 +543,7 @@ public class TestShuffleScheduler { //318 succeeds for (int i = 0; i < 319; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); @@ -554,7 +553,7 @@ public class TestShuffleScheduler { //1 fails (last fetch) InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(318), 0, "attempt_"); + new InputAttemptIdentifier(318, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % totalProducerNodes) + ":" + 10000, ""), false, true, false); @@ -615,7 +614,7 @@ public class TestShuffleScheduler { //Generate 320 events (last event has not arrived) for (int i = 0; i < 320; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, "hostUrl", inputAttemptIdentifier); } @@ -623,7 +622,7 @@ public class TestShuffleScheduler { //10 succeeds for (int i = 0; i < 10; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); @@ -634,7 +633,7 @@ public class TestShuffleScheduler { //5 fetches fail once for (int i = 10; i < 15; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), false, true, false); } @@ -648,7 +647,7 @@ public class TestShuffleScheduler { //5 fetches fail repeatedly for (int i = 10; i < 15; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) @@ -691,7 +690,7 @@ public class TestShuffleScheduler { //Generate 320 events for (int i = 0; i < 320; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, "hostUrl", inputAttemptIdentifier); } @@ -699,7 +698,7 @@ public class TestShuffleScheduler { //100 succeeds for (int i = 0; i < 100; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); @@ -711,7 +710,7 @@ public class TestShuffleScheduler { //99 fails for (int i = 100; i < 199; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), false, true, false); @@ -754,7 +753,7 @@ public class TestShuffleScheduler { final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 1, shuffle); InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(0), 0, "attempt_"); + new InputAttemptIdentifier(0, 0, "attempt_"); scheduler.addKnownMapOutput("host0", 10000, 0, "hostUrl", inputAttemptIdentifier); assertTrue(scheduler.pendingHosts.size() == 1); @@ -801,7 +800,7 @@ public class TestShuffleScheduler { for (int i = 0; i < numInputs; i++) { InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier); identifiers[i] = inputAttemptIdentifier; }
