working. Aside from that, the compiler also warns about a deprecated
interface when I compile. Any help you can give would be greatly
appreciated.
Here's what I need to do in the mapper:
Read through some logs.
Modulo the timestamp by some integer N, which I want to pass on the
command line.
Pull out some field M, which I want to pass on the command line.
If I hard code N and M, the job completes just fine and there's
proper
content in the output path.
If I pass N and M on the command line as otherArgs or as -D a=N -D
b=M and
attempt to assign these to the jobConf with configure(), the job
completes
successfully but with an empty output file. This tells me that
it's not
pulling the proper field (M) out of the log line. When passing as
otherArgs
and parsing in main(), I was using public static variables for
interval_length and field_name, but obviously that didn't work
either and I
don't know why.
How do I do this?
If it can't be done, I'll just have to write a bunch of Mapper
classes for
each combination of N and M, and that seems to be unnecessarily
verbose to
me.
The code:
import java.io.IOException;
import org.json.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class FooBar {
public static class FooMapper extends Mapper<Object, Text,
Text,
IntWritable> {
private final static IntWritable one = new IntWritable
(1);
private Integer interval_length;
private String field_name;
public void configure(JobConf job) {
interval_length = new
Integer(job.get("foo.interval_length"));
field_name = job.get("foo.field_name");
}
public void map(Object key, Text value, Context
context)
throws IOException, InterruptedException {
try {
JSONObject j = new
JSONObject(value.toString());
int t = j.getInt("timestamp");
t = t % interval_length;
Text k = new Text(t.toString() + ":" +
j.getString(field_name));
context.write(k, one);
} catch (Exception e) {
//
// how to do something useful with
exceptions in Hadoop?
//
}
}
}
//
// main was taken from the WordCount.java example
//
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,
args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in>
<out>");
System.exit(2);
}
Job job = new Job(conf, "wordcount");
job.setJarByClass(FooBar.class);
job.setMapperClass(FooMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs
[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs
[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}