Author: cdouglas
Date: Tue Jul 14 22:42:06 2009
New Revision: 794103
URL: http://svn.apache.org/viewvc?rev=794103&view=rev
Log:
MAPREDUCE-565. Fix partitioner to work with new API. Contributed by Owen
O'Malley
Modified:
hadoop/common/branches/branch-0.20/CHANGES.txt
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=794103&r1=794102&r2=794103&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Tue Jul 14 22:42:06 2009
@@ -175,6 +175,9 @@
HADOOP-6145. Fix FsShell rm/rmr error messages when there is a FNFE.
(Jakob Homan via szetszwo)
+ MAPREDUCE-565. Fix partitioner to work with new API. (Owen O'Malley via
+ cdouglas)
+
Release 0.20.0 - 2009-04-15
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=794103&r1=794102&r2=794103&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Jul 14 22:42:06 2009
@@ -353,7 +353,7 @@
ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
try {
- runner.run(in, collector, reporter);
+ runner.run(in, new OldOutputCollector(collector, conf), reporter);
collector.flush();
} finally {
//close
@@ -427,20 +427,80 @@
}
}
+ /**
+ * Since the mapred and mapreduce Partitioners don't share a common interface
+ * (JobConfigurable is deprecated and a subtype of mapred.Partitioner), the
+ * partitioner lives in Old/NewOutputCollector. Note that, for map-only jobs,
+ * the configured partitioner should not be called. It's common for
+ * partitioners to compute a result mod numReduces, which causes a div0 error
+ */
+ private static class OldOutputCollector<K,V> implements OutputCollector<K,V> {
+ private final Partitioner<K,V> partitioner;
+ private final MapOutputCollector<K,V> collector;
+ private final int numPartitions;
+
+ @SuppressWarnings("unchecked")
+ OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
+ numPartitions = conf.getNumReduceTasks();
+ if (numPartitions > 0) {
+ partitioner = (Partitioner<K,V>)
+ ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
+ } else {
+ partitioner = new Partitioner<K,V>() {
+ @Override
+ public void configure(JobConf job) { }
+ @Override
+ public int getPartition(K key, V value, int numPartitions) {
+ return -1;
+ }
+ };
+ }
+ this.collector = collector;
+ }
+
+ @Override
+ public void collect(K key, V value) throws IOException {
+ try {
+ collector.collect(key, value,
+ partitioner.getPartition(key, value, numPartitions));
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("interrupt exception", ie);
+ }
+ }
+ }
+
private class NewOutputCollector<K,V>
extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
- private MapOutputCollector<K,V> collector;
+ private final MapOutputCollector<K,V> collector;
+ private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
+ private final int partitions;
- NewOutputCollector(JobConf job,
+ @SuppressWarnings("unchecked")
+ NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
+ JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
+ partitions = jobContext.getNumReduceTasks();
+ if (partitions > 0) {
+ partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
+ ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
+ } else {
+ partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
+ @Override
+ public int getPartition(K key, V value, int numPartitions) {
+ return -1;
+ }
+ };
+ }
}
@Override
- public void write(K key, V value) throws IOException {
- collector.collect(key, value);
+ public void write(K key, V value) throws IOException, InterruptedException {
+ collector.collect(key, value,
+ partitioner.getPartition(key, value, partitions));
}
@Override
@@ -510,7 +570,7 @@
if (job.getNumReduceTasks() == 0) {
output = outputFormat.getRecordWriter(taskContext);
} else {
- output = new NewOutputCollector(job, umbilical, reporter);
+ output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
@@ -532,9 +592,10 @@
}
}
- interface MapOutputCollector<K, V>
- extends OutputCollector<K, V> {
+ interface MapOutputCollector<K, V> {
+ public void collect(K key, V value, int partition
+ ) throws IOException, InterruptedException;
public void close() throws IOException, InterruptedException;
public void flush() throws IOException, InterruptedException,
@@ -574,7 +635,7 @@
ClassNotFoundException {
}
- public void collect(K key, V value) throws IOException {
+ public void collect(K key, V value, int partition) throws IOException {
reporter.progress();
out.write(key, value);
mapOutputRecordCounter.increment(1);
@@ -585,7 +646,6 @@
class MapOutputBuffer<K extends Object, V extends Object>
implements MapOutputCollector<K, V>, IndexedSortable {
private final int partitions;
- private final Partitioner<K, V> partitioner;
private final JobConf job;
private final TaskReporter reporter;
private final Class<K> keyClass;
@@ -653,7 +713,6 @@
this.reporter = reporter;
localFs = FileSystem.getLocal(job);
partitions = job.getNumReduceTasks();
- partitioner = ReflectionUtils.newInstance(job.getPartitionerClass(), job);
rfs = ((LocalFileSystem)localFs).getRaw();
@@ -739,8 +798,8 @@
}
}
- public synchronized void collect(K key, V value)
- throws IOException {
+ public synchronized void collect(K key, V value, int partition
+ ) throws IOException {
reporter.progress();
if (key.getClass() != keyClass) {
throw new IOException("Type mismatch in key from map: expected "
@@ -801,7 +860,6 @@
valSerializer.serialize(value);
int valend = bb.markRecord();
- final int partition = partitioner.getPartition(key, value, partitions);
if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")");
@@ -821,7 +879,7 @@
kvindex = kvnext;
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
- spillSingleRecord(key, value);
+ spillSingleRecord(key, value, partition);
mapOutputRecordCounter.increment(1);
return;
}
@@ -1201,11 +1259,10 @@
* the in-memory buffer, so we must spill the record from collect
* directly to a spill file. Consider this "losing".
*/
- private void spillSingleRecord(final K key, final V value)
- throws IOException {
+ private void spillSingleRecord(final K key, final V value,
+ int partition) throws IOException {
long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
- final int partition = partitioner.getPartition(key, value, partitions);
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
Modified:
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=794103&r1=794102&r2=794103&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Tue Jul 14 22:42:06 2009
@@ -27,6 +27,12 @@
import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator;
+import org.apache.hadoop.examples.SecondarySort.FirstPartitioner;
+import org.apache.hadoop.examples.SecondarySort.IntPair;
+import org.apache.hadoop.examples.SecondarySort;
+import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
@@ -35,6 +41,10 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.MRCaching.TestResult;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TestMapReduceLocal;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Progressable;
/**
@@ -45,7 +55,8 @@
new File(System.getProperty("test.build.data","/tmp"))
.toURI().toString().replace(' ', '+');
- public void testWithLocal() throws IOException {
+ public void testWithLocal()
+ throws IOException, InterruptedException, ClassNotFoundException {
MiniMRCluster mr = null;
try {
mr = new MiniMRCluster(2, "file:///", 3);
@@ -80,6 +91,7 @@
assertEquals("number of reduce outputs", 9,
counters.getCounter(Task.Counter.REDUCE_OUTPUT_RECORDS));
runCustomFormats(mr);
+ runSecondarySort(mr.createJobConf());
} finally {
if (mr != null) { mr.shutdown(); }
}
@@ -284,4 +296,47 @@
JobConf job) throws IOException {
}
}
+
+ private void runSecondarySort(Configuration conf) throws IOException,
+ InterruptedException,
+ ClassNotFoundException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+ localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+ TestMapReduceLocal.writeFile
+ ("in/part1", "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" +
+ "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n");
+ Job job = new Job(conf, "word count");
+ job.setJarByClass(WordCount.class);
+ job.setNumReduceTasks(2);
+ job.setMapperClass(SecondarySort.MapClass.class);
+ job.setReducerClass(SecondarySort.Reduce.class);
+ // group and partition by the first int in the pair
+ job.setPartitionerClass(FirstPartitioner.class);
+ job.setGroupingComparatorClass(FirstGroupingComparator.class);
+
+ // the map output is IntPair, IntWritable
+ job.setMapOutputKeyClass(IntPair.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
+ // the reduce output is Text, IntWritable
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+ FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
+ assertTrue(job.waitForCompletion(true));
+ String out = TestMapReduceLocal.readFile("out/part-r-00000");
+ assertEquals("------------------------------------------------\n" +
+ "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" +
+ "------------------------------------------------\n" +
+ "10\t20\n10\t25\n10\t30\n", out);
+ out = TestMapReduceLocal.readFile("out/part-r-00001");
+ assertEquals("------------------------------------------------\n" +
+ "-3\t23\n" +
+ "------------------------------------------------\n" +
+ "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" +
+ "------------------------------------------------\n" +
+ "5\t10\n", out);
+ }
}
Modified:
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=794103&r1=794102&r2=794103&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Jul 14 22:42:06 2009
@@ -27,11 +27,7 @@
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SecondarySort;
import org.apache.hadoop.examples.WordCount;
-import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator;
-import org.apache.hadoop.examples.SecondarySort.FirstPartitioner;
-import org.apache.hadoop.examples.SecondarySort.IntPair;
import org.apache.hadoop.examples.WordCount.IntSumReducer;
import org.apache.hadoop.examples.WordCount.TokenizerMapper;
import org.apache.hadoop.fs.FileSystem;
@@ -61,7 +57,7 @@
}
}
- public Path writeFile(String name, String data) throws IOException {
+ public static Path writeFile(String name, String data) throws IOException {
Path file = new Path(TEST_ROOT_DIR + "/" + name);
localFs.delete(file, false);
DataOutputStream f = localFs.create(file);
@@ -70,7 +66,7 @@
return file;
}
- public String readFile(String name) throws IOException {
+ public static String readFile(String name) throws IOException {
DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
BufferedReader b = new BufferedReader(new InputStreamReader(f));
StringBuilder result = new StringBuilder();
@@ -90,7 +86,6 @@
mr = new MiniMRCluster(2, "file:///", 3);
Configuration conf = mr.createJobConf();
runWordCount(conf);
- runSecondarySort(conf);
} finally {
if (mr != null) { mr.shutdown(); }
}
@@ -162,43 +157,4 @@
assertTrue("combine in > combine out", combineIn > combineOut);
}
- private void runSecondarySort(Configuration conf) throws IOException,
- InterruptedException,
- ClassNotFoundException {
- localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
- localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
- writeFile("in/part1", "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" +
- "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n");
- Job job = new Job(conf, "word count");
- job.setJarByClass(WordCount.class);
- job.setMapperClass(SecondarySort.MapClass.class);
- job.setReducerClass(SecondarySort.Reduce.class);
- // group and partition by the first int in the pair
- job.setPartitionerClass(FirstPartitioner.class);
- job.setGroupingComparatorClass(FirstGroupingComparator.class);
-
- // the map output is IntPair, IntWritable
- job.setMapOutputKeyClass(IntPair.class);
- job.setMapOutputValueClass(IntWritable.class);
-
- // the reduce output is Text, IntWritable
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
- FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
- assertTrue(job.waitForCompletion(true));
- String out = readFile("out/part-r-00000");
- assertEquals("------------------------------------------------\n" +
- "-3\t23\n" +
- "------------------------------------------------\n" +
- "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" +
- "------------------------------------------------\n" +
- "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" +
- "------------------------------------------------\n" +
- "5\t10\n" +
- "------------------------------------------------\n" +
- "10\t20\n10\t25\n10\t30\n", out);
- }
-
}