TEZ-2358. Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aa87a14c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aa87a14c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aa87a14c

Branch: refs/heads/TEZ-2003
Commit: aa87a14c5197d5aa3ddc8e829311f1f3534fd62d
Parents: b08ca37
Author: Rajesh Balamohan <[email protected]>
Authored: Tue Apr 28 03:22:25 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Tue Apr 28 03:22:25 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                        |   1 +
 .../common/shuffle/DiskFetchedInput.java           |   3 +-
 .../shuffle/orderedgrouped/MapOutput.java          |   2 +-
 .../shuffle/orderedgrouped/MergeManager.java       |  61 ++++-
 .../common/sort/impl/IFileInputStream.java         |   1 +
 .../common/task/local/output/TezTaskOutput.java    |  12 +-
 .../task/local/output/TezTaskOutputFiles.java      |  31 +--
 .../apache/tez/test/TestPipelinedShuffle.java      | 258 +++++++++++++++++++
 8 files changed, 334 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a5c4a57..e42a79e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly

 ALL CHANGES:
+  TEZ-2358. Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task.
   TEZ-2342. TestFaultTolerance.testRandomFailingTasks fails due to timeout.
   TEZ-2362. State Change Notifier Thread should be stopped when dag is completed


http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
index e25301e..6432b55 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
@@ -50,7 +50,8 @@ public class DiskFetchedInput extends FetchedInput {
     this.localFS = FileSystem.getLocal(conf);
     this.outputPath = filenameAllocator.getInputFileForWrite(
-        this.inputAttemptIdentifier.getInputIdentifier().getInputIndex(), actualSize);
+        this.inputAttemptIdentifier.getInputIdentifier().getInputIndex(),
+        this.inputAttemptIdentifier.getSpillEventId(), actualSize);
     // Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
     // otherwise fetches for the same task but from different attempts would clobber each other.
     this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
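
Note on the change above: with pipelined shuffle a source task can emit several
spill events, so a fetched input can no longer be keyed by the source index
alone. A minimal standalone sketch (not part of the patch; names are
illustrative) of why the extra spillEventId parameter is needed:

    import java.util.HashSet;
    import java.util.Set;

    public class FetchPathSketch {
      // Old scheme: one local file per source task index.
      static String pathFor(int srcIndex) {
        return "uid_spill_" + srcIndex + ".out";
      }
      // New scheme: one local file per (source index, spill event) pair.
      static String pathFor(int srcIndex, int spillEventId) {
        return "uid_src_" + srcIndex + "_spill_" + spillEventId + ".out";
      }

      public static void main(String[] args) {
        Set<String> paths = new HashSet<>();
        // Source task 7 pipelines three spill events to the same consumer.
        for (int spill = 0; spill < 3; spill++) {
          paths.add(pathFor(7));              // old: always the same path
        }
        System.out.println(paths.size());     // 1 -> later spills clobber earlier ones
        paths.clear();
        for (int spill = 0; spill < 3; spill++) {
          paths.add(pathFor(7, spill));       // new: one path per spill event
        }
        System.out.println(paths.size());     // 3 -> no clobbering
      }
    }
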

http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index e999af6..55c80aa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -107,7 +107,7 @@ class MapOutput {
       IOException {
     FileSystem fs = FileSystem.getLocal(conf);
     Path outputpath = mapOutputFile.getInputFileForWrite(
-        attemptIdentifier.getInputIdentifier().getInputIndex(), size);
+        attemptIdentifier.getInputIdentifier().getInputIndex(), attemptIdentifier.getSpillEventId(), size);
     // Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
     // otherwise fetches for the same task but from different attempts would clobber each other.
     Path tmpOuputPath = outputpath.suffix(String.valueOf(fetcher));


http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index d5f7be1..2e6ebd9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -27,6 +27,8 @@ import java.util.TreeSet;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.FilenameUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -436,7 +438,8 @@ public class MergeManager {
     inMemoryMapOutputs.add(mapOutput);
     LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
         + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
-        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
+        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory + ", mapOutput="
+        + mapOutput);

     commitMemory += mapOutput.getSize();

@@ -476,7 +479,21 @@ public class MergeManager {
   }

   public synchronized void closeOnDiskFile(FileChunk file) {
+    // including only path & offset for validations.
+    for (FileChunk fileChunk : onDiskMapOutputs) {
+      if (fileChunk.getPath().equals(file.getPath())) {
+        // ensure offsets are not the same.
+        Preconditions.checkArgument(fileChunk.getOffset() != file.getOffset(),
+            "Can't have a file with same path and offset. "
+                + "OldFilePath=" + fileChunk.getPath() + ", OldFileOffset=" + fileChunk.getOffset()
+                + ", newFilePath=" + file.getPath() + ", newFileOffset=" + file.getOffset());
+      }
+    }
+
     onDiskMapOutputs.add(file);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("close onDiskFile=" + file.getPath() + ", len=" + file.getLength());
+    }

     synchronized (onDiskMerger) {
       if (!onDiskMerger.isInProgress() &&
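
The new guard in closeOnDiskFile() encodes the pipelined-shuffle invariant: the
same on-disk file may be registered several times (one FileChunk per spill
event), but never twice with the same offset, which would mean the same bytes
were fetched twice. A standalone sketch of that check, with a hypothetical
Chunk standing in for Tez's FileChunk:

    import java.util.ArrayList;
    import java.util.List;

    class OnDiskRegistrySketch {
      static final class Chunk {
        final String path;
        final long offset;
        Chunk(String path, long offset) { this.path = path; this.offset = offset; }
      }

      private final List<Chunk> onDisk = new ArrayList<>();

      // Mirrors the Preconditions.checkArgument() added above: reject a chunk
      // that duplicates both the path and the offset of an earlier one.
      synchronized void closeOnDiskFile(Chunk c) {
        for (Chunk existing : onDisk) {
          if (existing.path.equals(c.path) && existing.offset == c.offset) {
            throw new IllegalArgumentException(
                "Can't have a file with same path and offset: " + c.path + " @ " + c.offset);
          }
        }
        onDisk.add(c);
      }
    }
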
+ + "OldFilePath=" + fileChunk.getPath() + ", OldFileOffset=" + fileChunk.getOffset() + + ", newFilePath=" + file.getPath() + ", newFileOffset=" + file.getOffset()); + } + } + onDiskMapOutputs.add(file); + if (LOG.isDebugEnabled()) { + LOG.debug("close onDiskFile=" + file.getPath() + ", len=" + file.getLength()); + } synchronized (onDiskMerger) { if (!onDiskMerger.isInProgress() && @@ -623,8 +640,9 @@ public class MergeManager { // All disk writes done by this merge are overhead - due to the lac of // adequate memory to keep all segments in memory. Path outputPath = mapOutputFile.getInputFileForWrite( - srcTaskIdentifier.getInputIdentifier().getInputIndex(), + srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(), mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX); + LOG.info("Patch..InMemoryMerger outputPath: " + outputPath); Writer writer = null; long outFileLen = 0; @@ -720,6 +738,11 @@ public class MergeManager { final long offset = fileChunk.getOffset(); final long size = fileChunk.getLength(); final boolean preserve = fileChunk.isLocalFile(); + if (LOG.isDebugEnabled()) { + LOG.debug("InputAttemptIdentifier=" + fileChunk.getInputAttemptIdentifier() + + ", len=" + fileChunk.getLength() + ", offset=" + fileChunk.getOffset() + + ", path=" + fileChunk.getPath()); + } final Path file = fileChunk.getPath(); approxOutputSize += size; Segment segment = new Segment(rfs, file, offset, size, codec, ifileReadAhead, @@ -737,8 +760,8 @@ public class MergeManager { if (file0.isLocalFile()) { // This is setup the same way a type DISK MapOutput is setup when fetching. namePart = mapOutputFile.getSpillFileName( - file0.getInputAttemptIdentifier().getInputIdentifier().getInputIndex()); - + file0.getInputAttemptIdentifier().getInputIdentifier().getInputIndex(), + file0.getInputAttemptIdentifier().getSpillEventId()); } else { namePart = file0.getPath().getName().toString(); } @@ -859,8 +882,18 @@ public class MergeManager { LOG.info("finalMerge called with " + inMemoryMapOutputs.size() + " in-memory map-outputs and " + onDiskMapOutputs.size() + " on-disk map-outputs"); - - + + if (LOG.isDebugEnabled()) { + for (MapOutput inMemoryMapOutput : inMemoryMapOutputs) { + LOG.debug("inMemoryOutput=" + inMemoryMapOutput + ", size=" + inMemoryMapOutput + .getSize()); + } + + for (FileChunk onDiskMapOutput : onDiskMapOutputs) { + LOG.debug("onDiskMapOutput=" + onDiskMapOutput.getPath() + ", size=" + onDiskMapOutput + .getLength()); + } + } // merge config params @@ -876,7 +909,7 @@ public class MergeManager { boolean mergePhaseFinished = false; if (inMemoryMapOutputs.size() > 0) { int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex(); - inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, + inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, memDiskSegments, this.postMergeMemLimit); final int numMemDiskSegments = memDiskSegments.size(); @@ -892,10 +925,11 @@ public class MergeManager { mergePhaseFinished = true; // must spill to disk, but can't retain in-mem for intermediate merge + // Can not use spill id in final merge as it would clobber with other files, hence using + // Integer.MAX_VALUE final Path outputPath = - mapOutputFile.getInputFileForWrite(srcTaskId, - inMemToDiskBytes).suffix( - Constants.MERGED_OUTPUT_PREFIX); + mapOutputFile.getInputFileForWrite(srcTaskId, Integer.MAX_VALUE, + inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX); final TezRawKeyValueIterator rIter = TezMerger.merge(job, 
@@ -925,12 +959,12 @@ public class MergeManager {
           LOG.info("Merged " + numMemDiskSegments + " segments, " +
               inMemToDiskBytes + " bytes to disk to satisfy " +
-              "reduce memory limit");
+              "reduce memory limit. outputPath=" + outputPath);
           inMemToDiskBytes = 0;
           memDiskSegments.clear();
         } else if (inMemToDiskBytes != 0) {
           LOG.info("Keeping " + numMemDiskSegments + " segments, " +
-              inMemToDiskBytes + " bytes in memory for " +
+              inMemToDiskBytes + " bytes in memory for " +
               "intermediate, on-disk merge");
         }
       }
@@ -942,7 +976,8 @@ public class MergeManager {
     for (FileChunk fileChunk : onDisk) {
       final long fileLength = fileChunk.getLength();
       onDiskBytes += fileLength;
-      LOG.debug("Disk file: " + fileChunk.getPath() + " Length is " + fileLength);
+      LOG.info("Disk file=" + fileChunk.getPath() + ", len=" + fileLength + ", isLocal="
+          + fileChunk.isLocalFile());

       final Path file = fileChunk.getPath();
       TezCounter counter =


http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
index 64c1b51..d116242 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -264,6 +264,7 @@ public class IFileInputStream extends InputStream {
     }

     if (currentOffset == dataLength) {
+      // TODO: add checksumSize to currentOffset.
       // The last four bytes are checksum. Strip them and verify sum.
       sum.update(buffer, 0, offset);
       csum = new byte[checksumSize];


http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
index e9f33af..c41e4a6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
@@ -122,18 +122,20 @@ public abstract class TezTaskOutput {
    * Create a local input file name.
    *
    * @param srcIdentifier The identifier for the source
-   * @param size the size of the file
-   * @return path the path to the input file.
+   * @param spillNum the spill event number for the source
+   * @param size the size of the file
+   * @return path the path to the input file.
    * @throws IOException
    */
-  public abstract Path getInputFileForWrite(
-      int srcIdentifier, long size) throws IOException;
+  public abstract Path getInputFileForWrite(int srcIdentifier,
+      int spillNum, long size) throws IOException;

   /**
    * Construct a spill file name, given a spill number
+   *
+   * @param srcId The identifier for the source
    * @param spillNum
    * @return a spill file name independent of the unique identifier and local directories
    */
-  public abstract String getSpillFileName(int spillNum);
+  public abstract String getSpillFileName(int srcId, int spillNum);
 }
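
Both TezTaskOutput methods widen their signatures, so every implementation and
caller now has to agree that an on-disk input is identified by the
(srcIdentifier, spillNum) pair rather than by srcIdentifier alone. A hedged
sketch of the widened contract with stand-in types (not the Tez classes
themselves):

    import java.io.IOException;

    abstract class TaskOutputContractSketch {
      // One file per (source, spill event), mirroring the abstract methods above.
      abstract String getInputFileForWrite(int srcIdentifier, int spillNum, long size)
          throws IOException;

      abstract String getSpillFileName(int srcId, int spillNum);
    }

    class SimpleTaskOutput extends TaskOutputContractSketch {
      private final String uniqueId;
      SimpleTaskOutput(String uniqueId) { this.uniqueId = uniqueId; }

      @Override
      String getInputFileForWrite(int srcIdentifier, int spillNum, long size) {
        // A real implementation would also pick a local dir with room for 'size' bytes.
        return getSpillFileName(srcIdentifier, spillNum);
      }

      @Override
      String getSpillFileName(int srcId, int spillNum) {
        return String.format("%s_src_%d_spill_%d.out", uniqueId, srcId, spillNum);
      }
    }
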

http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index 6382e3a..1e6fca3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -49,7 +49,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {

   private static final String SPILL_FILE_DIR_PATTERN = "%s_%d";
-  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_FILE_PATTERN = "%s_src_%d_spill_%d.out";

   /* Under YARN, this defaults to one or more of the local directories, along with the appId in
      the path.
@@ -233,35 +233,36 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   /**
    * Create a local input file name.
    *
-   * ${appDir}/${uniqueId}_spill_${spillNumber}.out
-   * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10004_spill_0.out
+   * ${appDir}/${uniqueId}_src_${srcId}_spill_${spillNumber}.out
+   * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10004_src_10_spill_0.out
    *
-   * This is currently equivalent to getSpillFileForWrite. Files are not clobbered due to the uniqueId
-   * being different for Outputs / Inputs within the same task (and across tasks)
+   * Files are not clobbered because the uniqueId, together with the spillId, differs for
+   * Outputs / Inputs within the same task (and across tasks).
    *
    * @param srcIdentifier The identifier for the source
-   * @param size the size of the file
-   * @return path the path to the input file.
+   * @param spillNum the spill event number for the source
+   * @param size the size of the file
+   * @return path the path to the input file.
    * @throws IOException
    */
   @Override
   public Path getInputFileForWrite(int srcIdentifier,
-      long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(getSpillFileName(srcIdentifier),
-        size, conf);
+      int spillNum, long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(getSpillFileName(srcIdentifier, spillNum), size, conf);
   }

   /**
-   * Construct a spill file name, given a spill number
+   * Construct a spill file name, given a spill number and src id
    *
-   * ${uniqueId}_spill_${spillNumber}.out
-   * e.g. attempt_1418684642047_0006_1_00_000000_0_10004_spill_0.out
+   * ${uniqueId}_src_${srcId}_spill_${spillNumber}.out
+   * e.g. attempt_1418684642047_0006_1_00_000000_0_10004_src_10_spill_0.out
    *
+   * @param srcId The identifier for the source
    * @param spillNum
    * @return a spill file name independent of the unique identifier and local directories
    */
   @Override
-  public String getSpillFileName(int spillNum) {
-    return String.format(SPILL_FILE_PATTERN, uniqueId, spillNum);
+  public String getSpillFileName(int srcId, int spillNum) {
+    return String.format(SPILL_FILE_PATTERN, uniqueId, srcId, spillNum);
   }
 }
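
A quick sanity check (a sketch, not part of the patch) that the new pattern
reproduces the example name from the javadoc above for srcId 10 and spill 0:

    public class SpillNameDemo {
      public static void main(String[] args) {
        String uniqueId = "attempt_1418684642047_0006_1_00_000000_0_10004";
        String name = String.format("%s_src_%d_spill_%d.out", uniqueId, 10, 0);
        // Prints: attempt_1418684642047_0006_1_00_000000_0_10004_src_10_spill_0.out
        System.out.println(name);
        // Under the old "%s_spill_%d.out" pattern, every spill of source 10
        // produced the same name; the spill number now keeps them apart.
      }
    }
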

http://git-wip-us.apache.org/repos/asf/tez/blob/aa87a14c/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java b/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
new file mode 100644
index 0000000..2a63293
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestPipelinedShuffle {
+
+  private static MiniDFSCluster miniDFSCluster;
+  private static MiniTezCluster miniTezCluster;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem fs;
+
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestPipelinedShuffle.class.getName() + "-tmpDir";
+
+  private static final int KEYS_PER_MAPPER = 5000;
+
+  @BeforeClass
+  public static void setupDFSCluster() throws Exception {
+    conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false);
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+    miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    fs = miniDFSCluster.getFileSystem();
+    conf.set("fs.defaultFS", fs.getUri().toString());
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
+  }
+
+  @AfterClass
+  public static void shutdownDFSCluster() {
+    if (miniDFSCluster != null) {
+      //shutdown
+      miniDFSCluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setupTezCluster() throws Exception {
+    // With a 1 MB sort buffer and a reasonably large dataset, records will spill.
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
+
+    // Enable pipelined shuffle.
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
+
+    // Enable local fetch.
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+
+    // 3 seconds should be good enough on a local machine.
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000);
+    // Set to a low value so that failures are detected quickly.
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
+
+    miniTezCluster = new MiniTezCluster(TestPipelinedShuffle.class.getName(), 1, 1, 1);
+
+    miniTezCluster.init(conf);
+    miniTezCluster.start();
+  }
+
+  @After
+  public void shutdownTezCluster() throws IOException {
+    if (miniTezCluster != null) {
+      miniTezCluster.stop();
+    }
+  }
+
+  @Test
+  public void baseTest() throws Exception {
+    PipelinedShuffleJob pipelinedShuffle = new PipelinedShuffleJob();
+    pipelinedShuffle.setConf(new Configuration(miniTezCluster.getConfig()));
+
+    String[] args = new String[] { };
+    assertEquals(0, pipelinedShuffle.run(args));
+  }
+
+  /**
+   * mapper1 --\
+   *            --> reducer
+   * mapper2 --/
+   *
+   * Mappers just generate dummy data, but ensure that there are enough spills.
+   * The reducer should process them correctly and validate the total number of
+   * records; only the record count is checked, which is sufficient for this test.
+   */
+  public static class PipelinedShuffleJob extends Configured implements Tool {
+    private TezConfiguration tezConf;
+
+    public static class DataGenerator extends SimpleMRProcessor {
+
+      public DataGenerator(ProcessorContext context) {
+        super(context);
+      }
+
+      @Override public void run() throws Exception {
+        Preconditions.checkArgument(getInputs().size() == 0);
+        Preconditions.checkArgument(getOutputs().size() == 1);
+        KeyValueWriter writer = (KeyValueWriter) getOutputs().get("reducer").getWriter();
+
+        for (int i = 0; i < KEYS_PER_MAPPER; i++) {
+          writer.write(new Text(RandomStringUtils.randomAlphanumeric(1000)),
+              new Text(RandomStringUtils.randomAlphanumeric(1000)));
+        }
+      }
+    }
+
+    public static class SimpleReduceProcessor extends SimpleMRProcessor {
+
+      public SimpleReduceProcessor(ProcessorContext context) {
+        super(context);
+      }
+
+      private long readData(KeyValuesReader reader) throws IOException {
+        long records = 0;
+        while (reader.next()) {
+          reader.getCurrentKey();
+          for (Object val : reader.getCurrentValues()) {
+            records++;
+          }
+        }
+        return records;
+      }
+
+      @Override
+      public void run() throws Exception {
+        Preconditions.checkArgument(getInputs().size() == 2);
+
+        long totalRecords = 0;
+
+        KeyValuesReader reader1 = (KeyValuesReader) getInputs().get("mapper1").getReader();
+        totalRecords += readData(reader1);
+
+        KeyValuesReader reader2 = (KeyValuesReader) getInputs().get("mapper2").getReader();
+        totalRecords += readData(reader2);
+
+        // Verify that the correct number of records was retrieved.
+        assertEquals(2 * KEYS_PER_MAPPER, totalRecords);
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      this.tezConf = new TezConfiguration(getConf());
+      String dagName = "pipelinedShuffleTest";
+      DAG dag = DAG.create(dagName);
+
+      Vertex m1_Vertex = Vertex.create("mapper1",
+          ProcessorDescriptor.create(DataGenerator.class.getName()), 1);
+
+      Vertex m2_Vertex = Vertex.create("mapper2",
+          ProcessorDescriptor.create(DataGenerator.class.getName()), 1);
+
+      Vertex reducerVertex = Vertex.create("reducer",
+          ProcessorDescriptor.create(SimpleReduceProcessor.class.getName()), 1);
+
+      Edge mapper1_to_reducer = Edge.create(m1_Vertex, reducerVertex,
+          OrderedPartitionedKVEdgeConfig
+              .newBuilder(Text.class.getName(), Text.class.getName(),
+                  HashPartitioner.class.getName())
+              .setFromConfiguration(tezConf).build().createDefaultEdgeProperty());
+
+      Edge mapper2_to_reducer = Edge.create(m2_Vertex, reducerVertex,
+          OrderedPartitionedKVEdgeConfig
+              .newBuilder(Text.class.getName(), Text.class.getName(),
+                  HashPartitioner.class.getName())
+              .setFromConfiguration(tezConf).build().createDefaultEdgeProperty());
+
+      dag.addVertex(m1_Vertex);
+      dag.addVertex(m2_Vertex);
+      dag.addVertex(reducerVertex);
+
+      dag.addEdge(mapper1_to_reducer).addEdge(mapper2_to_reducer);
+
+      TezClient client = TezClient.create(dagName, tezConf);
+      client.start();
+      client.waitTillReady();
+
+      DAGClient dagClient = client.submitDAG(dag);
+      Set<StatusGetOpts> getOpts = Sets.newHashSet();
+      getOpts.add(StatusGetOpts.GET_COUNTERS);
+
+      DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(getOpts);
+
+      System.out.println(dagStatus.getDAGCounters());
+      TezCounters counters = dagStatus.getDAGCounters();
+
+      // Ensure that more than 10 spills happened in this job.
+      assertTrue(counters.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT).getValue() > 10);
+
+      if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+        System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
+        return -1;
+      }
+      return 0;
+    }
+  }
+}
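
The test should run like any other MiniTezCluster test in the tez-tests module;
assuming a standard Maven/Surefire setup (the module path follows the diff
header above), something like:

    mvn test -pl tez-tests -Dtest=TestPipelinedShuffle

Note that ADDITIONAL_SPILL_COUNT is asserted to be greater than 10 before the
DAG status is checked, so a failed DAG that produced few spills surfaces as an
assertion failure rather than as the -1 return value.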
