Repository: tez Updated Branches: refs/heads/master 2af886b50 -> 91e24d7c6
TEZ-2756. MergeManager close should not try merging files on close if invoked after a shuffle exception (Tsuyoshi Ozawa via rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/91e24d7c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/91e24d7c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/91e24d7c Branch: refs/heads/master Commit: 91e24d7c65b150150125992095268aed77adc386 Parents: 2af886b Author: Rajesh Balamohan <[email protected]> Authored: Fri Mar 4 11:28:15 2016 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Fri Mar 4 11:29:08 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../shuffle/orderedgrouped/MergeManager.java | 31 +++++----- .../common/shuffle/orderedgrouped/Shuffle.java | 4 +- .../orderedgrouped/TestMergeManager.java | 64 ++++++++++++++++++-- 4 files changed, 80 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/91e24d7c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0c6cacc..e84dc5a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-2756. MergeManager close should not try merging files on close if invoked after a shuffle exception. TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application. TEZ-3115. Shuffle string handling adds significant memory overhead TEZ-3151. Expose DAG credentials to plugins. 
http://git-wip-us.apache.org/repos/asf/tez/blob/91e24d7c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 0b0f6b6..4c999b4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -553,9 +553,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { return finalMergeComplete; } - public TezRawKeyValueIterator close() throws Throwable { - // TODO TEZ-2756. Don't attempt a final merge if close is invoked as a result of a previous - // shuffle exception / error. + public TezRawKeyValueIterator close(boolean tryFinalMerge) throws Throwable { if (!isShutdown.getAndSet(true)) { // Wait for on-going merges to complete if (memToMemMerger != null) { @@ -571,18 +569,23 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { inMemoryMapOutputs.clear(); List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs); onDiskMapOutputs.clear(); - try { - TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk); - this.finalMergeComplete = true; - return kvIter; - } catch (InterruptedException e) { - //Cleanup the disk segments - if (cleanup) { - cleanup(localFS, disk); - cleanup(localFS, onDiskMapOutputs); + + // Don't attempt a final merge if close is invoked as a result of a previous + // shuffle exception / error. 
+ if (tryFinalMerge) { + try { + TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk); + this.finalMergeComplete = true; + return kvIter; + } catch (InterruptedException e) { + //Cleanup the disk segments + if (cleanup) { + cleanup(localFS, disk); + cleanup(localFS, onDiskMapOutputs); + } + Thread.currentThread().interrupt(); //reset interrupt status + throw e; } - Thread.currentThread().interrupt(); //reset interrupt status - throw e; } } return null; http://git-wip-us.apache.org/repos/asf/tez/blob/91e24d7c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index fa66b7e..f40c49a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -310,7 +310,7 @@ public class Shuffle implements ExceptionReporter { TezRawKeyValueIterator kvIter = null; inputContext.notifyProgress(); try { - kvIter = merger.close(); + kvIter = merger.close(true); } catch (Throwable e) { // Set the throwable so that future.get() sees the reported error. 
throwable.set(e); @@ -351,7 +351,7 @@ public class Shuffle implements ExceptionReporter { private void cleanupMerger(boolean ignoreErrors) throws Throwable { if (!mergerClosed.getAndSet(true)) { try { - merger.close(); + merger.close(false); } catch (InterruptedException e) { if (ignoreErrors) { //Reset the status http://git-wip-us.apache.org/repos/asf/tez/blob/91e24d7c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index c84794d..9eb4cae 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -306,7 +306,7 @@ public class TestMergeManager { assertEquals(1, mergeManager.inMemoryMergedMapOutputs.size()); assertEquals(1, mergeManager.inMemoryMapOutputs.size()); - mergeManager.close(); + mergeManager.close(true); /** @@ -367,7 +367,7 @@ public class TestMergeManager { assertEquals(1, mergeManager.inMemoryMergedMapOutputs.size()); assertEquals(2, mergeManager.inMemoryMapOutputs.size()); - mergeManager.close(); + mergeManager.close(true); /** * Test #3 @@ -421,7 +421,7 @@ public class TestMergeManager { assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size()); assertEquals(4, mergeManager.inMemoryMapOutputs.size()); - mergeManager.close(); + mergeManager.close(true); /** * Test #4 @@ -481,7 +481,63 @@ public class TestMergeManager { //Check if inMemorySegment has got the MapOutput back for merging later assertEquals(numberOfMapOutputs, mergeManager.inMemoryMapOutputs.size()); - 
mergeManager.close(); + mergeManager.close(true); + + /** + * Test #5 + * - Same as #4, but calls mergeManager.close(false) and confirms that the final merge doesn't occur. + */ + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 4); + mergeManager = + new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null, + exceptionReporter, 2000000, null, false, -1); + mergeManager.configureAndStart(); + + //Single shuffle limit is 25% of 2000000 + data1 = generateDataBySize(conf, 490000); + data2 = generateDataBySize(conf, 490000); + data3 = generateDataBySize(conf, 490000); + data4 = generateDataBySize(conf, 230000); + + mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0); + mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0); + mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0); + mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0); + + assertTrue(mergeManager.getUsedMemory() >= (490000 + 490000 + 490000 + 23000)); + + assertEquals(MapOutput.Type.MEMORY, mo1.getType()); + assertEquals(MapOutput.Type.MEMORY, mo2.getType()); + assertEquals(MapOutput.Type.MEMORY, mo3.getType()); + assertEquals(MapOutput.Type.MEMORY, mo4.getType()); + assertEquals(0, mergeManager.getCommitMemory()); + + assertEquals(data1.length + data2.length + data3.length + data4.length, + mergeManager.getUsedMemory()); + + System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length); + System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length); + System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length); + System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length); + + //Committing 4 segments should trigger mem-to-mem merge + mo1.commit(); + mo2.commit(); + mo3.commit(); + mo4.commit(); + + //4 segments were there originally in inMemoryMapOutput. 
+ numberOfMapOutputs = 4; + + //Wait for mem-to-mem to complete. Since only 1 segment (230000) can fit + //into memory, it should return early + mergeManager.waitForMemToMemMerge(); + + //Check if inMemorySegment has got the MapOutput back for merging later + assertEquals(numberOfMapOutputs, mergeManager.inMemoryMapOutputs.size()); + + Assert.assertNull(mergeManager.close(false)); + Assert.assertFalse(mergeManager.isMergeComplete()); } private byte[] generateDataBySize(Configuration conf, int rawLen) throws IOException {
