Repository: tez
Updated Branches:
  refs/heads/master 0ee044c0e -> 29b45bc11
TEZ-3732. Reduce Object size of InputAttemptIdentifier and MapOutput for large jobs (Jonathan Eagles via kshukla)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/29b45bc1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/29b45bc1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/29b45bc1

Branch: refs/heads/master
Commit: 29b45bc1140a3a3a44a59a53375db1941184ba18
Parents: 0ee044c
Author: Kuhu Shukla <[email protected]>
Authored: Fri Jun 2 17:43:54 2017 -0500
Committer: Kuhu Shukla <[email protected]>
Committed: Fri Jun 2 17:43:54 2017 -0500

----------------------------------------------------------------------
 .../library/common/InputAttemptIdentifier.java |  17 +-
 .../shuffle/orderedgrouped/MapOutput.java      | 242 ++++++++++++-------
 2 files changed, 172 insertions(+), 87 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/29b45bc1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index cc9c6ea..16172e1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -45,7 +45,7 @@ public class InputAttemptIdentifier {
    * These fields are added for additional information about the source and are not meant to
    * alter the way these sources would be stored in hashmap.
    */
-  private final SPILL_INFO fetchTypeInfo;
+  private final byte fetchTypeInfo;
   private final int spillEventId;
 
   public InputAttemptIdentifier(int inputIndex, int attemptNumber) {
@@ -66,7 +66,7 @@ public class InputAttemptIdentifier {
     this.attemptNumber = attemptNumber;
     this.pathComponent = pathComponent;
     this.shared = shared;
-    this.fetchTypeInfo = fetchTypeInfo;
+    this.fetchTypeInfo = (byte)fetchTypeInfo.ordinal();
     this.spillEventId = spillEventId;
     if (pathComponent != null && !pathComponent.startsWith(PATH_PREFIX)) {
       throw new TezUncheckedException(
@@ -91,7 +91,12 @@ public class InputAttemptIdentifier {
   }
 
   public SPILL_INFO getFetchTypeInfo() {
-    return fetchTypeInfo;
+    if (fetchTypeInfo == SPILL_INFO.INCREMENTAL_UPDATE.ordinal()) {
+      return SPILL_INFO.INCREMENTAL_UPDATE;
+    } else if (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal()) {
+      return SPILL_INFO.FINAL_UPDATE;
+    }
+    return SPILL_INFO.FINAL_MERGE_ENABLED;
   }
 
   public int getSpillEventId() {
@@ -99,8 +104,8 @@ public class InputAttemptIdentifier {
   }
 
   public boolean canRetrieveInputInChunks() {
-    return (fetchTypeInfo == SPILL_INFO.INCREMENTAL_UPDATE) ||
-        (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE);
+    return (fetchTypeInfo == SPILL_INFO.INCREMENTAL_UPDATE.ordinal()) ||
+        (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal());
   }
 
   // PathComponent & shared does not need to be part of the hashCode and equals computation.
@@ -134,6 +139,6 @@ public class InputAttemptIdentifier {
   public String toString() {
     return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
         + ", attemptNumber=" + attemptNumber + ", pathComponent="
-        + pathComponent + ", spillType=" + fetchTypeInfo.ordinal() + ", spillId=" + spillEventId +"]";
+        + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId +"]";
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/29b45bc1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index b8dacef..488dd80 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -33,11 +33,11 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 
 
-class MapOutput {
+abstract class MapOutput {
   private static final Logger LOG = LoggerFactory.getLogger(MapOutput.class);
   private static AtomicInteger ID = new AtomicInteger(0);
 
-  public static enum Type {
+  public enum Type {
     WAIT,
     MEMORY,
     DISK,
@@ -45,50 +45,17 @@ class MapOutput {
   }
 
   private final int id;
-  private final Type type;
   private InputAttemptIdentifier attemptIdentifier;
 
   private final boolean primaryMapOutput;
-  private final FetchedInputAllocatorOrderedGrouped callback;
+  protected final FetchedInputAllocatorOrderedGrouped callback;
 
-  // MEMORY
-  private BoundedByteArrayOutputStream byteStream;
-
-  // DISK
-  private final Path tmpOutputPath;
-  private final FileChunk
outputPath;
-  private OutputStream disk;
-
-  private MapOutput(Type type, InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
-                    long size, Path outputPath, long offset, boolean primaryMapOutput,
-                    FileSystem fs, Path tmpOutputPath) {
+  private MapOutput(InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
+                    boolean primaryMapOutput) {
     this.id = ID.incrementAndGet();
-    this.type = type;
     this.attemptIdentifier = attemptIdentifier;
     this.callback = callback;
     this.primaryMapOutput = primaryMapOutput;
-
-    // Other type specific values
-
-    if (type == Type.MEMORY) {
-      // since we are passing an int from createMemoryMapOutput, its safe to cast to int
-      this.byteStream = new BoundedByteArrayOutputStream((int)size);
-    } else {
-      this.byteStream = null;
-    }
-
-    this.tmpOutputPath = tmpOutputPath;
-    this.disk = null;
-
-    if (type == Type.DISK || type == Type.DISK_DIRECT) {
-      if (type == Type.DISK_DIRECT) {
-        this.outputPath = new FileChunk(outputPath, offset, size, true, attemptIdentifier);
-      } else {
-        this.outputPath = new FileChunk(outputPath, offset, size, false, attemptIdentifier);
-      }
-    } else {
-      this.outputPath = null;
-    }
   }
 
   public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
@@ -104,8 +71,8 @@ class MapOutput {
     Path tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
     long offset = 0;
 
-    MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, callback, size, outputPath, offset,
-        primaryMapOutput, fs, tmpOutputPath);
+    DiskMapOutput mapOutput = new DiskMapOutput(attemptIdentifier, callback, size, outputPath, offset,
+        primaryMapOutput, tmpOutputPath);
     mapOutput.disk = fs.create(tmpOutputPath);
 
     return mapOutput;
@@ -114,19 +81,18 @@ class MapOutput {
   public static MapOutput createLocalDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
                                                    FetchedInputAllocatorOrderedGrouped callback, Path path, long offset,
                                                    long size, boolean primaryMapOutput) {
-
return new MapOutput(Type.DISK_DIRECT, attemptIdentifier, callback, size, path, offset,
-        primaryMapOutput, null, null);
+    return new DiskDirectMapOutput(attemptIdentifier, callback, size, path, offset,
+        primaryMapOutput);
   }
 
   public static MapOutput createMemoryMapOutput(InputAttemptIdentifier attemptIdentifier,
                                                 FetchedInputAllocatorOrderedGrouped callback, int size,
                                                 boolean primaryMapOutput) {
-    return new MapOutput(Type.MEMORY, attemptIdentifier, callback, size, null, -1, primaryMapOutput,
-        null, null);
+    return new InMemoryMapOutput(attemptIdentifier, callback, size, primaryMapOutput);
   }
 
   public static MapOutput createWaitMapOutput(InputAttemptIdentifier attemptIdentifier) {
-    return new MapOutput(Type.WAIT, attemptIdentifier, null, -1, null, -1, false, null, null);
+    return new WaitMapOutput(attemptIdentifier);
   }
 
   public boolean isPrimaryMapOutput() {
@@ -147,69 +113,39 @@ class MapOutput {
   }
 
   public FileChunk getOutputPath() {
-    return outputPath;
+    return null;
   }
 
   public byte[] getMemory() {
-    return byteStream.getBuffer();
+    return null;
   }
 
   public BoundedByteArrayOutputStream getArrayStream() {
-    return byteStream;
+    return null;
   }
 
   public OutputStream getDisk() {
-    return disk;
+    return null;
   }
 
   public InputAttemptIdentifier getAttemptIdentifier() {
     return this.attemptIdentifier;
   }
 
-  public Type getType() {
-    return type;
-  }
+  public abstract Type getType();
 
   public long getSize() {
-    if (type == Type.MEMORY) {
-      return byteStream.getLimit();
-    } else if (type == Type.DISK || type == Type.DISK_DIRECT) {
-      return outputPath.getLength();
-    }
     return -1;
   }
 
   public void commit() throws IOException {
-    if (type == Type.MEMORY) {
-      callback.closeInMemoryFile(this);
-    } else if (type == Type.DISK) {
-      callback.getLocalFileSystem().rename(tmpOutputPath, outputPath.getPath());
-      callback.closeOnDiskFile(outputPath);
-    } else if (type == Type.DISK_DIRECT) {
-      callback.closeOnDiskFile(outputPath);
-    } else {
-      throw new IOException("Cannot commit MapOutput of type WAIT!");
-
}
   }
 
   public void abort() {
-    if (type == Type.MEMORY) {
-      callback.unreserve(byteStream.getBuffer().length);
-    } else if (type == Type.DISK) {
-      try {
-        callback.getLocalFileSystem().delete(tmpOutputPath, true);
-      } catch (IOException ie) {
-        LOG.info("failure to clean up " + tmpOutputPath, ie);
-      }
-    } else if (type == Type.DISK_DIRECT) { //nothing to do.
-    } else {
-      throw new IllegalArgumentException
-                   ("Cannot commit MapOutput with of type WAIT!");
-    }
   }
 
   public String toString() {
-    return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + type + ")";
+    return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + getType() + ")";
   }
 
   public static class MapOutputComparator
@@ -232,4 +168,148 @@ class MapOutput {
       }
     }
   }
+
+  private static class DiskDirectMapOutput extends MapOutput {
+    private final FileChunk outputPath;
+    private DiskDirectMapOutput(InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
+                                long size, Path outputPath, long offset, boolean primaryMapOutput) {
+      super(attemptIdentifier, callback, primaryMapOutput);
+      this.outputPath = new FileChunk(outputPath, offset, size, true, attemptIdentifier);
+    }
+
+    @Override
+    public FileChunk getOutputPath() {
+      return outputPath;
+    }
+
+    @Override
+    public long getSize() {
+      return outputPath.getLength();
+    }
+
+    @Override
+    public void commit() throws IOException {
+      callback.closeOnDiskFile(outputPath);
+    }
+
+    @Override
+    public void abort() {
+      // nothing to do
+    }
+
+    @Override
+    public Type getType() {
+      return Type.DISK_DIRECT;
+    }
+  }
+
+  private static class DiskMapOutput extends MapOutput {
+    private final Path tmpOutputPath;
+    private final FileChunk outputPath;
+    private OutputStream disk;
+    private DiskMapOutput(InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
+                          long size, Path outputPath, long offset, boolean primaryMapOutput, Path tmpOutputPath) {
+      super(attemptIdentifier, callback,
primaryMapOutput);
+
+      this.tmpOutputPath = tmpOutputPath;
+      this.disk = null;
+      this.outputPath = new FileChunk(outputPath, offset, size, false, attemptIdentifier);
+    }
+
+    @Override
+    public FileChunk getOutputPath() {
+      return outputPath;
+    }
+
+    @Override
+    public OutputStream getDisk() {
+      return disk;
+    }
+
+    @Override
+    public long getSize() {
+      return outputPath.getLength();
+    }
+
+    @Override
+    public void commit() throws IOException {
+      callback.getLocalFileSystem().rename(tmpOutputPath, outputPath.getPath());
+      callback.closeOnDiskFile(outputPath);
+    }
+
+    @Override
+    public void abort() {
+      try {
+        callback.getLocalFileSystem().delete(tmpOutputPath, true);
+      } catch (IOException ie) {
+        LOG.info("failure to clean up " + tmpOutputPath, ie);
+      }
+    }
+
+    @Override
+    public Type getType() {
+      return Type.DISK;
+    }
+  }
+
+  private static class InMemoryMapOutput extends MapOutput {
+    private BoundedByteArrayOutputStream byteStream;
+    private InMemoryMapOutput(InputAttemptIdentifier attemptIdentifier,
+                              FetchedInputAllocatorOrderedGrouped callback,
+                              long size, boolean primaryMapOutput) {
+      super(attemptIdentifier, callback, primaryMapOutput);
+      this.byteStream = new BoundedByteArrayOutputStream((int)size);
+    }
+
+    @Override
+    public byte[] getMemory() {
+      return byteStream.getBuffer();
+    }
+
+    @Override
+    public BoundedByteArrayOutputStream getArrayStream() {
+      return byteStream;
+    }
+
+    @Override
+    public long getSize() {
+      return byteStream.getLimit();
+    }
+
+    @Override
+    public void commit() throws IOException {
+      callback.closeInMemoryFile(this);
+    }
+
+    @Override
+    public void abort() {
+      callback.unreserve(byteStream.getBuffer().length);
+    }
+
+    @Override
+    public Type getType() {
+      return Type.MEMORY;
+    }
+  }
+
+  private static class WaitMapOutput extends MapOutput {
+    private WaitMapOutput(InputAttemptIdentifier attemptIdentifier) {
+      super(attemptIdentifier, null, false);
+    }
+
+    @Override
+    public void commit() throws IOException {
+      throw new
IOException("Cannot commit MapOutput of type WAIT!");
+    }
+
+    @Override
+    public void abort() {
+      throw new IllegalArgumentException("Cannot commit MapOutput of type WAIT!");
+    }
+
+    @Override
+    public Type getType() {
+      return Type.WAIT;
+    }
+  }
 }
