Repository: tez
Updated Branches:
  refs/heads/master 12695f3d0 -> 3e409ae0e


TEZ-3147. Intermediate mem-to-mem: Fix early exit when only one segment can fit 
into memory (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3e409ae0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3e409ae0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3e409ae0

Branch: refs/heads/master
Commit: 3e409ae0ee7233b4cf631cac1bc366679a08b7d1
Parents: 12695f3
Author: Rajesh Balamohan <[email protected]>
Authored: Sat Feb 27 10:57:09 2016 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Sat Feb 27 10:57:09 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../shuffle/orderedgrouped/MergeManager.java    |  8 ++-
 .../orderedgrouped/TestMergeManager.java        | 59 ++++++++++++++++++++
 3 files changed, 67 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3e409ae0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 706a305..f3efab7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-3147. Intermediate mem-to-mem: Fix early exit when only one segment can 
fit into memory
   TEZ-3141. mapreduce.task.timeout is not translated to container heartbeat 
timeout
   TEZ-3128. Avoid stopping containers on the AM shutdown thread.
   TEZ-3129. Tez task and task attempt UI needs application fails with 
NotFoundException

http://git-wip-us.apache.org/repos/asf/tez/blob/3e409ae0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 6c60a80..0b0f6b6 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -654,6 +654,7 @@ public class MergeManager implements 
FetchedInputAllocatorOrderedGrouped {
       synchronized (manager) {
 
         Iterator<MapOutput> it = inputs.iterator();
+        MapOutput lastAddedMapOutput = null;
         while(it.hasNext() && !Thread.currentThread().isInterrupted()) {
           MapOutput mo = it.next();
           if ((mergeOutputSize + mo.getSize() + manager.getUsedMemory()) > 
memoryLimit) {
@@ -672,6 +673,7 @@ public class MergeManager implements 
FetchedInputAllocatorOrderedGrouped {
                 mo.getAttemptIdentifier(), mo.getMemory(), 0, 
mo.getMemory().length);
             inMemorySegments.add(new Segment(reader, true,
                 (mo.isPrimaryMapOutput() ? mergedMapOutputsCounter : null)));
+            lastAddedMapOutput = mo;
             it.remove();
             LOG.debug("Added segment for merging. mergeOutputSize=" + 
mergeOutputSize);
           }
@@ -680,8 +682,12 @@ public class MergeManager implements 
FetchedInputAllocatorOrderedGrouped {
         //Add any unused MapOutput back
         inMemoryMapOutputs.addAll(inputs);
 
+        //Exit early, if 0 or 1 segment is available
         if (inMemorySegments.size() <= 1) {
-          return; //no need to proceed further.
+          if (lastAddedMapOutput != null) {
+            inMemoryMapOutputs.add(lastAddedMapOutput);
+          }
+          return;
         }
 
         mergedMapOutputs = unconditionalReserve(dummyMapId, mergeOutputSize, 
false);

http://git-wip-us.apache.org/repos/asf/tez/blob/3e409ae0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 4112b99..c84794d 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -423,6 +423,65 @@ public class TestMergeManager {
 
     mergeManager.close();
 
+    /**
+     * Test #4
+     * - Set number of segments for merging to 4.
+     * - Have 4 in-memory segments of size {490000,490000,490000,230000}
+     * - Committing 4 segments would trigger mem-to-mem
+     * - But only the 230000-byte segment can fit into memory. It should not be
+     * merged, as there is no point in merging a single segment. It should be
+     * added back to inMemoryMapOutputs
+     */
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 
4);
+    mergeManager =
+        new MergeManager(conf, localFs, localDirAllocator, inputContext, null, 
null, null, null,
+            exceptionReporter, 2000000, null, false, -1);
+    mergeManager.configureAndStart();
+
+    //Single shuffle limit is 25% of 2000000
+    data1 = generateDataBySize(conf, 490000);
+    data2 = generateDataBySize(conf, 490000);
+    data3 = generateDataBySize(conf, 490000);
+    data4 = generateDataBySize(conf, 230000);
+
+    mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, 
data1.length, 0);
+    mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, 
data2.length, 0);
+    mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, 
data3.length, 0);
+    mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, 
data4.length, 0);
+
+    assertTrue(mergeManager.getUsedMemory() >= (490000 + 490000 + 490000 + 
23000));
+
+    assertEquals(MapOutput.Type.MEMORY, mo1.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo2.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo3.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo4.getType());
+    assertEquals(0, mergeManager.getCommitMemory());
+
+    assertEquals(data1.length + data2.length + data3.length + data4.length,
+        mergeManager.getUsedMemory());
+
+    System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
+    System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
+    System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
+    System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
+
+    //Committing 4 segments should trigger mem-to-mem merge
+    mo1.commit();
+    mo2.commit();
+    mo3.commit();
+    mo4.commit();
+
+    //4 segments were there originally in inMemoryMapOutput.
+    int numberOfMapOutputs = 4;
+
+    //Wait for mem-to-mem to complete. Since only 1 segment (230000) can fit
+    //into memory, it should return early
+    mergeManager.waitForMemToMemMerge();
+
+    //Check if inMemoryMapOutputs has got the MapOutput back for merging later
+    assertEquals(numberOfMapOutputs, mergeManager.inMemoryMapOutputs.size());
+
+    mergeManager.close();
   }
 
   private byte[] generateDataBySize(Configuration conf, int rawLen) throws 
IOException {

Reply via email to