Author: cutting
Date: Mon Feb 26 12:13:15 2007
New Revision: 511985

URL: http://svn.apache.org/viewvc?view=rev&rev=511985
Log:
HADOOP-1040.  Update RandomWriter example to use counters and user-defined
input and output formats.
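The counter half of this change uses the org.apache.hadoop.mapred idiom of
declaring a Java enum and incrementing it through the Reporter passed to
map().  Below is a minimal stand-alone sketch of that idiom only; the class
and counter names are illustrative and are not part of the patch:

    package org.apache.hadoop.examples;

    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    /** Illustrative only: the user-defined counter idiom used by the patch. */
    public class CounterIdiomSketch extends MapReduceBase implements Mapper {

      /** User-defined counters are plain Java enums. */
      static enum Counters { RECORDS_SEEN }

      public void map(WritableComparable key, Writable value,
                      OutputCollector output, Reporter reporter)
          throws IOException {
        // Each increment is recorded against the enum constant; the framework
        // aggregates the per-task values into job-level totals.
        reporter.incrCounter(Counters.RECORDS_SEEN, 1);
        output.collect(key, value);
      }
    }

The aggregated totals appear alongside the framework's built-in job counters.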
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=511985&r1=511984&r2=511985
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Feb 26 12:13:15 2007
@@ -138,6 +138,9 @@
 40. HADOOP-1039.  In HDFS's TestCheckpoint, avoid restarting MiniDFSCluster
     so often, speeding this test.  (Dhruba Borthakur via cutting)
 
+41. HADOOP-1040.  Update RandomWriter example to use counters and
+    user-defined input and output formats.  (omalley via cutting)
+
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?view=diff&rev=511985&r1=511984&r2=511985
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Mon Feb 26 12:13:15 2007
@@ -19,29 +19,16 @@
 package org.apache.hadoop.examples;
 
 import java.io.IOException;
-import java.text.NumberFormat;
 import java.util.Date;
-import java.util.Iterator;
 import java.util.Random;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Progressable;
 
 /**
  * This program uses map/reduce to just run a distributed job where there is
@@ -50,9 +37,91 @@
  *
  * @author Owen O'Malley
  */
-public class RandomWriter extends MapReduceBase implements Reducer {
+public class RandomWriter {
   
-  public static class Map extends MapReduceBase implements Mapper {
+  /**
+   * User counters
+   */
+  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
+  
+  /**
+   * A custom input format that creates virtual inputs of a single string
+   * for each map.
+   */
+  static class RandomInputFormat implements InputFormat {
+
+    /** Accept all job confs */
+    public void validateInput(JobConf job) throws IOException {
+    }
+
+    /**
+     * Generate the requested number of file splits, with the filename
+     * set to the filename of the output file.
+     */
+    public InputSplit[] getSplits(JobConf job, 
+                                  int numSplits) throws IOException {
+      InputSplit[] result = new InputSplit[numSplits];
+      Path outDir = job.getOutputPath();
+      for(int i=0; i < result.length; ++i) {
+        result[i] = new FileSplit(new Path(outDir, "part-" + i), 0, 1, job);
+      }
+      return result;
+    }
+
+    /**
+     * Return a single record (filename, "") where the filename is taken from
+     * the file split.
+     */
+    static class RandomRecordReader implements RecordReader {
+      Path name;
+      public RandomRecordReader(Path p) {
+        name = p;
+      }
+      public boolean next(Writable key, Writable value) {
+        if (name != null) {
+          ((Text) key).set(name.toString());
+          name = null;
+          return true;
+        }
+        return false;
+      }
+      public WritableComparable createKey() {
+        return new Text();
+      }
+      public Writable createValue() {
+        return new Text();
+      }
+      public long getPos() {
+        return 0;
+      }
+      public void close() {}
+      public float getProgress() {
+        return 0.0f;
+      }
+    }
+
+    public RecordReader getRecordReader(InputSplit split,
+                                        JobConf job, 
+                                        Reporter reporter) throws IOException {
+      return new RandomRecordReader(((FileSplit) split).getPath());
+    }
+  }
+
+  /**
+   * Consume all outputs and put them in /dev/null.
+   */
+  static class DataSink implements OutputFormat {
+    public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, 
+                                        String name, Progressable progress) {
+      return new RecordWriter(){
+        public void write(WritableComparable key, Writable value) { }
+        public void close(Reporter reporter) { }
+      };
+    }
+    public void checkOutputSpecs(FileSystem ignored, JobConf job) { }
+  }
+
+  static class Map extends MapReduceBase implements Mapper {
     private FileSystem fileSys = null;
     private JobConf jobConf = null;
     private long numBytesToWrite;
@@ -77,7 +146,7 @@
                     Writable value,
                     OutputCollector output, 
                     Reporter reporter) throws IOException {
-      String filename = ((Text) value).toString();
+      String filename = ((Text) key).toString();
       SequenceFile.Writer writer = 
         SequenceFile.createWriter(fileSys, jobConf, new Path(filename), 
                                   BytesWritable.class, BytesWritable.class,
@@ -94,6 +163,8 @@
         randomizeBytes(randomValue.get(), 0, randomValue.getSize());
         writer.append(randomKey, randomValue);
         numBytesToWrite -= keyLength + valueLength;
+        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
+        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
         if (++itemCount % 200 == 0) {
           reporter.setStatus("wrote record " + itemCount + ". " + 
                              numBytesToWrite + " bytes left.");
@@ -126,13 +197,6 @@
     
   }
   
-  public void reduce(WritableComparable key, 
-                     Iterator values,
-                     OutputCollector output, 
-                     Reporter reporter) throws IOException {
-    // nothing
-  }
-  
   /**
    * This is the main routine for launching a distributed random write job.
    * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
@@ -149,78 +213,48 @@
    * @throws IOException
    */
   public static void main(String[] args) throws IOException {
-    Configuration defaults = new Configuration();
     if (args.length == 0) {
       System.out.println("Usage: writer <out-dir> [<config>]");
       return;
     }
     Path outDir = new Path(args[0]);
+    JobConf job;
     if (args.length >= 2) {
-      defaults.addFinalResource(new Path(args[1]));
-    }
-    
-    JobConf jobConf = new JobConf(defaults, RandomWriter.class);
-    jobConf.setJobName("random-writer");
+      job = new JobConf(new Path(args[1]));
+    } else {
+      job = new JobConf();
+    }
+    job.setJarByClass(RandomWriter.class);
+    job.setJobName("random-writer");
+    job.setOutputPath(outDir);
     
     // turn off speculative execution, because DFS doesn't handle
     // multiple writers to the same file.
-    jobConf.setSpeculativeExecution(false);
-    jobConf.setOutputKeyClass(BytesWritable.class);
-    jobConf.setOutputValueClass(BytesWritable.class);
-    
-    jobConf.setMapperClass(Map.class);
-    jobConf.setReducerClass(RandomWriter.class);
+    job.setSpeculativeExecution(false);
+    job.setOutputKeyClass(BytesWritable.class);
+    job.setOutputValueClass(BytesWritable.class);
+    
+    job.setInputFormat(RandomInputFormat.class);
+    job.setMapperClass(Map.class);
+    job.setReducerClass(IdentityReducer.class);
+    job.setOutputFormat(DataSink.class);
     
-    JobClient client = new JobClient(jobConf);
+    JobClient client = new JobClient(job);
     ClusterStatus cluster = client.getClusterStatus();
     int numMaps = cluster.getTaskTrackers() * 
-         jobConf.getInt("test.randomwriter.maps_per_host", 10);
-    jobConf.setNumMapTasks(numMaps);
+         job.getInt("test.randomwriter.maps_per_host", 10);
+    job.setNumMapTasks(numMaps);
     System.out.println("Running " + numMaps + " maps.");
-    jobConf.setNumReduceTasks(1);
-    
-    Path tmpDir = new Path("random-work");
-    Path inDir = new Path(tmpDir, "in");
-    Path fakeOutDir = new Path(tmpDir, "out");
-    FileSystem fileSys = FileSystem.get(jobConf);
-    if (fileSys.exists(outDir)) {
-      System.out.println("Error: Output directory " + outDir + 
-                         " already exists.");
-      return;
-    }
-    fileSys.delete(tmpDir);
-    if (!fileSys.mkdirs(inDir)) {
-      System.out.println("Error: Mkdirs failed to create " + 
-                         inDir.toString());
-      return;
-    }
-    NumberFormat numberFormat = NumberFormat.getInstance();
-    numberFormat.setMinimumIntegerDigits(6);
-    numberFormat.setGroupingUsed(false);
-    
-    for(int i=0; i < numMaps; ++i) {
-      Path file = new Path(inDir, "part"+i);
-      FSDataOutputStream writer = fileSys.create(file);
-      writer.writeBytes(outDir + "/part" + numberFormat.format(i)+ "\n");
-      writer.close();
-    }
-    jobConf.setInputPath(inDir);
-    jobConf.setOutputPath(fakeOutDir);
-    
-    // Uncomment to run locally in a single process
-    //job_conf.set("mapred.job.tracker", "local");
+    job.setNumReduceTasks(1);
     
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
-    try {
-      JobClient.runJob(jobConf);
-      Date endTime = new Date();
-      System.out.println("Job ended: " + endTime);
-      System.out.println("The job took " + 
-                         (endTime.getTime() - startTime.getTime()) /1000 + " seconds.");
-    } finally {
-      fileSys.delete(tmpDir);
-    }
+    JobClient.runJob(job);
+    Date endTime = new Date();
+    System.out.println("Job ended: " + endTime);
+    System.out.println("The job took " + 
+                       (endTime.getTime() - startTime.getTime()) /1000 + 
+                       " seconds.");
   }
   
 }
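A natural companion to this change, though not part of the commit, is reading
the user-defined counters back from the driver once JobClient.runJob() returns.
The sketch below assumes a Hadoop release in which runJob() returns a
RunningJob and the mapred Counters class exposes getCounter(Enum); both hold in
later releases, but the API available at the time of this commit may differ.
The RandomWriterCounterReport class name is purely illustrative:

    package org.apache.hadoop.examples;

    import java.io.IOException;

    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;

    /** Hypothetical helper, not part of this commit. */
    public class RandomWriterCounterReport {
      /** Runs the configured job and prints the user-defined counters. */
      public static void runAndReport(JobConf job) throws IOException {
        // runJob() blocks until the job completes (or throws on failure).
        RunningJob running = JobClient.runJob(job);
        // The per-task increments made via Reporter.incrCounter() are
        // aggregated by the framework into a single Counters object.
        Counters counters = running.getCounters();
        long records = counters.getCounter(RandomWriter.Counters.RECORDS_WRITTEN);
        long bytes = counters.getCounter(RandomWriter.Counters.BYTES_WRITTEN);
        System.out.println("Wrote " + records + " records, " + bytes + " bytes.");
      }
    }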