Repository: tez
Updated Branches:
  refs/heads/master 7e3d5461c -> a2c590bcb
TEZ-3103. Shuffle can hang when memory to memory merging enabled (jlowe)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a2c590bc
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a2c590bc
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a2c590bc

Branch: refs/heads/master
Commit: a2c590bcb00de093e8365c4c423d5014777cfacf
Parents: 7e3d546
Author: Jason Lowe <[email protected]>
Authored: Fri Feb 12 18:19:29 2016 +0000
Committer: Jason Lowe <[email protected]>
Committed: Fri Feb 12 18:19:29 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../shuffle/orderedgrouped/MergeManager.java    | 21 ++++++
 .../orderedgrouped/TestMergeManager.java        | 74 ++++++++++++++++++++
 3 files changed, 97 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/a2c590bc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8cb7505..5f09280 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3103. Shuffle can hang when memory to memory merging enabled
   TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).
   TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier
   TEZ-3090. MRInput should make dagIdentifier, vertexIdentifier, etc available to the InputFormat jobConf.
@@ -331,6 +332,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-3103. Shuffle can hang when memory to memory merging enabled
   TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier
   TEZ-3093. CriticalPathAnalyzer should be accessible via zeppelin.
   TEZ-3089. TaskConcurrencyAnalyzer can return negative task count with very large jobs.
http://git-wip-us.apache.org/repos/asf/tez/blob/a2c590bc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index dfa509f..b56a9a8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -500,6 +500,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + ", inMemoryMergedMapOutputs.size() -> " + inMemoryMergedMapOutputs.size()); + + commitMemory += mapOutput.getSize(); + + if (commitMemory >= mergeThreshold) { + startMemToDiskMerge(); + } } @Override @@ -1155,4 +1161,19 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { comparator, progressable, spilledRecordsCounter, null, additionalBytesRead, null); } + + @VisibleForTesting + long getCommitMemory() { + return commitMemory; + } + + @VisibleForTesting + long getUsedMemory() { + return usedMemory; + } + + @VisibleForTesting + void waitForMemToMemMerge() throws InterruptedException { + memToMemMerger.waitForMerge(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/a2c590bc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index b8f99de..c62c116 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -29,12 +29,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.UUID; import com.google.common.collect.Sets; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -52,6 +55,7 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.common.sort.impl.IFile; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; import org.junit.After; @@ -174,6 +178,76 @@ public class TestMergeManager { Assert.assertTrue(mergeManager.postMergeMemLimit == initialMemoryAvailable); } + @Test(timeout=20000) + public void testIntermediateMemoryMergeAccounting() throws Exception { + Configuration conf = new TezConfiguration(defaultConf); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, true); + 
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 2); + + Path localDir = new Path(workDir, "local"); + Path srcDir = new Path(workDir, "srcData"); + localFs.mkdirs(localDir); + localFs.mkdirs(srcDir); + + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString()); + + FileSystem localFs = FileSystem.getLocal(conf); + LocalDirAllocator localDirAllocator = + new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS); + InputContext inputContext = createMockInputContext(UUID.randomUUID().toString()); + + ExceptionReporter exceptionReporter = mock(ExceptionReporter.class); + + MergeManager mergeManager = + new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null, + exceptionReporter, 2000000, null, false, -1); + mergeManager.configureAndStart(); + + assertEquals(0, mergeManager.getUsedMemory()); + assertEquals(0, mergeManager.getCommitMemory()); + + byte[] data1 = generateData(conf, 10); + byte[] data2 = generateData(conf, 20); + MapOutput firstMapOutput = mergeManager.reserve(null, data1.length, data1.length, 0); + MapOutput secondMapOutput = mergeManager.reserve(null, data2.length, data2.length, 0); + assertEquals(MapOutput.Type.MEMORY, firstMapOutput.getType()); + assertEquals(MapOutput.Type.MEMORY, secondMapOutput.getType()); + assertEquals(0, mergeManager.getCommitMemory()); + assertEquals(data1.length + data2.length, mergeManager.getUsedMemory()); + + System.arraycopy(data1, 0, firstMapOutput.getMemory(), 0, data1.length); + System.arraycopy(data2, 0, secondMapOutput.getMemory(), 0, data2.length); + + secondMapOutput.commit(); + assertEquals(data2.length, mergeManager.getCommitMemory()); + assertEquals(data1.length + data2.length, mergeManager.getUsedMemory()); + firstMapOutput.commit(); + + mergeManager.waitForMemToMemMerge(); + assertEquals(data1.length + data2.length, mergeManager.getCommitMemory()); + assertEquals(data1.length + data2.length, mergeManager.getUsedMemory()); + } + + 
private byte[] generateData(Configuration conf, int numEntries) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + FSDataOutputStream fsdos = new FSDataOutputStream(baos, null); + IFile.Writer writer = + new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null); + for (int i = 0; i < numEntries; ++i) { + writer.append(new IntWritable(i), new IntWritable(i)); + } + writer.close(); + int compressedLength = (int)writer.getCompressedLength(); + int rawLength = (int)writer.getRawLength(); + byte[] data = new byte[rawLength]; + ShuffleUtils.shuffleToMemory(data, new ByteArrayInputStream(baos.toByteArray()), + rawLength, compressedLength, null, false, 0, LOG, "sometask"); + return data; + } + class InterruptingThread implements Runnable { MergeManager.OnDiskMerger mergeThread;
