Repository: tez
Updated Branches:
refs/heads/branch-0.7 6852f79aa -> 126217147
TEZ-3103. Shuffle can hang when memory to memory merging enabled (jlowe)
Conflicts:
CHANGES.txt
tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/12621714
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/12621714
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/12621714
Branch: refs/heads/branch-0.7
Commit: 1262171477cfe5ad4dfeba7d3fbcdd6d497850af
Parents: 6852f79
Author: Jason Lowe <[email protected]>
Authored: Fri Feb 12 18:34:04 2016 +0000
Committer: Jason Lowe <[email protected]>
Committed: Fri Feb 12 18:35:17 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../shuffle/orderedgrouped/MergeManager.java | 21 ++++++
.../orderedgrouped/TestMergeManager.java | 73 ++++++++++++++++++++
3 files changed, 95 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/12621714/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e991566..adf23a9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES
TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g
startTime > finishTime).
+ TEZ-3103. Shuffle can hang when memory to memory merging enabled
TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and
earlier
TEZ-3093. CriticalPathAnalyzer should be accessible via zeppelin
TEZ-3089. TaskConcurrencyAnalyzer can return negative task count with very
large jobs
http://git-wip-us.apache.org/repos/asf/tez/blob/12621714/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index f5e95cb..def8175 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -488,6 +488,12 @@ public class MergeManager {
LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
", inMemoryMergedMapOutputs.size() -> " +
inMemoryMergedMapOutputs.size());
+
+ commitMemory += mapOutput.getSize();
+
+ if (commitMemory >= mergeThreshold) {
+ startMemToDiskMerge();
+ }
}
public FileSystem getLocalFileSystem() {
@@ -1051,4 +1057,19 @@ public class MergeManager {
comparator, progressable, spilledRecordsCounter, null,
additionalBytesRead, null);
}
+
+  /**
+   * Test hook: reports the running commitMemory total (the sum of committed
+   * in-memory output sizes tracked by this merge manager).
+   */
+  @VisibleForTesting
+  long getCommitMemory() {
+    return this.commitMemory;
+  }
+
+  /**
+   * Test hook: reports the running usedMemory total (memory reserved for
+   * in-memory outputs, whether or not they have been committed yet).
+   */
+  @VisibleForTesting
+  long getUsedMemory() {
+    return this.usedMemory;
+  }
+
+  /**
+   * Test hook: blocks the caller until the memory-to-memory merger's
+   * waitForMerge() returns.
+   *
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  @VisibleForTesting
+  void waitForMemToMemMerge() throws InterruptedException {
+    this.memToMemMerger.waitForMerge();
+  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/12621714/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index a1b3003..72273e0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -29,6 +29,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
@@ -50,6 +52,7 @@ import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.junit.After;
@@ -172,6 +175,76 @@ public class TestMergeManager {
Assert.assertTrue(mergeManager.postMergeMemLimit ==
initialMemoryAvailable);
}
+  /**
+   * Regression test for TEZ-3103. An output produced by a memory-to-memory merge
+   * must be added back into commitMemory. Otherwise the
+   * {@code commitMemory >= mergeThreshold} check that starts a mem-to-disk merge
+   * can under-count, and the shuffle may hang.
+   */
+  @Test(timeout=20000)
+  public void testIntermediateMemoryMergeAccounting() throws Exception {
+    Configuration conf = new TezConfiguration(defaultConf);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
+    // Enable mem-to-mem merging over 2 segments: the test expects the second
+    // committed in-memory output to kick off an intermediate merge.
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, true);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 2);
+
+    // Obtain a single local FileSystem handle and use it for every step. The
+    // previous version called mkdirs on the localFs field and then declared a
+    // local variable of the same name, which shadowed the field mid-method.
+    FileSystem testFs = FileSystem.getLocal(conf);
+    Path localDir = new Path(workDir, "local");
+    Path srcDir = new Path(workDir, "srcData");
+    testFs.mkdirs(localDir);
+    testFs.mkdirs(srcDir);
+
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString());
+
+    LocalDirAllocator localDirAllocator =
+        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
+
+    ExceptionReporter exceptionReporter = mock(ExceptionReporter.class);
+
+    MergeManager mergeManager =
+        new MergeManager(conf, testFs, localDirAllocator, inputContext, null, null, null, null,
+            exceptionReporter, 2000000, null, false, -1);
+    mergeManager.configureAndStart();
+
+    assertEquals(0, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+
+    // Reserving in-memory outputs raises usedMemory but not commitMemory.
+    byte[] data1 = generateData(conf, 10);
+    byte[] data2 = generateData(conf, 20);
+    MapOutput firstMapOutput = mergeManager.reserve(null, data1.length, data1.length, 0);
+    MapOutput secondMapOutput = mergeManager.reserve(null, data2.length, data2.length, 0);
+    assertEquals(MapOutput.Type.MEMORY, firstMapOutput.getType());
+    assertEquals(MapOutput.Type.MEMORY, secondMapOutput.getType());
+    assertEquals(0, mergeManager.getCommitMemory());
+    assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
+
+    System.arraycopy(data1, 0, firstMapOutput.getMemory(), 0, data1.length);
+    System.arraycopy(data2, 0, secondMapOutput.getMemory(), 0, data2.length);
+
+    // Committing one output moves only its size into commitMemory.
+    secondMapOutput.commit();
+    assertEquals(data2.length, mergeManager.getCommitMemory());
+    assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
+
+    // The second commit is expected to start a mem-to-mem merge. Once it
+    // completes, the merged output must still be fully counted in both totals.
+    firstMapOutput.commit();
+    mergeManager.waitForMemToMemMerge();
+    assertEquals(data1.length + data2.length, mergeManager.getCommitMemory());
+    assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
+  }
+
+  /**
+   * Writes {@code numEntries} IntWritable pairs (i, i) for i in [0, numEntries)
+   * through an IFile.Writer. The serialized bytes are then passed through
+   * ShuffleUtils.shuffleToMemory into a buffer sized to the writer's raw length,
+   * and that buffer is returned.
+   *
+   * @param conf configuration handed to IFile.Writer
+   * @param numEntries number of key/value pairs to write
+   * @return the in-memory shuffled bytes
+   * @throws IOException if writing or shuffling the data fails
+   */
+  private byte[] generateData(Configuration conf, int numEntries) throws IOException {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    IFile.Writer writer = new IFile.Writer(conf, new FSDataOutputStream(buffer, null),
+        IntWritable.class, IntWritable.class, null, null, null);
+    int key = 0;
+    while (key < numEntries) {
+      writer.append(new IntWritable(key), new IntWritable(key));
+      key++;
+    }
+    writer.close();
+
+    int rawLength = (int) writer.getRawLength();
+    int compressedLength = (int) writer.getCompressedLength();
+    byte[] shuffled = new byte[rawLength];
+    ShuffleUtils.shuffleToMemory(shuffled, new ByteArrayInputStream(buffer.toByteArray()),
+        rawLength, compressedLength, null, false, 0, LOG, "sometask");
+    return shuffled;
+  }
+
@Test(timeout = 10000)
public void testLocalDiskMergeMultipleTasks() throws IOException {