Author: tomwhite
Date: Tue Jul 29 12:50:23 2014
New Revision: 1614329
URL: http://svn.apache.org/r1614329
Log:
AVRO-1553. Java: MapReduce never uses MapOutputValueSchema.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDatumConverterFactory.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestKeyValueInput.java
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1614329&r1=1614328&r2=1614329&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jul 29 12:50:23 2014
@@ -10,6 +10,8 @@ Trunk (not yet released)
BUG FIXES
+ AVRO-1553. Java: MapReduce never uses MapOutputValueSchema (tomwhite)
+
Avro 1.7.7 (23 July 2014)
NEW FEATURES
Modified:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDatumConverterFactory.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDatumConverterFactory.java?rev=1614329&r1=1614328&r2=1614329&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDatumConverterFactory.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDatumConverterFactory.java Tue Jul 29 12:50:23 2014
@@ -80,7 +80,7 @@ public class AvroDatumConverterFactory e
public <IN, OUT> AvroDatumConverter<IN, OUT> create(Class<IN> inputClass) {
boolean isMapOnly = ((JobConf)getConf()).getNumReduceTasks() == 0;
if (AvroKey.class.isAssignableFrom(inputClass)) {
- Schema schema = null;
+ Schema schema;
if (isMapOnly) {
schema = AvroJob.getMapOutputKeySchema(getConf());
if (null == schema) {
@@ -97,9 +97,9 @@ public class AvroDatumConverterFactory e
return (AvroDatumConverter<IN, OUT>) new AvroWrapperConverter(schema);
}
if (AvroValue.class.isAssignableFrom(inputClass)) {
- Schema schema = null;
+ Schema schema;
if (isMapOnly) {
- AvroJob.getMapOutputValueSchema(getConf());
+ schema = AvroJob.getMapOutputValueSchema(getConf());
if (null == schema) {
schema = AvroJob.getOutputValueSchema(getConf());
}
Modified:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestKeyValueInput.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestKeyValueInput.java?rev=1614329&r1=1614328&r2=1614329&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestKeyValueInput.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestKeyValueInput.java Tue Jul 29 12:50:23 2014
@@ -148,7 +148,7 @@ public class TestKeyValueInput {
// Run the job.
assertTrue(job.waitForCompletion(true));
- // Verify that the output Avro container file as the expected data.
+ // Verify that the output Avro container file has the expected data.
File avroFile = new File(outputPath.toString(), "part-r-00000.avro");
DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(
AvroKeyValue.getSchema(Schema.create(Schema.Type.STRING),
@@ -188,4 +188,67 @@ public class TestKeyValueInput {
assertFalse(avroFileReader.hasNext());
avroFileReader.close();
}
+
+ @Test
+ public void testKeyValueInputMapOnly()
+ throws ClassNotFoundException, IOException, InterruptedException {
+ // Create a test input file.
+ File inputFile = createInputFile();
+
+ // Configure the job input.
+ Job job = new Job();
+ FileInputFormat.setInputPaths(job, new Path(inputFile.getAbsolutePath()));
+ job.setInputFormatClass(AvroKeyValueInputFormat.class);
+ AvroJob.setInputKeySchema(job, Schema.create(Schema.Type.INT));
+ AvroJob.setInputValueSchema(job, Schema.create(Schema.Type.STRING));
+
+ // Configure the identity mapper.
+ AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
+ AvroJob.setMapOutputValueSchema(job, Schema.create(Schema.Type.STRING));
+
+ // Configure zero reducers.
+ job.setNumReduceTasks(0);
+ job.setOutputKeyClass(AvroKey.class);
+ job.setOutputValueClass(AvroValue.class);
+
+ // Configure the output format.
+ job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
+ Path outputPath = new Path(mTempDir.getRoot().getPath(), "out-index");
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ // Run the job.
+ assertTrue(job.waitForCompletion(true));
+
+ // Verify that the output Avro container file has the expected data.
+ File avroFile = new File(outputPath.toString(), "part-m-00000.avro");
+ DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(
+ AvroKeyValue.getSchema(Schema.create(Schema.Type.INT),
+ Schema.create(Schema.Type.STRING)));
+ DataFileReader<GenericRecord> avroFileReader
+ = new DataFileReader<GenericRecord>(avroFile, datumReader);
+ assertTrue(avroFileReader.hasNext());
+
+ AvroKeyValue<Integer, CharSequence> record1
+ = new AvroKeyValue<Integer, CharSequence>(avroFileReader.next());
+ assertNotNull(record1.get());
+ assertEquals(1, record1.getKey().intValue());
+ assertEquals("apple banana carrot", record1.getValue().toString());
+
+ assertTrue(avroFileReader.hasNext());
+ AvroKeyValue<Integer, CharSequence> record2
+ = new AvroKeyValue<Integer, CharSequence>(avroFileReader.next());
+ assertNotNull(record2.get());
+ assertEquals(2, record2.getKey().intValue());
+ assertEquals("apple banana", record2.getValue().toString());
+
+ assertTrue(avroFileReader.hasNext());
+ AvroKeyValue<Integer, CharSequence> record3
+ = new AvroKeyValue<Integer, CharSequence>(avroFileReader.next());
+ assertNotNull(record3.get());
+ assertEquals(3, record3.getKey().intValue());
+ assertEquals("apple", record3.getValue().toString());
+
+ assertFalse(avroFileReader.hasNext());
+ avroFileReader.close();
+ }
}