Hi all,
I am trying to write a MapReduce job that uses MultipleOutputs.
My reducer looks like this:
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

public class JohnReducer<K, V> extends MapReduceBase implements Reducer<K, V, K, V> {

    private MultipleOutputs mos;

    public void configure(JobConf conf) {
        mos = new MultipleOutputs(conf);
    }

    /** Writes all keys and values directly to output. */
    public void reduce(K key, Iterator<V> values, OutputCollector<K, V> output, Reporter reporter)
            throws IOException {

        BytesWritable value = new BytesWritable();

        // Every value of the group goes to the "fpdb" named output.
        while (values.hasNext()) {
            value = (BytesWritable) values.next();
            mos.getCollector("fpdb", reporter).collect(key, (V) value);
        }
        // Only the last value of the group goes to the "dup1Int" named output.
        mos.getCollector("dup1Int", reporter).collect(key, (V) value);
    }

    public void close() throws IOException {
        mos.close();
    }
}
So effectively, fpdb should be getting the IdentityReducer-style output (every record), and dup1Int should be getting only the last record of each Reducer.reduce() call. But both files are getting filled with all the records!
I have set up the job configuration as follows:
MultipleOutputs.addNamedOutput(job, "fpdb", JohnOutputFormat.class,
        BytesWritable.class, BytesWritable.class);
MultipleOutputs.addNamedOutput(job, "dup1Int", JohnOutputFormat.class,
        BytesWritable.class, BytesWritable.class);

job.setOutputValueGroupingComparator(BytesWritableNew.FirstComparator.class);
job.setOutputKeyComparatorClass(BytesWritableNew.Comparator.class);
Here, FirstComparator is used as the value-grouping comparator: it compares only part of the key, so that all the keys I want grouped together end up in the same reduce() call. Comparator does the full comparison and sorts the keys totally.
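In case it helps, the idea behind the two comparators is roughly this (a simplified sketch, not my exact code; I am pretending here that the grouping prefix is simply the first four bytes of the key's payload):

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparator;

// Simplified sketch only: assumes the first 4 bytes of the key's payload
// form the grouping prefix. My real key layout is more involved.
public class BytesWritableNew extends BytesWritable {

    /** Total-order comparator: sorts on the complete key bytes. */
    public static class Comparator extends WritableComparator {
        public Comparator() {
            super(BytesWritableNew.class);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // BytesWritable serializes its length in the first 4 bytes;
            // skip that and compare the full payloads.
            return compareBytes(b1, s1 + 4, l1 - 4, b2, s2 + 4, l2 - 4);
        }
    }

    /** Grouping comparator: compares only the leading part of the payload. */
    public static class FirstComparator extends WritableComparator {
        public FirstComparator() {
            super(BytesWritableNew.class);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // Compare only the 4-byte prefix, so keys sharing it fall into
            // the same reduce() call when used as the grouping comparator.
            return compareBytes(b1, s1 + 4, 4, b2, s2 + 4, 4);
        }
    }
}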
I don't think the Partitioner matters here, since I am using a single reducer.
I don't know whether the problem is in MultipleOutputs, in my comparators, or somewhere else.
Could someone please help me figure this out?
Thanks,
Matthew