Hairong Kuang wrote:
Did you create your input file using a SequenceFile.Writer?
No, my input files are line-oriented log files, conceptually similar to
the Grep example.
Here's what I'm trying to do, as a simplified example (the line parsing is
replaced with simulation code):
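import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;

public class MapRedClassDemo implements Mapper, Reducer {

    long count = 0;

    public void configure(JobConf job) {}

    public void close() throws IOException {}

    // Map: emits <LongWritable, UTF8>; the key cycles through 0..19.
    public void map(WritableComparable key, Writable value,
            OutputCollector output, Reporter reporter) throws IOException {
        String line = ((UTF8) value).toString();
        String s = "demo" + line.length();  // stands in for the real parsing
        output.collect(new LongWritable(count++ % 20), new UTF8(s));
    }

    // Reduce: counts the distinct values seen for each key.
    public void reduce(WritableComparable key, Iterator values,
            OutputCollector output, Reporter reporter) throws IOException {
        Set set = new HashSet();
        while (values.hasNext()) {
            set.add(values.next());
        }
        // output.collect(key, new IntWritable(set.size()));
        output.collect(key, new UTF8("" + set.size()));
    }

    public static void main(String[] args) throws Exception {
        Configuration defaults = new Configuration();
        JobConf statJob = new JobConf(defaults, MapRedClassDemo.class);

        statJob.setInputDir(new File(args[0]));
        statJob.setMapperClass(MapRedClassDemo.class);
        statJob.setReducerClass(MapRedClassDemo.class);

        statJob.setInputKeyClass(LongWritable.class);
        statJob.setInputValueClass(UTF8.class);

        statJob.setOutputDir(new File(args[1] + "/" + System.currentTimeMillis()
                + "/"));
        statJob.setOutputFormat(TextOutputFormat.class);
        statJob.setOutputKeyClass(LongWritable.class);
        // statJob.setOutputValueClass(IntWritable.class);
        statJob.setOutputValueClass(UTF8.class);

        JobClient.runJob(statJob);
    }
}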
This works, since both Map and Reduce emit <LongWritable, UTF8> pairs.
If I instead call statJob.setOutputValueClass(IntWritable.class) and have
Reduce emit <LongWritable, IntWritable> pairs, the _Map_ operation fails with:
java.io.IOException: wrong value class: demo310 is not class org.apache.hadoop.io.IntWritable
    at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:150)
    at org.apache.hadoop.mapred.MapTask$2.collect(MapTask.java:92)
so it appears that the setInputXXClass methods have no effect on the Map
phase.
Or have I misunderstood something?
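
To make my reading of the stack trace concrete, here is a small stand-alone sketch in plain Java, with no Hadoop classes. CheckedWriter is a hypothetical stand-in that I made up for a writer that accepts only one configured key class and one configured value class. It is only my guess at the kind of check behind the exception, not the actual SequenceFile.Writer code, and Long, String and Integer stand in for LongWritable, UTF8 and IntWritable.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ValueClassCheckDemo {

    /** Hypothetical stand-in: a writer that enforces one key class and one value class. */
    static class CheckedWriter {
        private final Class<?> keyClass;
        private final Class<?> valueClass;
        private final List<Object[]> records = new ArrayList<>();

        CheckedWriter(Class<?> keyClass, Class<?> valueClass) {
            this.keyClass = keyClass;
            this.valueClass = valueClass;
        }

        /** Stores the pair, or throws if either object has an unexpected class. */
        void append(Object key, Object value) throws IOException {
            if (key.getClass() != keyClass) {
                throw new IOException("wrong key class: " + key + " is not " + keyClass);
            }
            if (value.getClass() != valueClass) {
                throw new IOException("wrong value class: " + value + " is not " + valueClass);
            }
            records.add(new Object[] {key, value});
        }

        int size() {
            return records.size();
        }
    }

    public static void main(String[] args) {
        // Writer configured for <Long, String>: the simulated map output is accepted.
        CheckedWriter matching = new CheckedWriter(Long.class, String.class);
        try {
            matching.append(0L, "demo310");
            System.out.println("accepted records: " + matching.size());
        } catch (IOException e) {
            System.out.println("unexpected: " + e.getMessage());
        }

        // Writer configured for <Long, Integer>: the same map output is rejected,
        // even though only the reduce step was meant to emit Integer values.
        CheckedWriter mismatched = new CheckedWriter(Long.class, Integer.class);
        try {
            mismatched.append(0L, "demo310");
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the map output really is checked against the job's output classes in this way, that would explain why the failure shows up in the Map phase as soon as the output value class is changed.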