Hi everyone,

I'm working on a relatively simple MapReduce job with a slight complication regarding the ordering of the key/value pairs heading into the reducer. The output from the mapper might be something like:
cat -> doc5, 1
cat -> doc1, 1
cat -> doc5, 3
...
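For reference, the map side emits roughly this for each term occurrence (a sketch only; 'word' and 'docID' are hypothetical locals and termFrequency is a reused instance field):

termFrequency.setDocumentID(docID);
termFrequency.setFrequency(1);
output.collect(new Text(word), termFrequency);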
In that output, 'cat' is my key and the value is the document ID plus the count (my own WritableComparable). Originally I was going to create a HashMap in the reduce method, add an entry for each document ID, and sum the counts for each (a sketch of that version follows the next example). Then I realized the method would be better if the values arrived in order, like so:
cat -> doc1, 1
cat -> doc5, 1
cat -> doc5, 3
...
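Here's roughly what that HashMap version would have looked like, for comparison (a sketch; termFrequency is the same reused instance field as in the reducer below):

public void reduce(Text key, Iterator<TermFrequencyWritable> values,
    OutputCollector<Text, TermFrequencyWritable> output, Reporter reporter)
    throws IOException {
  // Sum the counts per document ID in a map, then emit everything at the end
  Map<String, Long> sums = new HashMap<String, Long>();
  while (values.hasNext()) {
    TermFrequencyWritable value = values.next();
    Long sum = sums.get(value.getDocumentID());
    sums.put(value.getDocumentID(),
        (sum == null ? 0L : sum) + value.getFrequency());
  }
  for (Map.Entry<String, Long> entry : sums.entrySet()) {
    termFrequency.setDocumentID(entry.getKey());
    termFrequency.setFrequency(entry.getValue());
    output.collect(key, termFrequency);
  }
}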
With the values sorted this way, I can keep summing until I hit a new document ID and collect the output at that point, avoiding the data structure and the object-creation costs. I tried setting JobConf.setOutputValueGroupingComparator(), but it didn't seem to do anything. In fact, I deliberately threw an exception from the Comparator I supplied, and it never showed up when running the job. My map output value consists of a UTF string and a long, so perhaps the Comparator I'm using (identical to Text.Comparator) is incorrect:
// Skip the vint length prefix on each buffer, then compare the
// remaining serialized bytes (same logic as Text.Comparator).
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  int n1 = WritableUtils.decodeVIntSize(b1[s1]);
  int n2 = WritableUtils.decodeVIntSize(b2[s2]);
  return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
}
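In case I'm hooking it in wrong, the driver-side wiring looks roughly like this (a sketch; MyIndexJob and MyGroupingComparator are just my names for the job class and the WritableComparator subclass holding the compare method above):

JobConf conf = new JobConf(MyIndexJob.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(TermFrequencyWritable.class);
// Per the JobConf javadoc, this comparator controls which keys are
// grouped together for a single call to reduce().
conf.setOutputValueGroupingComparator(MyGroupingComparator.class);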
In my final output I'm basically seeing the same word -> documentID pair emitted multiple times, so for the example above I get multiple lines of cat -> doc5, X. Here's the reducer method, just in case:
public void reduce(Text key, Iterator<TermFrequencyWritable> values,
                   OutputCollector<Text, TermFrequencyWritable> output,
                   Reporter reporter) throws IOException {
  long sum = 0;
  String lastDocID = null;
  // Iterate through all values
  while (values.hasNext()) {
    TermFrequencyWritable value = values.next();
    // Encountered a new document ID: record the previous sum and reset
    if (!value.getDocumentID().equals(lastDocID)) {
      // Ignore the first pass, when there is nothing to record yet
      if (sum != 0) {
        termFrequency.setDocumentID(lastDocID);
        termFrequency.setFrequency(sum);
        output.collect(key, termFrequency);
      }
      sum = 0;
      lastDocID = value.getDocumentID();
    }
    sum += value.getFrequency();
  }
  // Record the last document
  termFrequency.setDocumentID(lastDocID);
  termFrequency.setFrequency(sum);
  output.collect(key, termFrequency);
}
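For what it's worth, my understanding is that the grouping comparator is usually paired with a composite key that folds the document ID into the key itself, so the sort phase puts the values in the order I want. A rough sketch of that pattern as I understand it (not code I have running; imports and a term-only Partitioner omitted):

// Hypothetical composite key sorting by (term, docID). The grouping
// comparator and a custom Partitioner would then compare only the term
// portion, so every doc for one term still reaches a single reduce call.
public static class TermDocKey implements WritableComparable<TermDocKey> {
  private Text term = new Text();
  private Text docID = new Text();

  public void write(DataOutput out) throws IOException {
    term.write(out);
    docID.write(out);
  }

  public void readFields(DataInput in) throws IOException {
    term.readFields(in);
    docID.readFields(in);
  }

  public int compareTo(TermDocKey other) {
    int cmp = term.compareTo(other.term);
    return (cmp != 0) ? cmp : docID.compareTo(other.docID);
  }
}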
Any ideas? (I'm using Hadoop 0.19.1.)
Thanks,
- Bill