Repository: tez Updated Branches: refs/heads/master 12695f3d0 -> 3e409ae0e
TEZ-3147. Intermediate mem-to-mem: Fix early exit when only one segment can fit into memory (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3e409ae0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3e409ae0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3e409ae0 Branch: refs/heads/master Commit: 3e409ae0ee7233b4cf631cac1bc366679a08b7d1 Parents: 12695f3 Author: Rajesh Balamohan <[email protected]> Authored: Sat Feb 27 10:57:09 2016 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Sat Feb 27 10:57:09 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../shuffle/orderedgrouped/MergeManager.java | 8 ++- .../orderedgrouped/TestMergeManager.java | 59 ++++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3e409ae0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 706a305..f3efab7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3147. Intermediate mem-to-mem: Fix early exit when only one segment can fit into memory TEZ-3141. mapreduce.task.timeout is not translated to container heartbeat timeout TEZ-3128. Avoid stopping containers on the AM shutdown thread. TEZ-3129. 
Tez task and task attempt UI needs application fails with NotFoundException http://git-wip-us.apache.org/repos/asf/tez/blob/3e409ae0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 6c60a80..0b0f6b6 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -654,6 +654,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { synchronized (manager) { Iterator<MapOutput> it = inputs.iterator(); + MapOutput lastAddedMapOutput = null; while(it.hasNext() && !Thread.currentThread().isInterrupted()) { MapOutput mo = it.next(); if ((mergeOutputSize + mo.getSize() + manager.getUsedMemory()) > memoryLimit) { @@ -672,6 +673,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { mo.getAttemptIdentifier(), mo.getMemory(), 0, mo.getMemory().length); inMemorySegments.add(new Segment(reader, true, (mo.isPrimaryMapOutput() ? mergedMapOutputsCounter : null))); + lastAddedMapOutput = mo; it.remove(); LOG.debug("Added segment for merging. mergeOutputSize=" + mergeOutputSize); } @@ -680,8 +682,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { //Add any unused MapOutput back inMemoryMapOutputs.addAll(inputs); + //Exit early, if 0 or 1 segment is available if (inMemorySegments.size() <= 1) { - return; //no need to proceed further. 
+ if (lastAddedMapOutput != null) { + inMemoryMapOutputs.add(lastAddedMapOutput); + } + return; } mergedMapOutputs = unconditionalReserve(dummyMapId, mergeOutputSize, false); http://git-wip-us.apache.org/repos/asf/tez/blob/3e409ae0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index 4112b99..c84794d 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -423,6 +423,65 @@ public class TestMergeManager { mergeManager.close(); + /** + * Test #4 + * - Set number of segments for merging to 4. + * - Have 4 in-memory segments of size {490000,490000,490000,230000} + * - Committing 4 segments would trigger mem-to-mem + * - But only 300000 can fit into memory. This should not be + * merged as there is no point in merging single segment. 
It should be + * added back to inMemoryMapOutputs + */ + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 4); + mergeManager = + new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null, + exceptionReporter, 2000000, null, false, -1); + mergeManager.configureAndStart(); + + //Single shuffle limit is 25% of 2000000 + data1 = generateDataBySize(conf, 490000); + data2 = generateDataBySize(conf, 490000); + data3 = generateDataBySize(conf, 490000); + data4 = generateDataBySize(conf, 230000); + + mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0); + mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0); + mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0); + mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0); + + assertTrue(mergeManager.getUsedMemory() >= (490000 + 490000 + 490000 + 23000)); + + assertEquals(MapOutput.Type.MEMORY, mo1.getType()); + assertEquals(MapOutput.Type.MEMORY, mo2.getType()); + assertEquals(MapOutput.Type.MEMORY, mo3.getType()); + assertEquals(MapOutput.Type.MEMORY, mo4.getType()); + assertEquals(0, mergeManager.getCommitMemory()); + + assertEquals(data1.length + data2.length + data3.length + data4.length, + mergeManager.getUsedMemory()); + + System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length); + System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length); + System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length); + System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length); + + //Committing 4 segments should trigger mem-to-mem merge + mo1.commit(); + mo2.commit(); + mo3.commit(); + mo4.commit(); + + //4 segments were there originally in inMemoryMapOutput. + int numberOfMapOutputs = 4; + + //Wait for mem-to-mem to complete. 
Since only 1 segment (230000) can fit + //into memory, it should return early + mergeManager.waitForMemToMemMerge(); + + //Check if inMemoryMapOutputs has got the MapOutput back for merging later + assertEquals(numberOfMapOutputs, mergeManager.inMemoryMapOutputs.size()); + + mergeManager.close(); } private byte[] generateDataBySize(Configuration conf, int rawLen) throws IOException {
