Updated Branches: refs/heads/TEZ-1 38b410e31 -> cee79f638
TEZ-62. Fix and enable tests in TestMapProcessor and TestReduceProcessor. (sseth) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cee79f63 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cee79f63 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cee79f63 Branch: refs/heads/TEZ-1 Commit: cee79f6384940ca226cd0c2ef02e024f19e7550a Parents: 38b410e Author: Siddharth Seth <[email protected]> Authored: Fri May 17 00:03:20 2013 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri May 17 00:03:20 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/tez/common/TezJobConfig.java | 7 + .../org/apache/hadoop/mapred/YarnTezDagChild.java | 10 +- .../apache/tez/common/TezEngineTaskContext.java | 4 +- .../mapreduce/split/SplitMetaInfoReaderTez.java | 14 ++- .../tez/mapreduce/hadoop/TezTypeConverters.java | 2 + .../org/apache/tez/mapreduce/processor/MRTask.java | 8 +- .../tez/mapreduce/processor/map/MapProcessor.java | 14 ++- .../tez/mapreduce/TestUmbilicalProtocol.java | 2 +- .../apache/tez/mapreduce/processor/MapUtils.java | 85 ++++++++---- .../mapreduce/processor/map/TestMapProcessor.java | 79 +++++++++--- .../processor/reduce/TestReduceProcessor.java | 104 +++++++++++---- 11 files changed, 236 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java index e867458..4a7abea 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java @@ -68,6 +68,13 @@ public class TezJobConfig { public static final String DEFAULT_LOCAL_DIR = "/tmp"; /** + * The directory which contains the localized files for this task. + */ + @Private + public static final String TASK_LOCAL_RESOURCE_DIR = "tez.engine.task-local-resource.dir"; + public static final String DEFAULT_TASK_LOCAL_RESOURCE_DIR = "/tmp"; + + /** * */ public static final String TEZ_ENGINE_TASK_INDEGREE = http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java index f140c3a..2295b6a 100644 --- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java +++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java @@ -249,12 +249,16 @@ public class YarnTezDagChild { * out an output directory. * @throws IOException */ - private static void configureLocalDirs(MRTask task, JobConf job) throws IOException { + private static void configureLocalDirs(JobConf job) throws IOException { String[] localSysDirs = StringUtils.getTrimmedStrings( System.getenv(Environment.LOCAL_DIRS.name())); job.setStrings(TezJobConfig.LOCAL_DIR, localSysDirs); + job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, + System.getenv(Environment.PWD.name())); LOG.info(TezJobConfig.LOCAL_DIR + " for child: " + job.get(TezJobConfig.LOCAL_DIR)); + LOG.info(TezJobConfig.TASK_LOCAL_RESOURCE_DIR + " for child: " + + job.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR)); LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIR); Path workDir = null; // First, try to find the JOB_LOCAL_DIR on this host. @@ -285,7 +289,7 @@ public class YarnTezDagChild { } } // TODO TEZ This likely needs fixing to make sure things work when there are multiple local-dirs etc. - job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString()); + job.set(MRJobConfig.JOB_LOCAL_DIR, workDir.toString()); } private static JobConf configureTask(MRTask task, Credentials credentials, @@ -308,7 +312,7 @@ public class YarnTezDagChild { // JobTokenSecretManager.createSecretKey(jt.getPassword())); // setup the child's MRConfig.LOCAL_DIR. - configureLocalDirs(task, job); + configureLocalDirs(job); // setup the child's attempt directories // Do the task-type specific localization http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java index 350702a..5823de6 100644 --- a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java +++ b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java @@ -39,7 +39,7 @@ public class TezEngineTaskContext extends TezTaskContext { } public TezEngineTaskContext(TezTaskAttemptID taskAttemptID, String user, - String jobName, String vertexName, String moduleClassName, + String jobName, String vertexName, String processorName, List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) { super(taskAttemptID, user, jobName, vertexName); this.inputSpecList = inputSpecList; @@ -52,7 +52,7 @@ public class TezEngineTaskContext extends TezTaskContext { } this.inputSpecList = inputSpecList; this.outputSpecList = outputSpecList; - this.processorName = moduleClassName; + this.processorName = processorName; } public String getProcessorName() { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java index de8d972..72246c0 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; +import org.apache.tez.common.TezJobConfig; import org.apache.tez.mapreduce.hadoop.MRJobConfig; /** @@ -44,6 +45,9 @@ public class SplitMetaInfoReaderTez { public static final Log LOG = LogFactory.getLog(SplitMetaInfoReaderTez.class); + public static final int META_SPLIT_VERSION = JobSplit.META_SPLIT_VERSION; + public static final byte[] META_SPLIT_FILE_HEADER = JobSplit.META_SPLIT_FILE_HEADER; + // Forked from the MR variant so that the metaInfo file as well as the split // file can be read from local fs - relying on these files being localized. public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf, @@ -53,9 +57,11 @@ public class SplitMetaInfoReaderTez { MRJobConfig.SPLIT_METAINFO_MAXSIZE, MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE); - Path metaSplitFile = new Path(MRJobConfig.JOB_SPLIT_METAINFO); + Path metaSplitFile = new Path( + conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR), + MRJobConfig.JOB_SPLIT_METAINFO); String jobSplitFile = MRJobConfig.JOB_SPLIT; - + File file = new File(metaSplitFile.toUri().getPath()).getAbsoluteFile(); LOG.info("DEBUG: Setting up JobSplitIndex with JobSplitFile at: " + file.getAbsolutePath() + ", defaultFS from conf: " @@ -83,12 +89,12 @@ public class SplitMetaInfoReaderTez { JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo(); splitMetaInfo.readFields(in); JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex( - jobSplitFile, splitMetaInfo.getStartOffset()); + new Path(conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR), jobSplitFile) + .toUri().toString(), splitMetaInfo.getStartOffset()); allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex, splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength()); } in.close(); return allSplitMetaInfo; } - } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java index 1fc71b0..768d347 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java @@ -34,6 +34,8 @@ import org.apache.tez.engine.records.TezDependentTaskCompletionEvent; public class TezTypeConverters { + // TODO Remove unused methods + // Tez objects will be imported. Others will use the fully qualified name when // required. // All public methods named toYarn / toTez / toMapReduce http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java index d7d721f..276294c 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java @@ -149,7 +149,7 @@ extends RunningTaskContext { jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID, taskAttemptId.toString()); - + initResourceCalculatorPlugin(); LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString()); @@ -695,8 +695,10 @@ extends RunningTaskContext { public void localizeConfiguration(JobConf jobConf) throws IOException, InterruptedException { - jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString()); - jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString()); + jobConf.set(JobContext.TASK_ID, IDConverter + .toMRTaskAttemptId(taskAttemptId).toString()); + jobConf.set(JobContext.TASK_ATTEMPT_ID, + IDConverter.toMRTaskAttemptId(taskAttemptId).toString()); jobConf.setInt(JobContext.TASK_PARTITION, taskAttemptId.getTaskID().getId()); jobConf.set(JobContext.ID, taskAttemptId.getTaskID().getVertexID().getDAGId().toString()); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java index a6ab986..3558739 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java @@ -69,11 +69,7 @@ public class MapProcessor extends MRTask implements Processor { public void initialize(Configuration conf, Master master) throws IOException, InterruptedException { super.initialize(conf, master); - TaskSplitMetaInfo[] allMetaInfo = readSplits(); - TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[tezEngineTaskContext - .getTaskAttemptId().getTaskID().getId()]; - splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(), - thisTaskMetaInfo.getStartOffset()); + } @Override @@ -81,6 +77,14 @@ public class MapProcessor extends MRTask implements Processor { final Input[] ins, final Output[] outs) throws IOException, InterruptedException { + + // Read split information. + TaskSplitMetaInfo[] allMetaInfo = readSplits(); + TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[tezEngineTaskContext + .getTaskAttemptId().getTaskID().getId()]; + splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(), + thisTaskMetaInfo.getStartOffset()); + MRTaskReporter reporter = new MRTaskReporter(getTaskReporter()); boolean useNewApi = jobConf.getUseNewMapper(); initTask(jobConf, taskAttemptId.getTaskID().getVertexID().getDAGId(), http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java index 025bf61..840cb31 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java @@ -111,7 +111,7 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol { @Override public boolean canCommit(TezTaskAttemptID taskid) throws IOException { LOG.info("Got 'can-commit' from " + taskid); - return false; + return true; } @Override http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java index aec6667..0b6bc5f 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java @@ -18,40 +18,37 @@ package org.apache.tez.mapreduce.processor; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; - import java.io.IOException; -import java.util.Collections; +import java.util.List; import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex; +import org.apache.hadoop.mapreduce.split.JobSplit; +import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo; +import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez; import org.apache.tez.common.InputSpec; import org.apache.tez.common.OutputSpec; import org.apache.tez.common.TezEngineTaskContext; +import org.apache.tez.common.TezJobConfig; import org.apache.tez.common.TezTaskUmbilicalProtocol; -import org.apache.tez.engine.api.Input; -import org.apache.tez.engine.api.Output; -import org.apache.tez.engine.api.Processor; import org.apache.tez.engine.api.Task; -import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput; import org.apache.tez.engine.runtime.RuntimeUtils; -import org.apache.tez.engine.task.RuntimeTask; import org.apache.tez.mapreduce.TezTestUtils; -import org.apache.tez.mapreduce.input.SimpleInput; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.processor.map.MapProcessor; public class MapUtils { @@ -62,8 +59,7 @@ public class MapUtils { createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file) throws IOException { FileInputFormat.setInputPaths(job, workDir); - - + // create a file with length entries SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, file, @@ -92,30 +88,59 @@ public class MapUtils { "file = " + ((FileSplit)splits[0]).getPath()); return splits[0]; } + + final private static FsPermission JOB_FILE_PERMISSION = FsPermission + .createImmutable((short) 0644); // rw-r--r-- + + // Will write files to PWD, from where they are read. + + private static void writeSplitFiles(FileSystem fs, JobConf conf, + InputSplit split) throws IOException { + Path jobSplitFile = new Path(conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, + TezJobConfig.DEFAULT_TASK_LOCAL_RESOURCE_DIR), MRJobConfig.JOB_SPLIT); + FSDataOutputStream out = FileSystem.create(fs, jobSplitFile, + new FsPermission(JOB_FILE_PERMISSION)); + + long offset = out.getPos(); + Text.writeString(out, split.getClass().getName()); + split.write(out); + out.close(); + + String[] locations = split.getLocations(); + + SplitMetaInfo info = null; + info = new JobSplit.SplitMetaInfo(locations, offset, split.getLength()); + + Path jobSplitMetaInfoFile = new Path( + conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR), + MRJobConfig.JOB_SPLIT_METAINFO); + + FSDataOutputStream outMeta = FileSystem.create(fs, jobSplitMetaInfoFile, + new FsPermission(JOB_FILE_PERMISSION)); + outMeta.write(SplitMetaInfoReaderTez.META_SPLIT_FILE_HEADER); + WritableUtils.writeVInt(outMeta, SplitMetaInfoReaderTez.META_SPLIT_VERSION); + WritableUtils.writeVInt(outMeta, 1); // Only 1 split meta info being written + info.write(outMeta); + outMeta.close(); + } public static Task runMapProcessor(FileSystem fs, Path workDir, - JobConf jobConf, - int mapId, Path mapInput, + JobConf jobConf, int mapId, Path mapInput, TezTaskUmbilicalProtocol umbilical, - Class<?> outputClazz) throws Exception { + String vertexName, List<InputSpec> inputSpecs, + List<OutputSpec> outputSpecs) throws Exception { jobConf.setInputFormat(SequenceFileInputFormat.class); InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput); - TezEngineTaskContext taskContext = - new TezEngineTaskContext( - TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0), "tez", - "tez", "TODO_vertexName", MapProcessor.class.getName(), - Collections.singletonList(new InputSpec("srcVertex", 0, - SimpleInput.class.getName())), - Collections.singletonList(new OutputSpec("targetVertex", 0, - outputClazz.getName()))); + + writeSplitFiles(fs, jobConf, split); + TezEngineTaskContext taskContext = new TezEngineTaskContext( + TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0), "testuser", + "testJob", vertexName, MapProcessor.class.getName(), + inputSpecs, outputSpecs); Task t = RuntimeUtils.createRuntimeTask(taskContext); t.initialize(jobConf, umbilical); - SimpleInput[] real = ((SimpleInput[])t.getInputs()); - SimpleInput[] inputs = spy(real); - doReturn(split).when(inputs[0]).getOldSplitDetails(any(TaskSplitIndex.class)); - t.getProcessor().process(inputs, t.getOutputs()); + t.getProcessor().process(t.getInputs(), t.getOutputs()); return t; } - } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java index f09340c..cda15fb 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java @@ -19,9 +19,11 @@ package org.apache.tez.mapreduce.processor.map; import java.io.IOException; +import java.util.Collections; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; @@ -29,6 +31,8 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.tez.common.Constants; +import org.apache.tez.common.InputSpec; +import org.apache.tez.common.OutputSpec; import org.apache.tez.common.TezJobConfig; import org.apache.tez.engine.api.Task; import org.apache.tez.engine.common.sort.impl.IFile; @@ -37,11 +41,15 @@ import org.apache.tez.engine.common.task.local.output.TezTaskOutput; import org.apache.tez.engine.lib.output.InMemorySortedOutput; import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput; import org.apache.tez.mapreduce.TestUmbilicalProtocol; +import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; +import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; +import org.apache.tez.mapreduce.input.SimpleInput; import org.apache.tez.mapreduce.processor.MapUtils; import org.jboss.netty.buffer.BigEndianHeapChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.TruncatedChannelBuffer; import org.jboss.netty.handler.stream.ChunkedStream; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -49,9 +57,7 @@ import org.junit.Test; public class TestMapProcessor { - private static final Log LOG = LogFactory.getLog(TestMapProcessor.class); - - JobConf job; + private static final Log LOG = LogFactory.getLog(TestMapProcessor.class); private static JobConf defaultConf = new JobConf(); private static FileSystem localFs = null; @@ -63,32 +69,51 @@ public class TestMapProcessor { throw new RuntimeException("init failure", e); } } + @SuppressWarnings("deprecation") private static Path workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), "TestMapProcessor").makeQualified(localFs); TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(); - - @Before - public void setUp() { - job = new JobConf(defaultConf); + + public void setUpJobConf(JobConf job) { job.set(TezJobConfig.LOCAL_DIR, workDir.toString()); job.setClass( Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, TezLocalTaskOutputFiles.class, TezTaskOutput.class); job.setNumReduceTasks(1); - mapOutputs.setConf(job); + } + + @Before + @After + public void cleanup() throws Exception { + localFs.delete(workDir, true); } @Test - @Ignore public void testMapProcessor() throws Exception { - localFs.delete(workDir, true); - MapUtils.runMapProcessor( - localFs, workDir, job, 0, new Path(workDir, "map0"), - new TestUmbilicalProtocol(), - LocalOnFileSorterOutput.class).close(); + String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName(); + JobConf jobConf = new JobConf(defaultConf); + setUpJobConf(jobConf); + TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(); + mapOutputs.setConf(jobConf); + + Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf); + Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf, + vertexName); + + JobConf job = new JobConf(stageConf); + + job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, + "localized-resources").toUri().toString()); + + MapUtils.runMapProcessor(localFs, workDir, job, 0, + new Path(workDir, "map0"), new TestUmbilicalProtocol(), vertexName, + Collections.singletonList(new InputSpec("NullVertex", 0, + SimpleInput.class.getName())), + Collections.singletonList(new OutputSpec("FakeVertex", 1, + LocalOnFileSorterOutput.class.getName()))).close(); Path mapOutputFile = mapOutputs.getInputFile(0); LOG.info("mapOutputFile = " + mapOutputFile); @@ -115,16 +140,34 @@ public class TestMapProcessor { @Test @Ignore public void testMapProcessorWithInMemSort() throws Exception { + + String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName(); + final int partitions = 2; - job.setNumReduceTasks(partitions); - job.setInt(TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE, partitions); + JobConf jobConf = new JobConf(defaultConf); + jobConf.setNumReduceTasks(partitions); + setUpJobConf(jobConf); + TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(); + mapOutputs.setConf(jobConf); + + Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf); + Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf, + vertexName); + + JobConf job = new JobConf(stageConf); + job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, + "localized-resources").toUri().toString()); localFs.delete(workDir, true); Task t = MapUtils.runMapProcessor( localFs, workDir, job, 0, new Path(workDir, "map0"), - new TestUmbilicalProtocol(true), - InMemorySortedOutput.class); + new TestUmbilicalProtocol(true), vertexName, + Collections.singletonList(new InputSpec("NullVertex", 0, + SimpleInput.class.getName())), + Collections.singletonList(new OutputSpec("FakeVertex", 1, + InMemorySortedOutput.class.getName())) + ); InMemorySortedOutput[] outputs = (InMemorySortedOutput[])t.getOutputs(); verifyInMemSortedStream(outputs[0], 0, 4096); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index afc21a7..69571e1 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -22,40 +22,44 @@ import java.util.Collections; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.tez.common.Constants; import org.apache.tez.common.InputSpec; import org.apache.tez.common.OutputSpec; import org.apache.tez.common.TezEngineTaskContext; import org.apache.tez.common.TezJobConfig; -import org.apache.tez.engine.api.Input; -import org.apache.tez.engine.api.Output; import org.apache.tez.engine.api.Task; import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles; import org.apache.tez.engine.common.task.local.output.TezTaskOutput; import org.apache.tez.engine.lib.input.LocalMergedInput; import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput; import org.apache.tez.engine.runtime.RuntimeUtils; -import org.apache.tez.engine.task.RuntimeTask; import org.apache.tez.mapreduce.TestUmbilicalProtocol; import org.apache.tez.mapreduce.TezTestUtils; +import org.apache.tez.mapreduce.hadoop.IDConverter; +import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; +import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; +import org.apache.tez.mapreduce.input.SimpleInput; import org.apache.tez.mapreduce.output.SimpleOutput; import org.apache.tez.mapreduce.processor.MapUtils; +import org.junit.After; +import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; public class TestReduceProcessor { private static final Log LOG = LogFactory.getLog(TestReduceProcessor.class); - - JobConf job; - + private static JobConf defaultConf = new JobConf(); private static FileSystem localFs = null; static { @@ -66,13 +70,12 @@ public class TestReduceProcessor { throw new RuntimeException("init failure", e); } } + @SuppressWarnings("deprecation") private static Path workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), "TestReduceProcessor").makeQualified(localFs); - @Before - public void setUp() { - job = new JobConf(defaultConf); + public void setUpJobConf(JobConf job) { job.set(TezJobConfig.LOCAL_DIR, workDir.toString()); job.setClass( Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, @@ -80,37 +83,84 @@ public class TestReduceProcessor { TezTaskOutput.class); job.setNumReduceTasks(1); } - + + @Before + @After + public void cleanup() throws Exception { + localFs.delete(workDir, true); + } + @Test - @Ignore public void testReduceProcessor() throws Exception { - localFs.delete(workDir, true); + String mapVertexName = MultiStageMRConfigUtil.getInitialMapVertexName(); + String reduceVertexName = MultiStageMRConfigUtil.getFinalReduceVertexName(); + JobConf jobConf = new JobConf(defaultConf); + setUpJobConf(jobConf); + TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(); + mapOutputs.setConf(jobConf); + + Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf); + Configuration mapStageConf = MultiStageMRConfigUtil.getConfForVertex(conf, + mapVertexName); + + JobConf mapConf = new JobConf(mapStageConf); + + mapConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, + "localized-resources").toUri().toString()); + // Run a map - MapUtils.runMapProcessor( - localFs, workDir, job, 0, new Path(workDir, "map0"), - new TestUmbilicalProtocol(), - LocalOnFileSorterOutput.class); + MapUtils.runMapProcessor(localFs, workDir, mapConf, 0, + new Path(workDir, "map0"), new TestUmbilicalProtocol(), mapVertexName, + Collections.singletonList(new InputSpec("NullVertex", 0, + SimpleInput.class.getName())), + Collections.singletonList(new OutputSpec("FakeVertex", 1, + LocalOnFileSorterOutput.class.getName()))); LOG.info("Starting reduce..."); - FileOutputFormat.setOutputPath(job, new Path(workDir, "output")); + + + Configuration reduceStageConf = MultiStageMRConfigUtil.getConfForVertex(conf, + reduceVertexName); + JobConf reduceConf = new JobConf(reduceStageConf); + reduceConf.setOutputFormat(SequenceFileOutputFormat.class); + FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output")); // Now run a reduce TezEngineTaskContext taskContext = new TezEngineTaskContext( - TezTestUtils.getMockTaskAttemptId(0, 0, 0, 0), "tez", - "tez", "TODO_vertexName", ReduceProcessor.class.getName(), - Collections.singletonList(new InputSpec("TODO_srcVertexName", 1, + TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0), "testUser", + "testJob", reduceVertexName, ReduceProcessor.class.getName(), + Collections.singletonList(new InputSpec(mapVertexName, 1, LocalMergedInput.class.getName())), - Collections.singletonList(new OutputSpec("TODO_targetVertexName", 1, + Collections.singletonList(new OutputSpec("", 1, SimpleOutput.class.getName()))); - - job.set(JobContext.TASK_ATTEMPT_ID, - taskContext.getTaskAttemptId().toString()); + Task t = RuntimeUtils.createRuntimeTask(taskContext); - t.initialize(job, new TestUmbilicalProtocol()); + t.initialize(reduceConf, new TestUmbilicalProtocol()); t.run(); t.close(); + + // Can this be done via some utility class ? MapOutputFile derivative, or + // instantiating the OutputCommitter + Path reduceOutputDir = new Path(new Path(workDir, "output"), + "_temporary/0/" + IDConverter + .toMRTaskId(taskContext.getTaskAttemptId().getTaskID())); + Path reduceOutputFile = new Path(reduceOutputDir, "part-00000"); + + @SuppressWarnings("deprecation") + SequenceFile.Reader reader = new SequenceFile.Reader(localFs, reduceOutputFile, reduceConf); + + LongWritable key = new LongWritable(); + Text value = new Text(); + long prev = Long.MIN_VALUE; + while (reader.next(key, value)) { + if (prev != Long.MIN_VALUE) { + Assert.assertTrue(prev < key.get()); + prev = key.get(); + } + } + reader.close(); } }
