Author: ddas Date: Thu Nov 29 03:31:15 2007 New Revision: 599389 URL: http://svn.apache.org/viewvc?rev=599389&view=rev Log: HADOOP-2245. Fixes LocalJobRunner to include a jobId in the mapId. Also, adds a testcase for JobControl. Contributed by Adrian Woodhead.
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=599389&r1=599388&r2=599389&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Nov 29 03:31:15 2007 @@ -158,6 +158,8 @@ HADOOP-2244. Fixes the MapWritable.readFields to clear the instance field variable every time readFields is called. (Michael Stack via ddas). + HADOOP-2245. Fixes LocalJobRunner to include a jobId in the mapId. Also, + adds a testcase for JobControl. (Adrian Woodhead via ddas). Branch 0.15 (unreleased) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=599389&r1=599388&r2=599389&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Nov 29 03:31:15 2007 @@ -114,7 +114,7 @@ } DataOutputBuffer buffer = new DataOutputBuffer(); for (int i = 0; i < splits.length; i++) { - String mapId = "map_" + idFormat.format(i); + String mapId = jobId + "_map_" + idFormat.format(i); mapIds.add(mapId); buffer.reset(); splits[i].write(buffer); Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java?rev=599389&r1=599388&r2=599389&view=diff 
============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java Thu Nov 29 03:31:15 2007 @@ -18,25 +18,12 @@ package org.apache.hadoop.mapred.jobcontrol; -import java.io.IOException; -import java.text.NumberFormat; -import java.util.Iterator; import java.util.ArrayList; -import java.util.Random; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; /** * This class performs unit test for Job/JobControl classes. 
@@ -44,87 +31,6 @@ */ public class TestJobControl extends junit.framework.TestCase { - private static NumberFormat idFormat = NumberFormat.getInstance(); - static { - idFormat.setMinimumIntegerDigits(4); - idFormat.setGroupingUsed(false); - } - - static private Random rand = new Random(); - - private static void cleanData(FileSystem fs, Path dirPath) - throws IOException { - fs.delete(dirPath); - } - - private static String generateRandomWord() { - return idFormat.format(rand.nextLong()); - } - - private static String generateRandomLine() { - long r = rand.nextLong() % 7; - long n = r + 20; - StringBuffer sb = new StringBuffer(); - for (int i = 0; i < n; i++) { - sb.append(generateRandomWord()).append(" "); - } - sb.append("\n"); - return sb.toString(); - } - - private static void generateData(FileSystem fs, Path dirPath) - throws IOException { - FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt")); - for (int i = 0; i < 100000; i++) { - String line = TestJobControl.generateRandomLine(); - out.write(line.getBytes("UTF-8")); - } - out.close(); - } - - public static class DataCopy extends MapReduceBase - implements Mapper<WritableComparable, Text, Text, Text>, - Reducer<Text, Text, Text, Text> { - public void map(WritableComparable key, Text value, - OutputCollector<Text, Text> output, - Reporter reporter) throws IOException { - output.collect(new Text(key.toString()), value); - } - - public void reduce(Text key, Iterator<Text> values, - OutputCollector<Text, Text> output, - Reporter reporter) throws IOException { - Text dumbKey = new Text(""); - while (values.hasNext()) { - Text data = (Text) values.next(); - output.collect(dumbKey, data); - } - } - } - - private static JobConf createCopyJob(ArrayList indirs, Path outdir) - throws Exception { - - Configuration defaults = new Configuration(); - JobConf theJob = new JobConf(defaults, TestJobControl.class); - theJob.setJobName("DataMoveJob"); - - theJob.setInputPath((Path) indirs.get(0)); - if 
(indirs.size() > 1) { - for (int i = 1; i < indirs.size(); i++) { - theJob.addInputPath((Path) indirs.get(i)); - } - } - theJob.setMapperClass(DataCopy.class); - theJob.setOutputPath(outdir); - theJob.setOutputKeyClass(Text.class); - theJob.setOutputValueClass(Text.class); - theJob.setReducerClass(DataCopy.class); - theJob.setNumMapTasks(12); - theJob.setNumReduceTasks(4); - return theJob; - } - /** * This is a main function for testing JobControl class. * It first cleans all the dirs it will use. Then it generates some random text @@ -139,11 +45,9 @@ * Then it creates a JobControl object and add the 4 jobs to the JobControl object. * Finally, it creates a thread to run the JobControl object and monitors/reports * the job states. - * - * @param args */ public static void doJobControlTest() throws Exception { - + Configuration defaults = new Configuration(); FileSystem fs = FileSystem.get(defaults); Path rootDataDir = new Path(System.getProperty("test.build.data", "."), "TestJobControlData"); @@ -153,29 +57,29 @@ Path outdir_3 = new Path(rootDataDir, "outdir_3"); Path outdir_4 = new Path(rootDataDir, "outdir_4"); - cleanData(fs, indir); - generateData(fs, indir); + JobControlTestUtils.cleanData(fs, indir); + JobControlTestUtils.generateData(fs, indir); - cleanData(fs, outdir_1); - cleanData(fs, outdir_2); - cleanData(fs, outdir_3); - cleanData(fs, outdir_4); + JobControlTestUtils.cleanData(fs, outdir_1); + JobControlTestUtils.cleanData(fs, outdir_2); + JobControlTestUtils.cleanData(fs, outdir_3); + JobControlTestUtils.cleanData(fs, outdir_4); ArrayList<Job> dependingJobs = null; ArrayList<Path> inPaths_1 = new ArrayList<Path>(); inPaths_1.add(indir); - JobConf jobConf_1 = createCopyJob(inPaths_1, outdir_1); + JobConf jobConf_1 = JobControlTestUtils.createCopyJob(inPaths_1, outdir_1); Job job_1 = new Job(jobConf_1, dependingJobs); ArrayList<Path> inPaths_2 = new ArrayList<Path>(); inPaths_2.add(indir); - JobConf jobConf_2 = createCopyJob(inPaths_2, outdir_2); + 
JobConf jobConf_2 = JobControlTestUtils.createCopyJob(inPaths_2, outdir_2); Job job_2 = new Job(jobConf_2, dependingJobs); ArrayList<Path> inPaths_3 = new ArrayList<Path>(); inPaths_3.add(outdir_1); inPaths_3.add(outdir_2); - JobConf jobConf_3 = createCopyJob(inPaths_3, outdir_3); + JobConf jobConf_3 = JobControlTestUtils.createCopyJob(inPaths_3, outdir_3); dependingJobs = new ArrayList<Job>(); dependingJobs.add(job_1); dependingJobs.add(job_2); @@ -183,7 +87,7 @@ ArrayList<Path> inPaths_4 = new ArrayList<Path>(); inPaths_4.add(outdir_3); - JobConf jobConf_4 = createCopyJob(inPaths_4, outdir_4); + JobConf jobConf_4 = JobControlTestUtils.createCopyJob(inPaths_4, outdir_4); dependingJobs = new ArrayList<Job>(); dependingJobs.add(job_3); Job job_4 = new Job(jobConf_4, dependingJobs); Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java?rev=599389&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java Thu Nov 29 03:31:15 2007 @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.jobcontrol; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.HadoopTestCase; +import org.apache.hadoop.mapred.JobConf; + +/** + * HadoopTestCase that tests the local job runner. + */ +public class TestLocalJobControl extends HadoopTestCase { + + public static final Log LOG = LogFactory.getLog(TestLocalJobControl.class + .getName()); + + /** + * Initialises a new instance of this test case to use a Local MR cluster and + * a local filesystem. + * + * @throws IOException If an error occurs initialising this object. + */ + public TestLocalJobControl() throws IOException { + super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 2, 2); + } + + /** + * This is a main function for testing JobControl class. It first cleans all + * the dirs it will use. Then it generates some random text data in + * TestLocalJobControlData/indir. Then it creates 4 jobs: Job 1: copy data from + * indir to outdir_1 Job 2: copy data from indir to outdir_2 Job 3: copy data + * from outdir_1 and outdir_2 to outdir_3 Job 4: copy data from outdir_3 to + * outdir_4 The jobs 1 and 2 have no dependency. The job 3 depends on jobs 1 + * and 2. The job 4 depends on job 3. + * + * Then it creates a JobControl object and add the 4 jobs to the JobControl + * object. 
Finally, it creates a thread to run the JobControl object and + * monitors/reports the job states. + */ + public void testLocalJobControlDataCopy() throws Exception { + + FileSystem fs = FileSystem.get(createJobConf()); + Path rootDataDir = new Path(System.getProperty("test.build.data", "."), + "TestLocalJobControlData"); + Path indir = new Path(rootDataDir, "indir"); + Path outdir_1 = new Path(rootDataDir, "outdir_1"); + Path outdir_2 = new Path(rootDataDir, "outdir_2"); + Path outdir_3 = new Path(rootDataDir, "outdir_3"); + Path outdir_4 = new Path(rootDataDir, "outdir_4"); + + JobControlTestUtils.cleanData(fs, indir); + JobControlTestUtils.generateData(fs, indir); + + JobControlTestUtils.cleanData(fs, outdir_1); + JobControlTestUtils.cleanData(fs, outdir_2); + JobControlTestUtils.cleanData(fs, outdir_3); + JobControlTestUtils.cleanData(fs, outdir_4); + + ArrayList<Job> dependingJobs = null; + + ArrayList<Path> inPaths_1 = new ArrayList<Path>(); + inPaths_1.add(indir); + JobConf jobConf_1 = JobControlTestUtils.createCopyJob(inPaths_1, outdir_1); + Job job_1 = new Job(jobConf_1, dependingJobs); + ArrayList<Path> inPaths_2 = new ArrayList<Path>(); + inPaths_2.add(indir); + JobConf jobConf_2 = JobControlTestUtils.createCopyJob(inPaths_2, outdir_2); + Job job_2 = new Job(jobConf_2, dependingJobs); + + ArrayList<Path> inPaths_3 = new ArrayList<Path>(); + inPaths_3.add(outdir_1); + inPaths_3.add(outdir_2); + JobConf jobConf_3 = JobControlTestUtils.createCopyJob(inPaths_3, outdir_3); + dependingJobs = new ArrayList<Job>(); + dependingJobs.add(job_1); + dependingJobs.add(job_2); + Job job_3 = new Job(jobConf_3, dependingJobs); + + ArrayList<Path> inPaths_4 = new ArrayList<Path>(); + inPaths_4.add(outdir_3); + JobConf jobConf_4 = JobControlTestUtils.createCopyJob(inPaths_4, outdir_4); + dependingJobs = new ArrayList<Job>(); + dependingJobs.add(job_3); + Job job_4 = new Job(jobConf_4, dependingJobs); + + JobControl theControl = new JobControl("Test"); + 
theControl.addJob(job_1); + theControl.addJob(job_2); + theControl.addJob(job_3); + theControl.addJob(job_4); + + Thread theController = new Thread(theControl); + theController.start(); + while (!theControl.allFinished()) { + LOG.debug("Jobs in waiting state: " + theControl.getWaitingJobs().size()); + LOG.debug("Jobs in ready state: " + theControl.getReadyJobs().size()); + LOG.debug("Jobs in running state: " + theControl.getRunningJobs().size()); + LOG.debug("Jobs in success state: " + + theControl.getSuccessfulJobs().size()); + LOG.debug("Jobs in failed state: " + theControl.getFailedJobs().size()); + LOG.debug("\n"); + try { + Thread.sleep(5000); + } catch (Exception e) { + + } + } + + assertEquals("Some jobs failed", 0, theControl.getFailedJobs().size()); + theControl.stop(); + } + +}