Repository: tez Updated Branches: refs/heads/branch-0.8 883e76fd5 -> a83479b1c
TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts. Contributed by Jason Lowe. (cherry picked from commit 71bb2defe97e55e3bf7dbb299fe33ab8a667e7a1) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a83479b1 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a83479b1 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a83479b1 Branch: refs/heads/branch-0.8 Commit: a83479b1cd114a8771caa3041a1babcdeed3a975 Parents: 883e76f Author: Siddharth Seth <[email protected]> Authored: Thu Jun 30 16:13:23 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Jun 30 16:18:13 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../FetchedInputAllocatorOrderedGrouped.java | 1 + .../shuffle/orderedgrouped/InMemoryReader.java | 2 +- .../shuffle/orderedgrouped/MergeManager.java | 7 +++++- .../orderedgrouped/TestMergeManager.java | 26 ++++++++++++++++++++ 5 files changed, 36 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a83479b1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9e0b751..0e50032 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts. TEZ-3314. Double counting input bytes in MultiMRInput. TEZ-3308. Add counters to capture input split length. TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout. @@ -471,6 +472,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts. TEZ-3305. TestAnalyzer fails on Hadoop 2.7. TEZ-3304. TestHistoryParser fails with Hadoop 2.7. TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements http://git-wip-us.apache.org/repos/asf/tez/blob/a83479b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java index 7276f74..e145632 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java @@ -36,4 +36,5 @@ public interface FetchedInputAllocatorOrderedGrouped { void unreserve(long bytes); + void releaseCommittedMemory(long bytes); } http://git-wip-us.apache.org/repos/asf/tez/blob/a83479b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java index 7860377..12fe057 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java @@ -258,7 +258,7 @@ public class InMemoryReader extends Reader { buffer = null; // Inform the MergeManager if (merger != null) { - merger.unreserve(bufferSize); + merger.releaseCommittedMemory(bufferSize); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/a83479b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 9e2fbd4..26bdca7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -451,7 +451,6 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { @Override public synchronized void unreserve(long size) { - commitMemory -= size; usedMemory -= size; if (LOG.isDebugEnabled()) { LOG.debug("Notifying unreserve : size=" + size + ", commitMemory=" + commitMemory + ", usedMemory=" + usedMemory @@ -461,6 +460,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { } @Override + public synchronized void releaseCommittedMemory(long size) { + commitMemory -= size; + unreserve(size); + } + + @Override public synchronized void closeInMemoryFile(MapOutput mapOutput) { inMemoryMapOutputs.add(mapOutput); LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() http://git-wip-us.apache.org/repos/asf/tez/blob/a83479b1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index 9eb4cae..9209ff4 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -178,6 +178,32 @@ public class TestMergeManager { Assert.assertTrue(mergeManager.postMergeMemLimit == initialMemoryAvailable); } + @Test(timeout = 10000) + public void testReservationAccounting() throws IOException { + Configuration conf = new TezConfiguration(defaultConf); + FileSystem localFs = FileSystem.getLocal(conf); + InputContext inputContext = createMockInputContext(UUID.randomUUID().toString()); + MergeManager mergeManager = + new MergeManager(conf, localFs, null, inputContext, null, null, null, null, + mock(ExceptionReporter.class), 2000000, null, false, -1); + mergeManager.configureAndStart(); + assertEquals(0, mergeManager.getUsedMemory()); + assertEquals(0, mergeManager.getCommitMemory()); + MapOutput mapOutput = mergeManager.reserve(null, 1, 1, 0); + assertEquals(1, mergeManager.getUsedMemory()); + assertEquals(0, mergeManager.getCommitMemory()); + mapOutput.abort(); + assertEquals(0, mergeManager.getUsedMemory()); + assertEquals(0, mergeManager.getCommitMemory()); + mapOutput = mergeManager.reserve(null, 2, 2, 0); + mergeManager.closeInMemoryFile(mapOutput); + assertEquals(2, mergeManager.getUsedMemory()); + assertEquals(2, mergeManager.getCommitMemory()); + mergeManager.releaseCommittedMemory(2); + assertEquals(0, mergeManager.getUsedMemory()); + assertEquals(0, mergeManager.getCommitMemory()); + } + @Test(timeout=20000) public void testIntermediateMemoryMergeAccounting() throws Exception { Configuration conf = new TezConfiguration(defaultConf);
