Repository: tez Updated Branches: refs/heads/branch-0.5 008ad30e7 -> e440b7482
TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e440b748 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e440b748 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e440b748 Branch: refs/heads/branch-0.5 Commit: e440b7482891e4279212f75b1245a40d0add1bd7 Parents: 008ad30 Author: Siddharth Seth <[email protected]> Authored: Fri Aug 21 17:13:26 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 21 17:13:26 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../orderedgrouped/TestMergeManager.java | 138 +++++++++++++++++++ 2 files changed, 139 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e440b748/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b7f935c..f3af88a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2734. Add a test to verify the filename generated by OnDiskMerge TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters TEZ-2719. 
Consider reducing logs in unordered fetcher with shared-fetch option http://git-wip-us.apache.org/repos/asf/tez/blob/e440b748/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index 7615ba7..69cc7ca 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -18,6 +18,9 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -189,6 +192,141 @@ public class TestMergeManager { } + /** Checks the names OnDiskMerger gives its merged outputs: three successive merges are expected to yield paths ending in merged0, merged1 and merged2, and the text before the first '.' is expected to follow the first FileChunk passed to merge(), so the third merge (inputs passed in reversed order) gets a different prefix. */ @Test(timeout = 10000) + public void testOnDiskMergerFilenames() throws IOException, InterruptedException { + Configuration conf = new TezConfiguration(defaultConf); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); + + Path localDir = new Path(workDir, "local"); + Path srcDir = new Path(workDir, "srcData"); + localFs.mkdirs(localDir); + localFs.mkdirs(srcDir); + + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString()); + + /* NOTE(review): this local declaration shadows the localFs used by the two mkdirs calls above; those calls run before this declaration is in scope, so they use a different localFs (presumably a field of the test class). Confirm this is intended. */ FileSystem localFs = FileSystem.getLocal(conf); + + LocalDirAllocator localDirAllocator = + new
LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS); + InputContext inputContext = createMockInputContext(UUID.randomUUID().toString()); + + ExceptionReporter exceptionReporter = mock(ExceptionReporter.class); + + MergeManager mergeManagerReal = + new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null, + exceptionReporter, 1 * 1024l * 1024l, null, false, -1); + MergeManager mergeManager = spy(mergeManagerReal); + + // Partition 0 Keys 0-2, Partition 1 Keys 3-5 + SrcFileInfo file1Info = + createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src1.out"), + 2, 3, 6); + + SrcFileInfo file2Info = + createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src2.out"), + 2, 3, 0); + + InputAttemptIdentifier iIdentifier1 = + new InputAttemptIdentifier(0, 0, file1Info.path.getName()); + InputAttemptIdentifier iIdentifier2 = + new InputAttemptIdentifier(1, 0, file2Info.path.getName()); + + MapOutput mapOutput1 = + getMapOutputForDirectDiskFetch(iIdentifier1, file1Info.path, file1Info.indexedRecords[0], + mergeManager); + MapOutput mapOutput2 = + getMapOutputForDirectDiskFetch(iIdentifier2, file2Info.path, file2Info.indexedRecords[0], + mergeManager); + + mapOutput1.commit(); + mapOutput2.commit(); + verify(mergeManager).closeOnDiskFile(mapOutput1.getOutputPath()); + verify(mergeManager).closeOnDiskFile(mapOutput2.getOutputPath()); + + List<FileChunk> mergeFiles = new LinkedList<FileChunk>(); + mergeFiles.addAll(mergeManager.onDiskMapOutputs); + mergeManager.onDiskMapOutputs.clear(); + + mergeManager.onDiskMerger.merge(mergeFiles); + Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); + + FileChunk fcMerged1 = mergeManager.onDiskMapOutputs.iterator().next(); + Path m1Path = fcMerged1.getPath(); + assertTrue(m1Path.toString().endsWith("merged0")); + + // Add another file. Make sure the filename is different, and does not get clobbered.
+ + SrcFileInfo file3Info = + createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src3.out"), + 2, 22, 5); + /* NOTE(review): iIdentifier3 is built from file1Info's name, not file3Info's (unlike iIdentifier1, iIdentifier2 and iIdentifier4). This may be a copy-paste slip, or it may be deliberate. Confirm whether the later path-length and prefix-equality assertions depend on it before changing it. */ InputAttemptIdentifier iIdentifier3 = + new InputAttemptIdentifier(2, 0, file1Info.path.getName()); + MapOutput mapOutput3 = + getMapOutputForDirectDiskFetch(iIdentifier3, file3Info.path, file3Info.indexedRecords[0], + mergeManager); + mapOutput3.commit(); + verify(mergeManager).closeOnDiskFile(mapOutput3.getOutputPath()); + + mergeFiles = new LinkedList<FileChunk>(); + mergeFiles.addAll(mergeManager.onDiskMapOutputs); + mergeManager.onDiskMapOutputs.clear(); + + mergeManager.onDiskMerger.merge(mergeFiles); + Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); + + FileChunk fcMerged2 = mergeManager.onDiskMapOutputs.iterator().next(); + Path m2Path = fcMerged2.getPath(); + assertTrue(m2Path.toString().endsWith("merged1")); + assertNotEquals(m1Path, m2Path); + + // Add another file. This time add it to the head of the list. + SrcFileInfo file4Info = + createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src4.out"), + 2, 45, 35); + InputAttemptIdentifier iIdentifier4 = + new InputAttemptIdentifier(3, 0, file4Info.path.getName()); + MapOutput mapOutput4 = + getMapOutputForDirectDiskFetch(iIdentifier4, file4Info.path, file4Info.indexedRecords[0], + mergeManager); + mapOutput4.commit(); + verify(mergeManager).closeOnDiskFile(mapOutput4.getOutputPath()); + + // Add in reverse order this time.
+ List<FileChunk> tmpList = new LinkedList<FileChunk>(); + mergeFiles = new LinkedList<FileChunk>(); + assertEquals(2, mergeManager.onDiskMapOutputs.size()); + tmpList.addAll(mergeManager.onDiskMapOutputs); + mergeFiles.add(tmpList.get(1)); + mergeFiles.add(tmpList.get(0)); + + mergeManager.onDiskMapOutputs.clear(); + + mergeManager.onDiskMerger.merge(mergeFiles); + Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); + + FileChunk fcMerged3 = mergeManager.onDiskMapOutputs.iterator().next(); + Path m3Path = fcMerged3.getPath(); + + assertTrue(m3Path.toString().endsWith("merged2")); + assertNotEquals(m2Path, m3Path); + + // Ensure the lengths are the same - since the source file names are the same. No append happening. + assertEquals(m1Path.toString().length(), m2Path.toString().length()); + assertEquals(m2Path.toString().length(), m3Path.toString().length()); + + // Ensure the filenames are used correctly - based on the first file given to the merger. + /* NOTE(review): indexOf(".") searches the whole path string, not just the file name. A '.' in any parent directory would therefore cut the prefix short, and a path with no '.' would make substring throw. Comparing prefixes of getName() would be more robust. */ String m1Prefix = m1Path.toString().substring(0, m1Path.toString().indexOf(".")); + String m2Prefix = m2Path.toString().substring(0, m2Path.toString().indexOf(".")); + String m3Prefix = m3Path.toString().substring(0, m3Path.toString().indexOf(".")); + + assertEquals(m1Prefix, m2Prefix); + assertNotEquals(m1Prefix, m3Prefix); + assertNotEquals(m2Prefix, m3Prefix); + + } + private InputContext createMockInputContext(String uniqueId) { InputContext inputContext = mock(InputContext.class); doReturn(new TezCounters()).when(inputContext).getCounters();
