Repository: tez
Updated Branches:
  refs/heads/master 46e7706bb -> 6cbfd1e76


TEZ-3752. Reduce Object size of InMemoryMapOutput for large jobs (Muhammad Samir Khan via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6cbfd1e7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6cbfd1e7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6cbfd1e7

Branch: refs/heads/master
Commit: 6cbfd1e76352fee487fd2fd0ad2bc5b4713db91b
Parents: 46e7706
Author: Jonathan Eagles <[email protected]>
Authored: Fri Jul 28 12:21:38 2017 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Fri Jul 28 12:21:38 2017 -0500

----------------------------------------------------------------------
 .../shuffle/orderedgrouped/InMemoryWriter.java   | 10 ++++++++++
 .../common/shuffle/orderedgrouped/MapOutput.java | 19 +++++--------------
 .../shuffle/orderedgrouped/MergeManager.java     |  7 ++++---
 3 files changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6cbfd1e7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
index d2778d8..46dc72e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
@@ -37,6 +37,16 @@ public class InMemoryWriter extends Writer {
 
   // TODO Verify and fix counters if required.
 
+  private static class InMemoryBoundedByteArrayOutputStream extends 
BoundedByteArrayOutputStream {
+    InMemoryBoundedByteArrayOutputStream(byte[] array) {
+      super(array, 0, array.length);
+    }
+  }
+
+  public InMemoryWriter(byte[] array) {
+    this(new InMemoryBoundedByteArrayOutputStream(array));
+  }
+
   public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
     super(null, null);
     this.out =

http://git-wip-us.apache.org/repos/asf/tez/blob/6cbfd1e7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index 488dd80..9239a1f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -119,10 +119,6 @@ abstract class MapOutput {
   public byte[] getMemory() {
     return null;
   }
-
-  public BoundedByteArrayOutputStream getArrayStream() {
-    return null;
-  }
   
   public OutputStream getDisk() {
     return null;
@@ -253,27 +249,22 @@ abstract class MapOutput {
   }
 
   private static class InMemoryMapOutput extends MapOutput {
-    private BoundedByteArrayOutputStream byteStream;
+    private byte[] byteArray;
     private InMemoryMapOutput(InputAttemptIdentifier attemptIdentifier,
                               FetchedInputAllocatorOrderedGrouped callback,
                               long size, boolean primaryMapOutput) {
       super(attemptIdentifier, callback, primaryMapOutput);
-      this.byteStream = new BoundedByteArrayOutputStream((int)size);
+      this.byteArray = new byte[(int)size];
     }
 
     @Override
     public byte[] getMemory() {
-      return byteStream.getBuffer();
-    }
-
-    @Override
-    public BoundedByteArrayOutputStream getArrayStream() {
-      return byteStream;
+      return byteArray;
     }
 
     @Override
     public long getSize() {
-      return byteStream.getLimit();
+      return byteArray.length;
     }
 
     @Override
@@ -283,7 +274,7 @@ abstract class MapOutput {
 
     @Override
     public void abort() {
-      callback.unreserve(byteStream.getBuffer().length);
+      callback.unreserve(byteArray.length);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/6cbfd1e7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 9c39c7e..6ffdb56 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.FileChunk;
 import org.apache.hadoop.io.RawComparator;
@@ -776,7 +777,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
 
       int noInMemorySegments = inMemorySegments.size();
 
-      Writer writer = new InMemoryWriter(mergedMapOutputs.getArrayStream());
+      Writer writer = new InMemoryWriter(mergedMapOutputs.getMemory());
 
       LOG.info(inputContext.getSourceVertexName() + ": " + "Initiating 
Memory-to-Memory merge with " + noInMemorySegments +
                " segments of total-size: " + mergeOutputSize);
@@ -856,7 +857,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
 
       List<Segment> inMemorySegments = new ArrayList<Segment>();
-      long mergeOutputSize = 
+      long mergeOutputSize =
         createInMemorySegments(inputs, inMemorySegments,0);
       int noInMemorySegments = inMemorySegments.size();
 
@@ -1075,7 +1076,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     // closed but not yet present in inMemoryMapOutputs
     long fullSize = 0L;
     for (MapOutput mo : inMemoryMapOutputs) {
-      fullSize += mo.getMemory().length;
+      fullSize += mo.getSize();
     }
     int inMemoryMapOutputsOffset = 0;
     while((fullSize > leaveBytes) && !Thread.currentThread().isInterrupted()) {

Reply via email to