I've tried the approach shown in the examples, but it doesn't seem to
work. The examples also produce deprecated-interface warnings when I
compile. Any help you guys can give would be greatly appreciated.
Here's what I need to do in the mapper:
- Read through some logs.
- Take each timestamp modulo some integer N, which I want to pass on
  the command line.
- Pull out some field M, which I want to pass on the command line.
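For concreteness, the per-line logic described in these steps amounts to roughly the following plain-Java sketch. It is an illustration only: the class name KeySketch, the method buildKey, and the sample values are hypothetical, and the field value is passed in directly rather than parsed out of a JSON log line.

```java
public class KeySketch {

    // Hypothetical helper: reduces the timestamp modulo N and joins the
    // result with the extracted field value as "<bucket>:<field value>".
    public static String buildKey(int timestamp, int intervalLength, String fieldValue) {
        if (intervalLength <= 0) {
            throw new IllegalArgumentException("interval length must be positive");
        }
        int bucket = timestamp % intervalLength;
        return bucket + ":" + fieldValue;
    }

    public static void main(String[] args) {
        // 125 % 60 == 5, so this prints "5:GET"
        System.out.println(buildKey(125, 60, "GET"));
    }
}
```

In the mapper, N and M would take the place of the hard-coded 60 and the literal field value.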
If I hard code N and M, the job completes just fine and there's proper
content in the output path.
If I pass N and M on the command line as otherArgs or as -D a=N -D b=M
and attempt to assign these to the jobConf with configure(), the job
completes successfully but with an empty output file. This tells me
that it's not pulling the proper field (M) out of the log line. When
passing as otherArgs and parsing in main(), I was using public static
variables for interval_length and field_name, but obviously that
didn't work either and I don't know why.
How do I do this?
If it can't be done, I'll just have to write a separate Mapper class
for each combination of N and M, and that seems unnecessarily verbose
to me.
The code:
import java.io.IOException;
import org.json.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class FooBar {

    public static class FooMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Integer interval_length;
        private String field_name;

        public void configure(JobConf job) {
            interval_length = new Integer(job.get("foo.interval_length"));
            field_name = job.get("foo.field_name");
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                JSONObject j = new JSONObject(value.toString());
                int t = j.getInt("timestamp");
                t = t % interval_length;
                // Key format is "<timestamp mod N>:<value of field M>".
                Text k = new Text(t + ":" + j.getString(field_name));
                context.write(k, one);
            } catch (Exception e) {
                //
                // how to do something useful with exceptions in Hadoop?
                //
            }
        }
    }
    //
    // main was taken from the WordCount.java example
    //
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "wordcount");
        job.setJarByClass(FooBar.class);
        job.setMapperClass(FooMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
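
The "job succeeds but the output is empty" symptom is consistent with the interval field never being assigned and the resulting exception being swallowed by the empty catch block. The plain-Java sketch below shows that mechanism in isolation. It assumes configure(JobConf) is never invoked, which would be the case for a class extending the new-API org.apache.hadoop.mapreduce.Mapper, since that class defines setup(Context) rather than configure(JobConf). The class and method names here are hypothetical and no Hadoop classes are involved.

```java
public class SwallowedExceptionSketch {

    // Stands in for the mapper's interval_length field; it stays null
    // until something assigns it.
    static Integer intervalLength;

    // Mimics the shape of the map() body: returns the key that would be
    // written, or null when an exception is caught and ignored.
    public static String tryBuildKey(int timestamp, String fieldValue) {
        try {
            // Unboxing a null Integer throws NullPointerException.
            int bucket = timestamp % intervalLength;
            return bucket + ":" + fieldValue;
        } catch (Exception e) {
            // Empty handler, as in the posted code: the record is dropped
            // without any trace.
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryBuildKey(125, "GET")); // field unset: prints "null"
        intervalLength = 60;
        System.out.println(tryBuildKey(125, "GET")); // prints "5:GET"
    }
}
```

If that is what is happening, one option worth trying is to read the values in an overridden setup(Context) via context.getConfiguration().get("foo.interval_length"), and to log or count the exception in the catch block instead of ignoring it. This is a suggestion to verify against the Hadoop version in use, not a confirmed diagnosis.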