Author: cutting
Date: Wed Apr 20 19:55:37 2011
New Revision: 1095493
URL: http://svn.apache.org/viewvc?rev=1095493&view=rev
Log:
AVRO-763. Java MapReduce API: add support for configure() and close() methods
to mappers and reducers. Contributed by Marshall Pierce.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1095493&r1=1095492&r2=1095493&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Apr 20 19:55:37 2011
@@ -52,6 +52,9 @@ Avro 1.5.1 (unreleased)
AVRO-798. Add checksum to Snappy compressed blocks. (cutting)
+ AVRO-763. Java MapReduce API: add support for configure() and
+ close() methods to mappers and reducers. (Marshall Pierce via cutting)
+
BUG FIXES
AVRO-786. Java: Fix equals() to work on objects containing maps. (cutting)
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java?rev=1095493&r1=1095492&r2=1095493&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java Wed Apr 20 19:55:37 2011
@@ -18,17 +18,20 @@
package org.apache.avro.mapred;
+import java.io.Closeable;
import java.io.IOException;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Reporter;
/** A mapper for Avro data.
*
* <p>Applications subclass this class and pass their subclass to {@link
- * AvroJob#setMapperClass}, overriding {@link #map}.
+ * AvroJob#setMapperClass(JobConf, Class)}, overriding {@link #map(Object, AvroCollector, Reporter)}.
*/
-public class AvroMapper<IN,OUT> extends Configured {
+public class AvroMapper<IN, OUT> extends Configured implements JobConfigurable, Closeable {
/** Called with each map input datum. By default, collects inputs. */
@SuppressWarnings("unchecked")
@@ -38,4 +41,15 @@ public class AvroMapper<IN,OUT> extends
}
+ /** Subclasses can override this as desired. */
+ @Override
+ public void close() throws IOException {
+ // no op
+ }
+
+ /** Subclasses can override this as desired. */
+ @Override
+ public void configure(JobConf jobConf) {
+ // no op
+ }
}
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java?rev=1095493&r1=1095492&r2=1095493&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java Wed Apr 20 19:55:37 2011
@@ -18,19 +18,22 @@
package org.apache.avro.mapred;
+import java.io.Closeable;
import java.io.IOException;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Reporter;
/** A reducer for Avro data.
*
* <p>Applications should subclass this class and pass their subclass to {@link
- * AvroJob#setReducerClass} and perhaps {@link AvroJob#setCombinerClass}.
- * Subclasses override {@link #reduce}.
+ * AvroJob#setReducerClass(JobConf, Class)} and perhaps {@link AvroJob#setCombinerClass(JobConf, Class)}.
+ * Subclasses override {@link #reduce(Object, Iterable, AvroCollector, Reporter)}.
*/
-public class AvroReducer<K,V,OUT> extends Configured {
+public class AvroReducer<K,V,OUT> extends Configured implements JobConfigurable, Closeable {
private Pair<K,V> outputPair;
@@ -48,4 +51,15 @@ public class AvroReducer<K,V,OUT> extend
}
}
+ /** Subclasses can override this as desired. */
+ @Override
+ public void close() throws IOException {
+ // no op
+ }
+
+ /** Subclasses can override this as desired. */
+ @Override
+ public void configure(JobConf jobConf) {
+ // no op
+ }
}
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java?rev=1095493&r1=1095492&r2=1095493&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java Wed Apr 20 19:55:37 2011
@@ -21,11 +21,11 @@ package org.apache.avro.mapred;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
/** Bridge between a {@link org.apache.hadoop.mapred.Mapper} and an {@link
@@ -45,6 +45,7 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> ext
(conf.getClass(AvroJob.MAPPER, AvroMapper.class, AvroMapper.class),
conf);
this.isMapOnly = conf.getNumReduceTasks() == 0;
+ this.mapper.configure(conf);
}
@SuppressWarnings("unchecked")
@@ -80,4 +81,9 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> ext
mapper.map(wrapper.datum(), out, reporter);
}
+ @Override
+ public void close() throws IOException {
+ this.mapper.close();
+ }
+
}
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java?rev=1095493&r1=1095492&r2=1095493&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java Wed Apr 20 19:55:37 2011
@@ -39,6 +39,7 @@ abstract class HadoopReducerBase<K,V,OUT
@Override
public void configure(JobConf conf) {
this.reducer = getReducer(conf);
+ this.reducer.configure(conf);
}
class ReduceIterable implements Iterable<V>, Iterator<V> {
@@ -60,4 +61,8 @@ abstract class HadoopReducerBase<K,V,OUT
reducer.reduce(key.datum(), reduceIterable, collector, reporter);
}
+ @Override
+ public void close() throws IOException {
+ this.reducer.close();
+ }
}
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java?rev=1095493&r1=1095492&r2=1095493&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java Wed Apr 20 19:55:37 2011
@@ -20,6 +20,7 @@ package org.apache.avro.mapred;
import java.io.IOException;
import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
@@ -34,15 +35,31 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.file.DataFileReader;
import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.After;
import org.junit.Test;
-import static org.junit.Assert.*;
import test.Weather;
/** Tests mapred API with a specific record. */
public class TestWeather {
+ private static final AtomicInteger mapCloseCalls = new AtomicInteger();
+ private static final AtomicInteger mapConfigureCalls = new AtomicInteger();
+ private static final AtomicInteger reducerCloseCalls = new AtomicInteger();
+ private static final AtomicInteger reducerConfigureCalls = new AtomicInteger();
+
+
+ @After
+ public void tearDown() {
+ mapCloseCalls.set(0);
+ mapConfigureCalls.set(0);
+ reducerCloseCalls.set(0);
+ reducerConfigureCalls.set(0);
+ }
+
/** Uses default mapper with no reduces for a map-only identity job. */
@Test
@SuppressWarnings("deprecation")
@@ -64,7 +81,7 @@ public class TestWeather {
FileOutputFormat.setCompressOutput(job, true);
job.setNumReduceTasks(0); // map-only
-
+
JobClient.runJob(job);
// check output is correct
@@ -88,8 +105,18 @@ public class TestWeather {
Reporter reporter) throws IOException {
collector.collect(new Pair<Weather,Void>(w, (Void)null));
}
+
+ @Override
+ public void close() throws IOException {
+ mapCloseCalls.incrementAndGet();
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+ mapConfigureCalls.incrementAndGet();
+ }
}
-
+
// output keys only, since values are empty
public static class SortReducer
extends AvroReducer<Weather, Void, Weather> {
@@ -99,7 +126,17 @@ public class TestWeather {
Reporter reporter) throws IOException {
collector.collect(w);
}
- }
+
+ @Override
+ public void close() throws IOException {
+ reducerCloseCalls.incrementAndGet();
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+ reducerConfigureCalls.incrementAndGet();
+ }
+ }
@Test
@SuppressWarnings("deprecation")
@@ -140,6 +177,15 @@ public class TestWeather {
check.close();
sorted.close();
+
+ // check that AvroMapper and AvroReducer get close() and configure() called
+ assertEquals(1, mapCloseCalls.get());
+ assertEquals(1, reducerCloseCalls.get());
+ // gets called twice for some reason, so loosen this check
+ assertTrue(mapConfigureCalls.get() >= 1);
+ assertTrue(reducerConfigureCalls.get() >= 1);
+
+
}