Repository: tez Updated Branches: refs/heads/master f9942ccf6 -> c17d4a0a4
TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c17d4a0a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c17d4a0a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c17d4a0a Branch: refs/heads/master Commit: c17d4a0a441477caed06d61d2ebe868946ff71a6 Parents: f9942cc Author: Siddharth Seth <[email protected]> Authored: Fri Aug 21 17:05:58 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 21 17:05:58 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 4 + .../orderedgrouped/TestMergeManager.java | 140 ++++++++++++++++++- 2 files changed, 143 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c17d4a0a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fc61bbf..89c7cb0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES TEZ-2468. Change the minimum Java version to Java 7. ALL CHANGES: + TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters @@ -76,6 +77,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. 
LimitExceededException in Tez client when DAG has exceeds the default max counters @@ -300,6 +302,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters TEZ-2630. TezChild receives IP address instead of FQDN. @@ -509,6 +512,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. TEZ-2687. ATS History shutdown happens before the min-held containers are released TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters TEZ-2630. TezChild receives IP address instead of FQDN. http://git-wip-us.apache.org/repos/asf/tez/blob/c17d4a0a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index 0faa22a..f3b8e99 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -18,6 +18,9 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static 
org.mockito.Mockito.spy; @@ -43,7 +46,6 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; -import org.apache.tez.runtime.library.common.Constants; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.sort.impl.IFile; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; @@ -190,6 +192,141 @@ public class TestMergeManager { testLocalDiskMergeMultipleTasks(true); } + /** Verifies the file names produced by three successive on-disk merges. Each merged path is expected to end with merged0, merged1 and merged2 respectively, all three paths must have the same length, and the part of the path before the first dot must be equal for the first two merges and different for the third, where the inputs are handed to the merger in swapped order. */ @Test(timeout = 10000) + public void testOnDiskMergerFilenames() throws IOException, InterruptedException { + Configuration conf = new TezConfiguration(defaultConf); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); + + Path localDir = new Path(workDir, "local"); + Path srcDir = new Path(workDir, "srcData"); + localFs.mkdirs(localDir); + localFs.mkdirs(srcDir); + + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString()); + + FileSystem localFs = FileSystem.getLocal(conf); + + LocalDirAllocator localDirAllocator = + new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS); + InputContext inputContext = createMockInputContext(UUID.randomUUID().toString()); + + ExceptionReporter exceptionReporter = mock(ExceptionReporter.class); + + MergeManager mergeManagerReal = + new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null, + exceptionReporter, 1 * 1024l * 1024l, null, false, -1); + MergeManager mergeManager = spy(mergeManagerReal); + + // Partition 0 Keys 0-2, Partition 1 Keys 3-5 + SrcFileInfo file1Info = + createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src1.out"), + 2, 3, 6); + 
SrcFileInfo file2Info = + createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src2.out"), + 2, 3, 0); + + InputAttemptIdentifier iIdentifier1 = + new InputAttemptIdentifier(0, 0, file1Info.path.getName()); + InputAttemptIdentifier iIdentifier2 = + new InputAttemptIdentifier(1, 0, file2Info.path.getName()); + + MapOutput mapOutput1 = + getMapOutputForDirectDiskFetch(iIdentifier1, file1Info.path, file1Info.indexedRecords[0], + mergeManager); + MapOutput mapOutput2 = + getMapOutputForDirectDiskFetch(iIdentifier2, file2Info.path, file2Info.indexedRecords[0], + mergeManager); + + /* Commit both outputs; the verify calls below check that each commit reported its output path to the spied merge manager through closeOnDiskFile. */ mapOutput1.commit(); + mapOutput2.commit(); + verify(mergeManager).closeOnDiskFile(mapOutput1.getOutputPath()); + verify(mergeManager).closeOnDiskFile(mapOutput2.getOutputPath()); + + List<FileChunk> mergeFiles = new LinkedList<FileChunk>(); + mergeFiles.addAll(mergeManager.onDiskMapOutputs); + mergeManager.onDiskMapOutputs.clear(); + + /* The merger is invoked directly on a copy of the pending outputs, after the pending set has been cleared; the assertion that follows expects exactly one entry in the set once the merge returns. */ mergeManager.onDiskMerger.merge(mergeFiles); + Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); + + FileChunk fcMerged1 = mergeManager.onDiskMapOutputs.iterator().next(); + Path m1Path = fcMerged1.getPath(); + assertTrue(m1Path.toString().endsWith("merged0")); + + // Add another file. Make sure the filename is different, and does not get clobbered. 
+ + SrcFileInfo file3Info = + createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src3.out"), + 2, 22, 5); + InputAttemptIdentifier iIdentifier3 = + /* NOTE(review): this identifier is built from the name of file1Info although the data comes from file3Info; identifiers 1, 2 and 4 use the name of their own file. Looks like a copy-paste leftover - confirm whether it is intentional. */ new InputAttemptIdentifier(2, 0, file1Info.path.getName()); + MapOutput mapOutput3 = + getMapOutputForDirectDiskFetch(iIdentifier3, file3Info.path, file3Info.indexedRecords[0], + mergeManager); + mapOutput3.commit(); + verify(mergeManager).closeOnDiskFile(mapOutput3.getOutputPath()); + + mergeFiles = new LinkedList<FileChunk>(); + mergeFiles.addAll(mergeManager.onDiskMapOutputs); + mergeManager.onDiskMapOutputs.clear(); + + mergeManager.onDiskMerger.merge(mergeFiles); + Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); + + FileChunk fcMerged2 = mergeManager.onDiskMapOutputs.iterator().next(); + Path m2Path = fcMerged2.getPath(); + assertTrue(m2Path.toString().endsWith("merged1")); + assertNotEquals(m1Path, m2Path); + + // Add another file. This time add it to the head of the list. + SrcFileInfo file4Info = + createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src4.out"), + 2, 45, 35); + InputAttemptIdentifier iIdentifier4 = + new InputAttemptIdentifier(3, 0, file4Info.path.getName()); + MapOutput mapOutput4 = + getMapOutputForDirectDiskFetch(iIdentifier4, file4Info.path, file4Info.indexedRecords[0], + mergeManager); + mapOutput4.commit(); + verify(mergeManager).closeOnDiskFile(mapOutput4.getOutputPath()); + + // Add in reverse order this time. 
+ List<FileChunk> tmpList = new LinkedList<>(); + mergeFiles = new LinkedList<>(); + assertEquals(2, mergeManager.onDiskMapOutputs.size()); + tmpList.addAll(mergeManager.onDiskMapOutputs); + /* Hand the two pending outputs to the merger in the opposite order from the one in which onDiskMapOutputs iterates them. */ mergeFiles.add(tmpList.get(1)); + mergeFiles.add(tmpList.get(0)); + + mergeManager.onDiskMapOutputs.clear(); + + mergeManager.onDiskMerger.merge(mergeFiles); + Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); + + FileChunk fcMerged3 = mergeManager.onDiskMapOutputs.iterator().next(); + Path m3Path = fcMerged3.getPath(); + + assertTrue(m3Path.toString().endsWith("merged2")); + assertNotEquals(m2Path, m3Path); + + // Ensure the lengths are the same - since the source file names are the same. No append happening. + assertEquals(m1Path.toString().length(), m2Path.toString().length()); + assertEquals(m2Path.toString().length(), m3Path.toString().length()); + + // Ensure the filenames are used correctly - based on the first file given to the merger. + String m1Prefix = m1Path.toString().substring(0, m1Path.toString().indexOf(".")); + String m2Prefix = m2Path.toString().substring(0, m2Path.toString().indexOf(".")); + String m3Prefix = m3Path.toString().substring(0, m3Path.toString().indexOf(".")); + + assertEquals(m1Prefix, m2Prefix); + assertNotEquals(m1Prefix, m3Prefix); + assertNotEquals(m2Prefix, m3Prefix); + + } + void testLocalDiskMergeMultipleTasks(boolean interruptInMiddle) throws IOException, InterruptedException { @@ -367,4 +504,5 @@ public class TestMergeManager { return MapOutput.createLocalDiskMapOutput(srcAttemptId, merger, filename, indexRecord.getStartOffset(), indexRecord.getPartLength(), true); } + }
