Repository: tez Updated Branches: refs/heads/master cc9dd2799 -> 15d7339e9
TEZ-1911. MergeManager's unconditionalReserve() should check for memory limits before allocating memory to IntermediateMemoryToMemoryMerger (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/15d7339e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/15d7339e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/15d7339e Branch: refs/heads/master Commit: 15d7339e9fb64cdd0d995da9832dff721c14eacf Parents: cc9dd27 Author: Rajesh Balamohan <[email protected]> Authored: Fri Feb 26 11:56:48 2016 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Fri Feb 26 11:56:54 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../shuffle/orderedgrouped/MergeManager.java | 63 +++++- .../orderedgrouped/TestMergeManager.java | 216 +++++++++++++++++++ 3 files changed, 269 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/15d7339e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8de1383..e8e72b7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-1911. MergeManager's unconditionalReserve() should check for memory limits before allocating. TEZ-3102. Fetch failure of a speculated task causes job hang TEZ-3124. Running task hangs due to missing event to initialize input in recovery. TEZ-3135. tez-ext-service-tests, tez-plugins/tez-yarn-timeline-history and tez-tools/tez-javadoc-tools missing dependencies. 
http://git-wip-us.apache.org/repos/asf/tez/blob/15d7339e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index b56a9a8..b01609c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -51,7 +51,6 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger; import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment; import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles; -import org.apache.tez.runtime.library.hadoop.compat.NullProgressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +59,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.TreeSet; @@ -92,11 +92,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { }; private final Combiner combiner; - private final Set<MapOutput> inMemoryMergedMapOutputs = + @VisibleForTesting + final Set<MapOutput> inMemoryMergedMapOutputs = new TreeSet<MapOutput>(new MapOutput.MapOutputComparator()); private final IntermediateMemoryToMemoryMerger memToMemMerger; - private final Set<MapOutput> inMemoryMapOutputs = + @VisibleForTesting + final Set<MapOutput> inMemoryMapOutputs = new TreeSet<MapOutput>(new MapOutput.MapOutputComparator()); 
private final InMemoryMerger inMemoryMerger; @@ -644,19 +646,58 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier(); List<Segment> inMemorySegments = new ArrayList<Segment>(); - long mergeOutputSize = - createInMemorySegments(inputs, inMemorySegments, 0); + + MapOutput mergedMapOutputs = null; + + long mergeOutputSize = 0l; + //Lock manager so that fetcher threads can not change the mem size + synchronized (manager) { + + Iterator<MapOutput> it = inputs.iterator(); + while(it.hasNext() && !Thread.currentThread().isInterrupted()) { + MapOutput mo = it.next(); + if ((mergeOutputSize + mo.getSize() + usedMemory) > memoryLimit) { + //Search for smaller segments that can fit into existing mem + if (LOG.isDebugEnabled()) { + LOG.debug("Size is greater than usedMemory. " + + "mergeOutputSize=" + mergeOutputSize + + ", moSize=" + mo.getSize() + + ", usedMemory=" + usedMemory + + ", memoryLimit=" + memoryLimit); + } + continue; + } else { + mergeOutputSize += mo.getSize(); + IFile.Reader reader = new InMemoryReader(MergeManager.this, + mo.getAttemptIdentifier(), mo.getMemory(), 0, mo.getMemory().length); + inMemorySegments.add(new Segment(reader, true, + (mo.isPrimaryMapOutput() ? mergedMapOutputsCounter : null))); + it.remove(); + LOG.debug("Added segment for merging. mergeOutputSize=" + mergeOutputSize); + } + } + + //Add any unused MapOutput back + inMemoryMapOutputs.addAll(inputs); + + if (inMemorySegments.size() <= 1) { + return; //no need to proceed further. 
+ } + + mergedMapOutputs = unconditionalReserve(dummyMapId, mergeOutputSize, false); + } + int noInMemorySegments = inMemorySegments.size(); - MapOutput mergedMapOutputs = - unconditionalReserve(dummyMapId, mergeOutputSize, false); - - Writer writer = - new InMemoryWriter(mergedMapOutputs.getArrayStream()); + Writer writer = new InMemoryWriter(mergedMapOutputs.getArrayStream()); LOG.info(inputContext.getSourceVertexName() + ": " + "Initiating Memory-to-Memory merge with " + noInMemorySegments + " segments of total-size: " + mergeOutputSize); + if (Thread.currentThread().isInterrupted()) { + return; // early exit + } + // Nothing will be materialized to disk because the sort factor is being // set to the number of in memory segments. // TODO Is this doing any combination ? @@ -673,7 +714,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { LOG.info(inputContext.getSourceVertexName() + " Memory-to-Memory merge of the " + noInMemorySegments + - " files in-memory complete."); + " files in-memory complete with mergeOutputSize=" + mergeOutputSize); // Note the output of the merge closeInMemoryMergedFile(mergedMapOutputs); http://git-wip-us.apache.org/repos/asf/tez/blob/15d7339e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index c62c116..4112b99 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -231,6 +231,222 @@ public class TestMergeManager { 
assertEquals(data1.length + data2.length, mergeManager.getUsedMemory()); } + @Test(timeout = 60000l) + public void testIntermediateMemoryMerge() throws Throwable { + Configuration conf = new TezConfiguration(defaultConf); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, true); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 3); + + Path localDir = new Path(workDir, "local"); + Path srcDir = new Path(workDir, "srcData"); + localFs.mkdirs(localDir); + localFs.mkdirs(srcDir); + + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString()); + + FileSystem localFs = FileSystem.getLocal(conf); + LocalDirAllocator localDirAllocator = + new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS); + InputContext inputContext = createMockInputContext(UUID.randomUUID().toString()); + + ExceptionReporter exceptionReporter = mock(ExceptionReporter.class); + + MergeManager mergeManager = + new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null, + exceptionReporter, 2000000, null, false, -1); + mergeManager.configureAndStart(); + + assertEquals(0, mergeManager.getUsedMemory()); + assertEquals(0, mergeManager.getCommitMemory()); + + /** + * Test #1 + * - Have 4 segments where all of them can fit into memory. + * - After 3 segment commits, it would trigger mem-to-mem merge. + * - All of them can be merged in memory. 
+ */ + byte[] data1 = generateDataBySize(conf, 10); + byte[] data2 = generateDataBySize(conf, 20); + byte[] data3 = generateDataBySize(conf, 200); + byte[] data4 = generateDataBySize(conf, 20000); + + MapOutput mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0); + MapOutput mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0); + MapOutput mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0); + MapOutput mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0); + + assertEquals(MapOutput.Type.MEMORY, mo1.getType()); + assertEquals(MapOutput.Type.MEMORY, mo2.getType()); + assertEquals(MapOutput.Type.MEMORY, mo3.getType()); + assertEquals(MapOutput.Type.MEMORY, mo4.getType()); + assertEquals(0, mergeManager.getCommitMemory()); + + //size should be ~20230. + assertEquals(data1.length + data2.length + data3.length + data4.length, + mergeManager.getUsedMemory()); + + + System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length); + System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length); + System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length); + System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length); + + //Committing 3 segments should trigger mem-to-mem merge + mo1.commit(); + mo2.commit(); + mo3.commit(); + mo4.commit(); + + //Wait for mem-to-mem to complete + mergeManager.waitForMemToMemMerge(); + + assertEquals(1, mergeManager.inMemoryMergedMapOutputs.size()); + assertEquals(1, mergeManager.inMemoryMapOutputs.size()); + + mergeManager.close(); + + + /** + * Test #2 + * - Have 4 segments where all of them can fit into memory, but one of + * them would be big enough that it can not be fit in memory during + * mem-to-mem merging. + * + * - After 3 segment commits, it would trigger mem-to-mem merge. + * - Smaller segments which can be fit in additional memory allocated gets + * merged. 
+ */ + mergeManager = + new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null, + exceptionReporter, 2000000, null, false, -1); + mergeManager.configureAndStart(); + + //Single shuffle limit is 25% of 2000000 + data1 = generateDataBySize(conf, 10); + data2 = generateDataBySize(conf, 400000); + data3 = generateDataBySize(conf, 400000); + data4 = generateDataBySize(conf, 400000); + + mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0); + mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0); + mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0); + mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0); + + assertEquals(MapOutput.Type.MEMORY, mo1.getType()); + assertEquals(MapOutput.Type.MEMORY, mo2.getType()); + assertEquals(MapOutput.Type.MEMORY, mo3.getType()); + assertEquals(MapOutput.Type.MEMORY, mo4.getType()); + assertEquals(0, mergeManager.getCommitMemory()); + + assertEquals(data1.length + data2.length + data3.length + data4.length, + mergeManager.getUsedMemory()); + + System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length); + System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length); + System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length); + System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length); + + //Committing 3 segments should trigger mem-to-mem merge + mo1.commit(); + mo2.commit(); + mo3.commit(); + mo4.commit(); + + //Wait for mem-to-mem to complete + mergeManager.waitForMemToMemMerge(); + + /** + * Already all segments are in memory which is around 1200000. It + * would not be able to allocate more than 800000 for mem-to-mem. So it + * would pick up only 2 small segments which can be accommodated within + * 800000. 
+ */ + assertEquals(1, mergeManager.inMemoryMergedMapOutputs.size()); + assertEquals(2, mergeManager.inMemoryMapOutputs.size()); + + mergeManager.close(); + + /** + * Test #3 + * - Set number of segments for merging to 4. + * - Have 4 in-memory segments of size 400000 each + * - Committing 4 segments would trigger mem-to-mem + * - But none of them can be merged as there is not enough headroom for + * merging in memory. + */ + mergeManager = + new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null, + exceptionReporter, 2000000, null, false, -1); + mergeManager.configureAndStart(); + + //Single shuffle limit is 25% of 2000000 + data1 = generateDataBySize(conf, 400000); + data2 = generateDataBySize(conf, 400000); + data3 = generateDataBySize(conf, 400000); + data4 = generateDataBySize(conf, 400000); + + mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0); + mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0); + mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0); + mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0); + + assertEquals(MapOutput.Type.MEMORY, mo1.getType()); + assertEquals(MapOutput.Type.MEMORY, mo2.getType()); + assertEquals(MapOutput.Type.MEMORY, mo3.getType()); + assertEquals(MapOutput.Type.MEMORY, mo4.getType()); + assertEquals(0, mergeManager.getCommitMemory()); + + assertEquals(data1.length + data2.length + data3.length + data4.length, + mergeManager.getUsedMemory()); + + System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length); + System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length); + System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length); + System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length); + + //Committing 3 segments should trigger mem-to-mem merge + mo1.commit(); + mo2.commit(); + mo3.commit(); + mo4.commit(); + + //Wait for 
mem-to-mem to complete + mergeManager.waitForMemToMemMerge(); + + // None of them can be merged as new mem needed for mem-to-mem can't + // accommodate any segments + assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size()); + assertEquals(4, mergeManager.inMemoryMapOutputs.size()); + + mergeManager.close(); + + } + + private byte[] generateDataBySize(Configuration conf, int rawLen) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + FSDataOutputStream fsdos = new FSDataOutputStream(baos, null); + IFile.Writer writer = + new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null); + int i = 0; + while(true) { + writer.append(new IntWritable(i), new IntWritable(i)); + i++; + if (writer.getRawLength() > rawLen) { + break; + } + } + writer.close(); + int compressedLength = (int)writer.getCompressedLength(); + int rawLength = (int)writer.getRawLength(); + byte[] data = new byte[rawLength]; + ShuffleUtils.shuffleToMemory(data, new ByteArrayInputStream(baos.toByteArray()), + rawLength, compressedLength, null, false, 0, LOG, "sometask"); + return data; + } + private byte[] generateData(Configuration conf, int numEntries) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
