Repository: tez
Updated Branches:
  refs/heads/master cc9dd2799 -> 15d7339e9


TEZ-1911. MergeManager's unconditionalReserve() should check for memory limits 
before allocating memory to IntermediateMemoryToMemoryMerger (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/15d7339e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/15d7339e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/15d7339e

Branch: refs/heads/master
Commit: 15d7339e9fb64cdd0d995da9832dff721c14eacf
Parents: cc9dd27
Author: Rajesh Balamohan <[email protected]>
Authored: Fri Feb 26 11:56:48 2016 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Fri Feb 26 11:56:54 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../shuffle/orderedgrouped/MergeManager.java    |  63 +++++-
 .../orderedgrouped/TestMergeManager.java        | 216 +++++++++++++++++++
 3 files changed, 269 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/15d7339e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8de1383..e8e72b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-1911. MergeManager's unconditionalReserve() should check for memory 
limits before allocating.
   TEZ-3102. Fetch failure of a speculated task causes job hang
   TEZ-3124. Running task hangs due to missing event to initialize input in 
recovery.
   TEZ-3135. tez-ext-service-tests, tez-plugins/tez-yarn-timeline-history and 
tez-tools/tez-javadoc-tools missing dependencies.

http://git-wip-us.apache.org/repos/asf/tez/blob/15d7339e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index b56a9a8..b01609c 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -51,7 +51,6 @@ import 
org.apache.tez.runtime.library.common.sort.impl.TezMerger;
 import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import 
org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
-import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +59,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -92,11 +92,13 @@ public class MergeManager implements 
FetchedInputAllocatorOrderedGrouped {
   };
   private final Combiner combiner;  
   
-  private final Set<MapOutput> inMemoryMergedMapOutputs = 
+  @VisibleForTesting
+  final Set<MapOutput> inMemoryMergedMapOutputs =
     new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
   private final IntermediateMemoryToMemoryMerger memToMemMerger;
 
-  private final Set<MapOutput> inMemoryMapOutputs = 
+  @VisibleForTesting
+  final Set<MapOutput> inMemoryMapOutputs =
     new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
   private final InMemoryMerger inMemoryMerger;
 
@@ -644,19 +646,58 @@ public class MergeManager implements 
FetchedInputAllocatorOrderedGrouped {
 
       InputAttemptIdentifier dummyMapId = 
inputs.get(0).getAttemptIdentifier(); 
       List<Segment> inMemorySegments = new ArrayList<Segment>();
-      long mergeOutputSize = 
-        createInMemorySegments(inputs, inMemorySegments, 0);
+
+      MapOutput mergedMapOutputs = null;
+
+      long mergeOutputSize = 0l;
+      //Lock manager so that fetcher threads can not change the mem size
+      synchronized (manager) {
+
+        Iterator<MapOutput> it = inputs.iterator();
+        while(it.hasNext() && !Thread.currentThread().isInterrupted()) {
+          MapOutput mo = it.next();
+          if ((mergeOutputSize + mo.getSize() + usedMemory) > memoryLimit) {
+            //Search for smaller segments that can fit into existing mem
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Size is greater than usedMemory. "
+                  + "mergeOutputSize=" + mergeOutputSize
+                  + ", moSize=" + mo.getSize()
+                  + ", usedMemory=" + usedMemory
+                  + ", memoryLimit=" + memoryLimit);
+            }
+            continue;
+          } else {
+            mergeOutputSize += mo.getSize();
+            IFile.Reader reader = new InMemoryReader(MergeManager.this,
+                mo.getAttemptIdentifier(), mo.getMemory(), 0, 
mo.getMemory().length);
+            inMemorySegments.add(new Segment(reader, true,
+                (mo.isPrimaryMapOutput() ? mergedMapOutputsCounter : null)));
+            it.remove();
+            LOG.debug("Added segment for merging. mergeOutputSize=" + 
mergeOutputSize);
+          }
+        }
+
+        //Add any unused MapOutput back
+        inMemoryMapOutputs.addAll(inputs);
+
+        if (inMemorySegments.size() <= 1) {
+          return; //no need to proceed further.
+        }
+
+        mergedMapOutputs = unconditionalReserve(dummyMapId, mergeOutputSize, 
false);
+      }
+
       int noInMemorySegments = inMemorySegments.size();
 
