Author: cutting
Date: Thu May 3 12:36:50 2007
New Revision: 534971

URL: http://svn.apache.org/viewvc?view=rev&rev=534971
Log:
HADOOP-485.  Allow a different comparator for grouping keys in calls to
reduce.  Contributed by Tahir.
Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestUserValueGrouping.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=534971&r1=534970&r2=534971
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu May 3 12:36:50 2007
@@ -319,6 +319,9 @@
 94. HADOOP-1315.  Clean up contrib/streaming, switching it to use core
     classes more and removing unused code.  (Runping Qi via cutting)
 
+95. HADOOP-485.  Allow a different comparator for grouping keys in
+    calls to reduce.  (Tahir Hashmi via cutting)
+
 
 Release 0.12.3 - 2007-04-06

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=534971&r1=534970&r2=534971
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu May 3 12:36:50 2007
@@ -443,6 +443,40 @@
              theClass, WritableComparator.class);
   }
 
+  /** Get the user defined comparator for grouping values.
+   *
+   * This call is used to get the comparator for grouping values by key.
+   * @see #setOutputValueGroupingComparator(Class) for details.
+   *
+   * @return Comparator set by the user for grouping values.
+   */
+  public WritableComparator getOutputValueGroupingComparator() {
+    Class theClass = getClass("mapred.output.value.groupfn.class", null,
+                              WritableComparator.class);
+    if (theClass == null) {
+      return getOutputKeyComparator();
+    }
+
+    return (WritableComparator)ReflectionUtils.newInstance(theClass, this);
+  }
+
+  /** Set the user defined comparator for grouping values.
+   *
+   * For key-value pairs (K1,V1) and (K2,V2), the values are passed
+   * in a single call to the reduce function if K1 and K2 compare as equal.
+   *
+   * This comparator should be provided if the equivalence rules for keys
+   * for sorting the intermediates are different from those for grouping
+   * values.
+   *
+   * @param theClass The Comparator class to be used for grouping. It should
+   *                 extend WritableComparator.
+   */
+  public void setOutputValueGroupingComparator(Class theClass) {
+    setClass("mapred.output.value.groupfn.class",
+             theClass, WritableComparator.class);
+  }
+
   public Class<? extends Writable> getOutputValueClass() {
     return getClass("mapred.output.value.class", Text.class, Writable.class);
   }
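For context, the following driver-side sketch (not part of the patch) shows how the new setter is meant to be used. The class names GroupingDriverSketch and PrimaryKeyGroupFn are invented for illustration; the comparator mirrors the CompositeIntGroupFn used by the test added below.

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableComparable;
  import org.apache.hadoop.io.WritableComparator;
  import org.apache.hadoop.mapred.JobConf;

  public class GroupingDriverSketch {

    /** Groups composite keys of the form primary*100 + secondary by their
     *  primary part; the default IntWritable order still sorts them by the
     *  full composite value. */
    public static class PrimaryKeyGroupFn extends WritableComparator {
      public PrimaryKeyGroupFn() {
        super(IntWritable.class);
      }
      public int compare(WritableComparable a, WritableComparable b) {
        int p1 = ((IntWritable) a).get() / 100;
        int p2 = ((IntWritable) b).get() / 100;
        return p1 == p2 ? 0 : (p1 < p2 ? -1 : 1);
      }
    }

    public static JobConf configureJob() {
      JobConf job = new JobConf(GroupingDriverSketch.class);
      job.setOutputKeyClass(IntWritable.class);       // composite keys
      job.setOutputValueClass(Text.class);
      job.setMapOutputValueClass(IntWritable.class);
      // New in this patch: values whose keys compare as equal under
      // PrimaryKeyGroupFn are delivered in a single call to reduce(),
      // even though the sort still distinguishes the full keys.
      job.setOutputValueGroupingComparator(PrimaryKeyGroupFn.class);
      return job;
    }
  }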
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=534971&r1=534970&r2=534971
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu May 3 12:36:50 2007
@@ -295,8 +295,8 @@
 
     Path tempDir = job.getLocalPath(getTaskId());
 
-    WritableComparator comparator = job.getOutputKeyComparator();
-
+    WritableComparator comparator = job.getOutputValueGroupingComparator();
+
     SequenceFile.Sorter.RawKeyValueIterator rIter;
     try {
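Note that this switch is behavior-preserving for jobs that never call the new setter, because getOutputValueGroupingComparator() falls back to getOutputKeyComparator() when mapred.output.value.groupfn.class is unset. A minimal standalone sketch of that fallback (GroupingFallbackSketch is an invented name, not part of the patch):

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.WritableComparator;
  import org.apache.hadoop.mapred.JobConf;

  public class GroupingFallbackSketch {
    public static void main(String[] args) {
      JobConf job = new JobConf();
      job.setOutputKeyClass(IntWritable.class);
      // No grouping comparator has been configured, so this returns the
      // same comparator getOutputKeyComparator() would return -- here the
      // registered IntWritable comparator -- and ReduceTask groups keys
      // exactly as it did before this change.
      WritableComparator comparator = job.getOutputValueGroupingComparator();
      System.out.println(comparator.getClass().getName());
    }
  }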
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestUserValueGrouping.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestUserValueGrouping.java?view=auto&rev=534971
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestUserValueGrouping.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestUserValueGrouping.java Thu May 3 12:36:50 2007
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.BooleanWritable.Comparator;
+import org.apache.hadoop.mapred.lib.*;
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+
+/**
+ * Tests that a user-supplied grouping comparator, set with
+ * JobConf.setOutputValueGroupingComparator(), controls which keys
+ * share a single call to reduce().
+ */
+public class TestUserValueGrouping extends TestCase
+{
+  JobConf conf = new JobConf(TestMapOutputType.class);
+  JobClient jc;
+  static Random rng = new Random();
+
+  /**
+   * RandomGen is a mapper that generates 5 random values for each key
+   * in the input. The values are in the range [0-4]. The mapper also
+   * generates a composite key. If the input key is x and the generated
+   * value is y, the composite key is x0y (x-zero-y). Therefore, the
+   * intermediate key-value pairs are ordered by {input key, value}.
+   */
+  static class RandomGen implements Mapper {
+    public void configure(JobConf job) {
+    }
+
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector out, Reporter reporter) throws IOException {
+      int num_values = 5;
+      for(int i = 0; i < num_values; ++i) {
+        int val = rng.nextInt(num_values);
+        int compositeKey = ((IntWritable)(key)).get() * 100 + val;
+        out.collect(new IntWritable(compositeKey), new IntWritable(val));
+      }
+    }
+
+    public void close() {
+    }
+  }
+
+  /** The reducer checks whether the input values are in sorted order and
+   * whether they are correctly grouped by key (i.e. each call to reduce
+   * should have 5 values if the grouping is correct).
+   */
+  static class AscendingReduce implements Reducer {
+
+    public void configure(JobConf job) {
+    }
+
+    public void reduce(WritableComparable key,
+                       Iterator values,
+                       OutputCollector out,
+                       Reporter reporter) throws IOException {
+      IntWritable previous = new IntWritable(-1);
+      int valueCount = 0;
+      while (values.hasNext()) {
+        IntWritable current = (IntWritable) values.next();
+
+        // Check that the values are sorted
+        if (current.compareTo(previous) < 0)
+          fail("Values generated by Mapper not in order");
+        previous = current;
+        ++valueCount;
+      }
+      if (valueCount != 5) {
+        fail("Values not grouped by primary key");
+      }
+      out.collect(key, new Text("success"));
+    }
+
+    public void close() {
+    }
+  }
+
+  /** Grouping function for values based on the composite key. This
+   * comparator strips off the secondary key part from the x0y composite
+   * and only compares the primary key value (x).
+   */
+  public static class CompositeIntGroupFn extends WritableComparator {
+    public CompositeIntGroupFn() {
+      super(IntWritable.class);
+    }
+    public int compare (WritableComparable v1, WritableComparable v2) {
+      int val1 = ((IntWritable)(v1)).get() / 100;
+      int val2 = ((IntWritable)(v2)).get() / 100;
+
+      if (val1 > val2)
+        return 1;
+      else if (val2 > val1)
+        return -1;
+      else
+        return 0;
+    }
+
+    public boolean equals (IntWritable v1, IntWritable v2) {
+      int val1 = v1.get();
+      int val2 = v2.get();
+
+      return (val1/100) == (val2/100);
+    }
+
+    static {
+      WritableComparator.define(CompositeIntGroupFn.class, new Comparator());
+    }
+  }
+
+  public void configure() throws Exception {
+    Path testdir = new Path("build/test/test.mapred.spill");
+    Path inDir = new Path(testdir, "in");
+    Path outDir = new Path(testdir, "out");
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(testdir);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setMapperClass(RandomGen.class);
+    conf.setReducerClass(AscendingReduce.class);
+    conf.setOutputKeyClass(IntWritable.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setMapOutputValueClass(IntWritable.class);
+    conf.setOutputValueGroupingComparator(CompositeIntGroupFn.class);
+
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    if (!fs.mkdirs(testdir)) {
+      throw new IOException("Mkdirs failed to create " + testdir.toString());
+    }
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    Path inFile = new Path(inDir, "part0");
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile,
+        IntWritable.class, IntWritable.class);
+    writer.append(new IntWritable(11), new IntWritable(999));
+    writer.append(new IntWritable(23), new IntWritable(456));
+    writer.append(new IntWritable(10), new IntWritable(780));
+    writer.close();
+
+    jc = new JobClient(conf);
+  }
+
+  public void testUserValueGrouping() throws Exception {
+    configure();
+
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
+
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+  }
+}
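To trace what the test expects: each of the three input keys (11, 23 and 10) makes RandomGen emit five pairs whose composite keys fall in the ranges 1100-1104, 2300-2304 and 1000-1004 respectively. The intermediates are sorted by the full composite key, but CompositeIntGroupFn compares only key/100, so the reducer should be invoked exactly three times, once per primary key, each time with five values in non-decreasing order; that is what AscendingReduce asserts.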