Hi all,

I am trying out a MapReduce job that uses MultipleOutputs.

My reducer looks like this:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

public class JohnReducer<K, V> extends MapReduceBase implements Reducer<K, V, K, V> {

  private MultipleOutputs mos;

  public void configure(JobConf conf) {
    mos = new MultipleOutputs(conf);
  }

  /** Writes all keys and values directly to output. */
  public void reduce(K key, Iterator<V> values, OutputCollector<K, V> output, Reporter reporter)
      throws IOException {

    BytesWritable value = new BytesWritable();

    // every value in the group goes to the "fpdb" output
    while (values.hasNext()) {
      value = (BytesWritable) values.next();
      mos.getCollector("fpdb", reporter).collect(key, (V) value);
    }
    // only the last value of the group goes to the "dup1Int" output
    mos.getCollector("dup1Int", reporter).collect(key, (V) value);
  }

  public void close() throws IOException {
    mos.close();
  }
}

So effectively, fpdb should contain the same output as an IdentityReducer, and dup1Int should contain only the last record from each Reducer.reduce() call. But both of my files are getting filled with all the records!

I have set up the job configuration as follows:

    MultipleOutputs.addNamedOutput(job, "fpdb", JohnOutputFormat.class, BytesWritable.class, BytesWritable.class);
    MultipleOutputs.addNamedOutput(job, "dup1Int", JohnOutputFormat.class, BytesWritable.class, BytesWritable.class);
    job.setOutputValueGroupingComparator(BytesWritableNew.FirstComparator.class);
    job.setOutputKeyComparatorClass(BytesWritableNew.Comparator.class);

Here, FirstComparator is the value-grouping comparator: it compares only part of the key, so that all keys fall into the same reduce() call. Comparator sorts the keys on their full contents. I don't think the Partitioner matters, since I am using a single reducer. I don't know whether the problem is with MultipleOutputs, the comparators, or some other part of the job (a simplified sketch of the comparators follows below).
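
For reference, the comparators are roughly structured like this. This is only a simplified sketch: the prefix length and the exact byte handling in my real BytesWritableNew differ, so treat those details as placeholders.

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparator;

public class BytesWritableNew {

  // BytesWritable's serialized form starts with a 4-byte length header.
  private static final int LENGTH_BYTES = 4;

  /** Sort comparator: orders keys on their full byte contents. */
  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(BytesWritable.class);
    }
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES,
                          b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
    }
  }

  /** Grouping comparator: compares only a leading prefix of the key, so
      keys sharing that prefix fall into the same reduce() call. */
  public static class FirstComparator extends WritableComparator {
    private static final int PREFIX = 8; // illustrative prefix length, not my real value
    public FirstComparator() {
      super(BytesWritable.class);
    }
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      int n1 = Math.min(PREFIX, l1 - LENGTH_BYTES);
      int n2 = Math.min(PREFIX, l2 - LENGTH_BYTES);
      return compareBytes(b1, s1 + LENGTH_BYTES, n1,
                          b2, s2 + LENGTH_BYTES, n2);
    }
  }
}

Since the grouping comparator only looks at that prefix, I expect reduce() to be called once per prefix group, which is why I expected dup1Int to end up with just one record per group.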

Can someone please help me figure out what is going wrong?

Thanks,

Matthew