-      MapOutput mergedMapOutputs =
-        unconditionalReserve(dummyMapId, mergeOutputSize, false);
-      
-      Writer writer = 
-        new InMemoryWriter(mergedMapOutputs.getArrayStream());
+      Writer writer = new InMemoryWriter(mergedMapOutputs.getArrayStream());
 
       LOG.info(inputContext.getSourceVertexName() + ": " + "Initiating 
Memory-to-Memory merge with " + noInMemorySegments +
                " segments of total-size: " + mergeOutputSize);
 
+      if (Thread.currentThread().isInterrupted()) {
+        return; // early exit
+      }
+
       // Nothing will be materialized to disk because the sort factor is being
       // set to the number of in memory segments.
       // TODO Is this doing any combination ?
@@ -673,7 +714,7 @@ public class MergeManager implements 
FetchedInputAllocatorOrderedGrouped {
 
       LOG.info(inputContext.getSourceVertexName() +
                " Memory-to-Memory merge of the " + noInMemorySegments +
-               " files in-memory complete.");
+               " files in-memory complete with mergeOutputSize=" + 
mergeOutputSize);
 
       // Note the output of the merge
       closeInMemoryMergedFile(mergedMapOutputs);

http://git-wip-us.apache.org/repos/asf/tez/blob/15d7339e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index c62c116..4112b99 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -231,6 +231,222 @@ public class TestMergeManager {
     assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
   }
 
