Author: cutting
Date: Fri Jun 15 14:56:37 2007
New Revision: 547807

URL: http://svn.apache.org/viewvc?view=rev&rev=547807
Log:
HADOOP-1320.  Rewrite RandomWriter example to bypass reduce.  Contributed by Arun.
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jun 15 14:56:37 2007
@@ -147,6 +147,9 @@
  46. HADOOP-1417.  Disable a few FindBugs checks that generate a lot
      of spurious warnings.  (Nigel Daley via cutting)
 
+ 47. HADOOP-1320.  Rewrite RandomWriter example to bypass reduce.
+     (Arun C Murthy via cutting)
+
 
 Release 0.13.0 - 2007-06-08

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java Fri Jun 15 14:56:37 2007
@@ -71,6 +71,7 @@
 
   public void testCommandLine() {
+    String outFileName = "part-00000";
     File outFile = null;
     try {
       try {
@@ -85,7 +86,7 @@
       // So don't specify -config or -cluster
       job = new StreamJob(genArgs(), mayExit);
       job.go();
-      outFile = new File(OUTPUT_DIR, "tip_m_map_0000").getAbsoluteFile();
+      outFile = new File(OUTPUT_DIR, outFileName).getAbsoluteFile();
       String output = StreamUtil.slurp(outFile);
       System.err.println("outEx1=" + outputExpect);
       System.err.println("  out1=" + output);
@@ -94,7 +95,7 @@
       failTrace(e);
     } finally {
       outFile.delete();
-      File outFileCRC = new File(OUTPUT_DIR, ".tip_m_map_0000.crc").getAbsoluteFile();
+      File outFileCRC = new File(OUTPUT_DIR, "."+outFileName+".crc").getAbsoluteFile();
       INPUT_FILE.delete();
       outFileCRC.delete();
       OUTPUT_DIR.getAbsoluteFile().delete();
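Note on the new expectation: with zero reduces, the lone map's output is now
written through the job's OutputFormat under the same lexicographically
sortable part-NNNNN name a reduce would produce, hence "part-00000" (the
local filesystem also keeps a hidden ".part-00000.crc" checksum beside it).
A minimal sketch of the naming scheme, with String.format standing in for
the NumberFormat rule this patch adds to Task.java further down:

// Sketch only: the names a reduce-less job with three maps should produce.
public class ExpectedNames {
  public static void main(String[] args) {
    for (int partition = 0; partition < 3; partition++) {
      // five digits, zero-padded: part-00000, part-00001, part-00002
      System.out.println("part-" + String.format("%05d", partition));
    }
  }
}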
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Fri Jun 15 14:56:37 2007
@@ -22,13 +22,12 @@
 import java.util.Date;
 import java.util.Random;
 
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.ToolBase;
 
 /**
  * This program uses map/reduce to just run a distributed job where there is
@@ -61,8 +60,11 @@
  *   <value>1099511627776</value>
  * </property>
  * </configuration></xmp>
+ * 
+ * Equivalently, {@link RandomWriter} also supports all the above options
+ * and ones supported by {@link ToolBase} via the command-line.
  */
-public class RandomWriter {
+public class RandomWriter extends ToolBase {
   
   /**
    * User counters
@@ -88,7 +90,7 @@
       InputSplit[] result = new InputSplit[numSplits];
       Path outDir = job.getOutputPath();
       for(int i=0; i < result.length; ++i) {
-        result[i] = new FileSplit(new Path(outDir, "part-" + i), 0, 1, job);
+        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, job);
       }
       return result;
     }
@@ -133,8 +135,6 @@
   }
   
   static class Map extends MapReduceBase implements Mapper {
-    private FileSystem fileSys = null;
-    private JobConf jobConf = null;
     private long numBytesToWrite;
     private int minKeySize;
     private int keySizeRange;
@@ -143,7 +143,6 @@
     private Random random = new Random();
     private BytesWritable randomKey = new BytesWritable();
     private BytesWritable randomValue = new BytesWritable();
-    private Path outputDir = null;
    
    private void randomizeBytes(byte[] data, int offset, int length) {
      for(int i=offset + length - 1; i >= offset; --i) {
@@ -158,12 +157,6 @@
                     Writable value,
                     OutputCollector output, 
                     Reporter reporter) throws IOException {
-      String filename = ((Text) key).toString();
-      SequenceFile.Writer writer = 
-        SequenceFile.createWriter(fileSys, jobConf, 
-                                  new Path(outputDir, filename), 
-                                  BytesWritable.class, BytesWritable.class,
-                                  CompressionType.NONE, reporter);
       int itemCount = 0;
       while (numBytesToWrite > 0) {
         int keyLength = minKeySize +
@@ -174,7 +167,7 @@
           (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
         randomValue.setSize(valueLength);
         randomizeBytes(randomValue.get(), 0, randomValue.getSize());
-        writer.append(randomKey, randomValue);
+        output.collect(randomKey, randomValue);
         numBytesToWrite -= keyLength + valueLength;
         reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
         reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
@@ -184,7 +177,6 @@
         }
       }
       reporter.setStatus("done with " + itemCount + " records.");
-      writer.close();
     }
    
     /**
      * Save the values out of the configuration that we need to write
@@ -192,14 +184,6 @@
      * the data.
      */
     public void configure(JobConf job) {
-      jobConf = job;
-      try {
-        fileSys = FileSystem.get(job);
-      } catch (IOException e) {
-        throw new RuntimeException("Can't get default file system", e);
-      }
-      outputDir = job.getOutputPath();
-      
       numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
                                     1*1024*1024*1024);
       minKeySize = job.getInt("test.randomwrite.min_key", 10);
@@ -219,18 +203,15 @@
    * 
    * @throws IOException 
    */
-  public static void main(String[] args) throws IOException {
+  public int run(String[] args) throws Exception {
     if (args.length == 0) {
       System.out.println("Usage: writer <out-dir> [<config>]");
-      return;
+      return -1;
     }
+    
     Path outDir = new Path(args[0]);
-    JobConf job;
-    if (args.length >= 2) {
-      job = new JobConf(new Path(args[1]));
-    } else {
-      job = new JobConf();
-    }
+    JobConf job = new JobConf(conf);
+    job.setJarByClass(RandomWriter.class);
    
     job.setJobName("random-writer");
     job.setOutputPath(outDir);
@@ -241,7 +222,7 @@
     job.setInputFormat(RandomInputFormat.class);
     job.setMapperClass(Map.class);
     job.setReducerClass(IdentityReducer.class);
-    job.setOutputFormat(NullOutputFormat.class);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
    
     JobClient client = new JobClient(job);
     ClusterStatus cluster = client.getClusterStatus();
@@ -250,7 +231,7 @@
                                           1*1024*1024*1024);
     if (numBytesToWritePerMap == 0) {
       System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
-      System.exit(-1);
+      return -2;
     }
     long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",
          numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
@@ -262,7 +243,9 @@
     job.setNumMapTasks(numMaps);
     System.out.println("Running " + numMaps + " maps.");
-    job.setNumReduceTasks(1);
+    
+    // reducer NONE
+    job.setNumReduceTasks(0);
    
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
@@ -272,6 +255,13 @@
     System.out.println("The job took " +
                        (endTime.getTime() - startTime.getTime()) /1000 +
                        " seconds.");
+    
+    return 0;
   }
   
+  public static void main(String[] args) throws Exception {
+    int res = new RandomWriter().doMain(new Configuration(), args);
+    System.exit(res);
+  }
+  
 }
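The heart of the rewrite is above: the mapper emits records through the
normal OutputCollector, and with setNumReduceTasks(0) the framework skips
sorting and shuffling entirely, letting each map write a part-NNNNN file
straight through the configured OutputFormat (see the MapTask hunk below).
A condensed sketch of the same map-only pattern against this era's API; the
identity mapper, paths, and key/value classes are illustrative choices, not
part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.IdentityMapper;

// Illustrative map-only job: copies SequenceFile records through unchanged,
// with no sort or shuffle between input and output.
public class MapOnlyExample {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(new Configuration());
    job.setJarByClass(MapOnlyExample.class);
    job.setJobName("map-only-example");
    job.setInputPath(new Path(args[0]));
    job.setOutputPath(new Path(args[1]));
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    // Must match the types stored in the input files; BytesWritable matches
    // what RandomWriter itself writes.
    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    job.setMapperClass(IdentityMapper.class);
    // "reducer NONE": each map writes its own part-NNNNN output directly.
    job.setNumReduceTasks(0);
    JobClient.runJob(job);
  }
}

RandomWriter itself needs no real input at all, so its RandomInputFormat
instead manufactures one dummy FileSplit per requested map, as the
"dummy-split-" hunk above shows.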
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Jun 15 14:56:37 2007
@@ -239,7 +239,7 @@
                         JobConf job, Reporter reporter) throws IOException {
       this.job = job;
       this.reporter = reporter;
-      String finalName = getTipId();
+      String finalName = getOutputName(getPartition());
       FileSystem fs = FileSystem.get(this.job);
       out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
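That one-line change carries the rest of the patch: a map task in a
reduce-less job opens its RecordWriter under the partition-derived name
rather than its task id. A simplified reconstruction of that path, not the
actual MapTask code; String.format stands in for getOutputName, and
"partition" is what getPartition() returns in the real code:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.util.Progressable;

// Simplified sketch of the direct-write path taken when a job has no reduces.
class DirectMapOutput {
  static RecordWriter open(JobConf job, int partition, Progressable progress)
      throws IOException {
    String finalName = "part-" + String.format("%05d", partition);
    FileSystem fs = FileSystem.get(job);
    return job.getOutputFormat().getRecordWriter(fs, job, finalName, progress);
  }
}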
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jun 15 14:56:37 2007
@@ -26,7 +26,6 @@
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.text.DecimalFormat;
-import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -373,20 +372,6 @@
       throw ioe;
     }
     done(umbilical);
-  }
-
-  /** Construct output file names so that, when an output directory listing is
-   * sorted lexicographically, positions correspond to output partitions.*/
-
-  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
-
-  static {
-    NUMBER_FORMAT.setMinimumIntegerDigits(5);
-    NUMBER_FORMAT.setGroupingUsed(false);
-  }
-
-  static synchronized String getOutputName(int partition) {
-    return "part-" + NUMBER_FORMAT.format(partition);
   }
 
   private class ReduceCopier implements MRConstants {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Jun 15 14:56:37 2007
@@ -22,6 +22,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.net.URI;
+import java.text.NumberFormat;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,8 +55,23 @@
     REDUCE_INPUT_RECORDS,
     REDUCE_OUTPUT_RECORDS
   }
+
+  ///////////////////////////////////////////////////////////
+  // Helper methods to construct task-output paths
+  ///////////////////////////////////////////////////////////
   
-  
+  /** Construct output file names so that, when an output directory listing is
+   * sorted lexicographically, positions correspond to output partitions.*/
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+
+  static synchronized String getOutputName(int partition) {
+    return "part-" + NUMBER_FORMAT.format(partition);
+  }
+
   ////////////////////////////////////////////
   // Fields
   ////////////////////////////////////////////

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Fri Jun 15 14:56:37 2007
@@ -305,7 +305,7 @@
       assertTrue("job was complete", rj.isComplete());
       assertTrue("job was successful", rj.isSuccessful());
       Path output = new Path(outDir,
-                             ReduceTask.getOutputName(0));
+                             Task.getOutputName(0));
       assertTrue("reduce output exists " + output, fs.exists(output));
       SequenceFile.Reader rdr =
         new SequenceFile.Reader(fs, output, conf);
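With the naming helper hoisted into the shared Task base class, part files
look the same whether reduces or reduce-less maps wrote them, so consumers
can enumerate job output without caring which phase produced it. An
illustrative reader, mirroring the TestMapRed usage above; the output
directory and partition bound are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

// Opens each part file in turn and reports the record types it holds.
public class ReadParts {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path outDir = new Path(args[0]);
    for (int partition = 0; partition < 100; partition++) {
      Path part = new Path(outDir, "part-" + String.format("%05d", partition));
      if (!fs.exists(part)) {
        break;  // partitions are contiguous, so the first gap is the end
      }
      SequenceFile.Reader rdr = new SequenceFile.Reader(fs, part, conf);
      try {
        System.out.println(part + ": key=" + rdr.getKeyClassName()
                           + ", value=" + rdr.getValueClassName());
      } finally {
        rdr.close();
      }
    }
  }
}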