Repository: tez
Updated Branches:
  refs/heads/master 46e7706bb -> 6cbfd1e76
TEZ-3752. Reduce Object size of InMemoryMapOutput for large jobs (Muhammad Samir Khan via jeagles)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6cbfd1e7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6cbfd1e7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6cbfd1e7

Branch: refs/heads/master
Commit: 6cbfd1e76352fee487fd2fd0ad2bc5b4713db91b
Parents: 46e7706
Author: Jonathan Eagles <[email protected]>
Authored: Fri Jul 28 12:21:38 2017 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Fri Jul 28 12:21:38 2017 -0500

----------------------------------------------------------------------
 .../shuffle/orderedgrouped/InMemoryWriter.java | 10 ++++++++++
 .../common/shuffle/orderedgrouped/MapOutput.java | 19 +++++--------------
 .../shuffle/orderedgrouped/MergeManager.java | 7 ++++---
 3 files changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/6cbfd1e7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
index d2778d8..46dc72e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
@@ -37,6 +37,16 @@ public class InMemoryWriter extends Writer {
 
   // TODO Verify and fix counters if required.
 
+  private static class InMemoryBoundedByteArrayOutputStream extends BoundedByteArrayOutputStream {
+    InMemoryBoundedByteArrayOutputStream(byte[] array) {
+      super(array, 0, array.length);
+    }
+  }
+
+  public InMemoryWriter(byte[] array) {
+    this(new InMemoryBoundedByteArrayOutputStream(array));
+  }
+
   public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
     super(null, null);
     this.out =

http://git-wip-us.apache.org/repos/asf/tez/blob/6cbfd1e7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index 488dd80..9239a1f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -119,10 +119,6 @@ abstract class MapOutput {
   public byte[] getMemory() {
     return null;
   }
-
-  public BoundedByteArrayOutputStream getArrayStream() {
-    return null;
-  }
 
   public OutputStream getDisk() {
     return null;
@@ -253,27 +249,22 @@ abstract class MapOutput {
   }
 
   private static class InMemoryMapOutput extends MapOutput {
-    private BoundedByteArrayOutputStream byteStream;
+    private byte[] byteArray;
 
     private InMemoryMapOutput(InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
                               long size, boolean primaryMapOutput) {
       super(attemptIdentifier, callback, primaryMapOutput);
-      this.byteStream = new BoundedByteArrayOutputStream((int)size);
+      this.byteArray = new byte[(int)size];
     }
 
     @Override
     public byte[] getMemory() {
-      return byteStream.getBuffer();
-    }
-
-    @Override
-    public BoundedByteArrayOutputStream getArrayStream() {
-      return byteStream;
+      return byteArray;
     }
 
     @Override
     public long getSize() {
-      return byteStream.getLimit();
+      return byteArray.length;
     }
 
     @Override
@@ -283,7 +274,7 @@ abstract class MapOutput {
 
     @Override
     public void abort() {
-      callback.unreserve(byteStream.getBuffer().length);
+      callback.unreserve(byteArray.length);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/6cbfd1e7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 9c39c7e..6ffdb56 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.FileChunk;
 import org.apache.hadoop.io.RawComparator;
@@ -776,7 +777,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
 
       int noInMemorySegments = inMemorySegments.size();
 
-      Writer writer = new InMemoryWriter(mergedMapOutputs.getArrayStream());
+      Writer writer = new InMemoryWriter(mergedMapOutputs.getMemory());
 
       LOG.info(inputContext.getSourceVertexName() + ": " + "Initiating Memory-to-Memory merge with " + noInMemorySegments +
           " segments of total-size: " + mergeOutputSize);
@@ -856,7 +857,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
 
       List<Segment> inMemorySegments = new ArrayList<Segment>();
-      long mergeOutputSize = 
+      long mergeOutputSize =
         createInMemorySegments(inputs, inMemorySegments,0);
       int noInMemorySegments = inMemorySegments.size();
 
@@ -1075,7 +1076,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       // closed but not yet present in inMemoryMapOutputs
       long fullSize = 0L;
       for (MapOutput mo : inMemoryMapOutputs) {
-        fullSize += mo.getMemory().length;
+        fullSize += mo.getSize();
       }
       int inMemoryMapOutputsOffset = 0;
       while((fullSize > leaveBytes) && !Thread.currentThread().isInterrupted()) {