+  @Test(timeout = 60000l)
+  public void testIntermediateMemoryMerge() throws Throwable {
+    Configuration conf = new TezConfiguration(defaultConf);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, 
IntWritable.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, 
IntWritable.class.getName());
+    
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, 
true);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 
3);
+
+    Path localDir = new Path(workDir, "local");
+    Path srcDir = new Path(workDir, "srcData");
+    localFs.mkdirs(localDir);
+    localFs.mkdirs(srcDir);
+
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, 
localDir.toString());
+
+    FileSystem localFs = FileSystem.getLocal(conf);
+    LocalDirAllocator localDirAllocator =
+        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    InputContext inputContext = 
createMockInputContext(UUID.randomUUID().toString());
+
+    ExceptionReporter exceptionReporter = mock(ExceptionReporter.class);
+
+    MergeManager mergeManager =
+        new MergeManager(conf, localFs, localDirAllocator, inputContext, null, 
null, null, null,
+            exceptionReporter, 2000000, null, false, -1);
+    mergeManager.configureAndStart();
+
+    assertEquals(0, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+
+    /**
+     * Test #1
+     * - Have 4 segments where all of them can fit into memory.
+     * - After 3 segment commits, it would trigger mem-to-mem merge.
+     * - All of them can be merged in memory.
+     */
+    byte[] data1 = generateDataBySize(conf, 10);
+    byte[] data2 = generateDataBySize(conf, 20);
+    byte[] data3 = generateDataBySize(conf, 200);
+    byte[] data4 = generateDataBySize(conf, 20000);
+
+    MapOutput mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), 
data1.length, data1.length, 0);
+    MapOutput mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), 
data2.length, data2.length, 0);
+    MapOutput mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), 
data3.length, data3.length, 0);
+    MapOutput mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), 
data4.length, data4.length, 0);
+
+    assertEquals(MapOutput.Type.MEMORY, mo1.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo2.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo3.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo4.getType());
+    assertEquals(0, mergeManager.getCommitMemory());
+
+    //size should be ~20230.
+    assertEquals(data1.length + data2.length + data3.length + data4.length,
+        mergeManager.getUsedMemory());
+
+
+    System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
+    System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
+    System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
+    System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
+
+    //Committing 3 segments should trigger mem-to-mem merge
+    mo1.commit();
+    mo2.commit();
+    mo3.commit();
+    mo4.commit();
+
+    //Wait for mem-to-mem to complete
+    mergeManager.waitForMemToMemMerge();
+
+    assertEquals(1, mergeManager.inMemoryMergedMapOutputs.size());
+    assertEquals(1, mergeManager.inMemoryMapOutputs.size());
+
+    mergeManager.close();
+
+
+    /**
+     * Test #2
+     * - Have 4 segments where all of them can fit into memory, but one of
+     * them would be big enough that it can not be fit in memory during
+     * mem-to-mem merging.
+     *
+     * - After 3 segment commits, it would trigger mem-to-mem merge.
+     * - Smaller segments which can fit in the additional memory allocated get
+     * merged.
+     */
+    mergeManager =
+        new MergeManager(conf, localFs, localDirAllocator, inputContext, null, 
null, null, null,
+            exceptionReporter, 2000000, null, false, -1);
+    mergeManager.configureAndStart();
+
+    //Single shuffle limit is 25% of 2000000
+    data1 = generateDataBySize(conf, 10);
+    data2 = generateDataBySize(conf, 400000);
+    data3 = generateDataBySize(conf, 400000);
+    data4 = generateDataBySize(conf, 400000);
+
+    mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, 
data1.length, 0);
+    mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, 
data2.length, 0);
+    mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, 
data3.length, 0);
+    mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, 
data4.length, 0);
+
+    assertEquals(MapOutput.Type.MEMORY, mo1.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo2.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo3.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo4.getType());
+    assertEquals(0, mergeManager.getCommitMemory());
+
+    assertEquals(data1.length + data2.length + data3.length + data4.length,
+        mergeManager.getUsedMemory());
+
+    System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
+    System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
+    System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
+    System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
+
+    //Committing 3 segments should trigger mem-to-mem merge
+    mo1.commit();
+    mo2.commit();
+    mo3.commit();
+    mo4.commit();
+
+    //Wait for mem-to-mem to complete
+    mergeManager.waitForMemToMemMerge();
+
+    /**
+     * Already all segments are in memory which is around 1200000. It
+     * would not be able to allocate more than 800000 for mem-to-mem. So it
+     * would pick up only 2 small segments which can be accommodated within
+     * 800000.
+     */
+    assertEquals(1, mergeManager.inMemoryMergedMapOutputs.size());
+    assertEquals(2, mergeManager.inMemoryMapOutputs.size());
+
+    mergeManager.close();
+
+    /**
+     * Test #3
+     * - Set number of segments for merging to 4.
+     * - Have 4 in-memory segments of size 400000 each
+     * - Committing 4 segments would trigger mem-to-mem
+     * - But none of them can be merged as there is not enough head room for
+     * merging in memory.
+     */
+    mergeManager =
+        new MergeManager(conf, localFs, localDirAllocator, inputContext, null, 
null, null, null,
+            exceptionReporter, 2000000, null, false, -1);
+    mergeManager.configureAndStart();
+
+    //Single shuffle limit is 25% of 2000000
+    data1 = generateDataBySize(conf, 400000);
+    data2 = generateDataBySize(conf, 400000);
+    data3 = generateDataBySize(conf, 400000);
+    data4 = generateDataBySize(conf, 400000);
+
+    mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, 
data1.length, 0);
+    mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, 
data2.length, 0);
+    mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, 
data3.length, 0);
+    mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, 
data4.length, 0);
+
+    assertEquals(MapOutput.Type.MEMORY, mo1.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo2.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo3.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo4.getType());
+    assertEquals(0, mergeManager.getCommitMemory());
+
+    assertEquals(data1.length + data2.length + data3.length + data4.length,
+        mergeManager.getUsedMemory());
+
+    System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
+    System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
+    System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
+    System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
+
+    //Committing 3 segments should trigger mem-to-mem merge
+    mo1.commit();
+    mo2.commit();
+    mo3.commit();
+    mo4.commit();
+
+    //Wait for mem-to-mem to complete
+    mergeManager.waitForMemToMemMerge();
+
+    // None of them can be merged as new mem needed for mem-to-mem can't
+    // accommodate any segments
+    assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
+    assertEquals(4, mergeManager.inMemoryMapOutputs.size());
+
+    mergeManager.close();
+
+  }
+
+  private byte[] generateDataBySize(Configuration conf, int rawLen) throws 
IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+    IFile.Writer writer =
+        new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, 
null, null, null);
+    int i = 0;
+    while(true) {
+      writer.append(new IntWritable(i), new IntWritable(i));
+      i++;
+      if (writer.getRawLength() > rawLen) {
+        break;
+      }
+    }
+    writer.close();
+    int compressedLength = (int)writer.getCompressedLength();
+    int rawLength = (int)writer.getRawLength();
+    byte[] data = new byte[rawLength];
+    ShuffleUtils.shuffleToMemory(data, new 
ByteArrayInputStream(baos.toByteArray()),
+        rawLength, compressedLength, null, false, 0, LOG, "sometask");
+    return data;
+  }
+
   private byte[] generateData(Configuration conf, int numEntries) throws 
IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);

Reply via email to