Here is the MapReduce job I use to randomize the order of the lines in a file.
It is written against the old `org.apache.hadoop.mapred` API. I've omitted the
imports for brevity; your IDE can add them.
Enjoy!
-ryan
public class Randomize {
// technically text/text could be 'object'.
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, IntWritable, Text> {
Random rnd = new Random();
public void map(LongWritable key, Text value,
OutputCollector<IntWritable, Text> output, Reporter reporter)
throws IOException {
IntWritable redKey = new IntWritable(rnd.nextInt(100000));
output.collect(redKey, value);
reporter.setStatus("Map emitting cell for: " + redKey);
}
}
// This combiner reduces the time of a map-reduce from 1h18m -> 48m.
// That is a 38% improvement (!!).
public static class Combiner extends MapReduceBase
implements Reducer<IntWritable, Text, IntWritable, Text> {
public void reduce(IntWritable key, Iterator<Text> values,
OutputCollector<IntWritable, Text> output, Reporter reporter)
throws IOException {
Text out = new Text();
byte newline [] = {'\n'};
int siz = 0;
while (values.hasNext())
{
Text txt = values.next();
out.append(txt.getBytes(), 0, txt.getLength());
if (++siz > 500) {
output.collect(key, out);
siz = 0;
out = new Text();
} else {
if (values.hasNext())
out.append(newline, 0, newline.length);
}
}
output.collect(key, out);
}
}
public static class Reduce extends MapReduceBase implements
Reducer<IntWritable, Text, NullWritable, Text> {
public void reduce(IntWritable key, Iterator<Text> values,
OutputCollector<NullWritable, Text> output, Reporter reporter)
throws IOException {
while (values.hasNext())
{
output.collect(NullWritable.get(), values.next());
}
}
}
public static void main(String [] argv) throws IOException {
if (argv.length < 2) {
System.out.println("Usage: <input> <randomized output>");
return;
}
JobConf job = new JobConf(Randomize.class);
job.setJobName("Randomize: " + argv[0]);
FileInputFormat.setInputPaths(job, new Path(argv[0]));
job.setInputFormat(TextInputFormat.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setCombinerClass(Combiner.class);
FileOutputFormat.setOutputPath(job, new Path(argv[1]));
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
JobClient jc = new JobClient(job);
jc.submitJob(job);
}
}