Author: cutting
Date: Fri Jun 15 21:29:12 2012
New Revision: 1350810

URL: http://svn.apache.org/viewvc?rev=1350810&view=rev
Log:
AVRO-1108. Java: Add support for reflect API to newer mapreduce API.

Modified:
    avro/trunk/CHANGES.txt
    
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java
    
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java
    
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java
    
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
    
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java
    
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java

Modified: avro/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Jun 15 21:29:12 2012
@@ -10,6 +10,9 @@ Avro 1.7.1 (unreleased)
     AVRO-1112. Java: Add support for Snappy codec to newer mapreduce API.
     (Matt Mead via cutting)
 
+    AVRO-1108. Java: Add support for reflect API to newer mapreduce API.
+    (cutting)
+
   IMPROVEMENTS
 
   BUG FIXES

Modified: 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java
URL: 
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java
 (original)
+++ 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java
 Fri Jun 15 21:29:12 2012
@@ -26,7 +26,7 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.hadoop.io.serializer.Deserializer;
 
 /**
@@ -66,7 +66,7 @@ public abstract class AvroDeserializer<T
   protected AvroDeserializer(Schema writerSchema, Schema readerSchema) {
     mWriterSchema = writerSchema;
     mReaderSchema = null != readerSchema ? readerSchema : writerSchema;
-    mAvroDatumReader = new SpecificDatumReader<D>(mWriterSchema, 
mReaderSchema);
+    mAvroDatumReader = new ReflectDatumReader<D>(mWriterSchema, mReaderSchema);
   }
 
   /**

Modified: 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java
URL: 
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java
 (original)
+++ 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java
 Fri Jun 15 21:29:12 2012
@@ -22,7 +22,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.io.BinaryData;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.specific.SpecificData;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.RawComparator;
@@ -57,6 +57,6 @@ public class AvroKeyComparator<T> extend
   /** {@inheritDoc} */
   @Override
   public int compare(AvroKey<T> x, AvroKey<T> y) {
-    return SpecificData.get().compare(x.datum(), y.datum(), mSchema);
+    return ReflectData.get().compare(x.datum(), y.datum(), mSchema);
   }
 }

Modified: 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java
URL: 
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java
 (original)
+++ 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java
 Fri Jun 15 21:29:12 2012
@@ -26,7 +26,7 @@ import org.apache.avro.io.EncoderFactory
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.hadoop.io.serializer.Serializer;
 
 /**
@@ -79,7 +79,7 @@ public class AvroSerializer<T> implement
       throw new IllegalArgumentException("Writer schema may not be null");
     }
     mWriterSchema = writerSchema;
-    mAvroDatumWriter = new SpecificDatumWriter<T>(writerSchema);
+    mAvroDatumWriter = new ReflectDatumWriter<T>(writerSchema);
   }
 
   /**

Modified: 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
URL: 
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
 (original)
+++ 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
 Fri Jun 15 21:29:12 2012
@@ -25,7 +25,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -50,7 +50,7 @@ public class AvroKeyRecordWriter<T> exte
   public AvroKeyRecordWriter(Schema writerSchema, CodecFactory 
compressionCodec,
       OutputStream outputStream) throws IOException {
     // Create an Avro container file and a writer to it.
-    mAvroFileWriter = new DataFileWriter<T>(new 
SpecificDatumWriter<T>(writerSchema));
+    mAvroFileWriter = new DataFileWriter<T>(new 
ReflectDatumWriter<T>(writerSchema));
     mAvroFileWriter.setCodec(compressionCodec);
     mAvroFileWriter.create(writerSchema, outputStream);
   }

Modified: 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java
URL: 
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java
 (original)
+++ 
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java
 Fri Jun 15 21:29:12 2012
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileRead
 import org.apache.avro.file.SeekableInput;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
-import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -88,7 +88,7 @@ public abstract class AvroRecordReaderBa
 
     // Wrap the seekable input stream in an Avro DataFileReader.
     mAvroFileReader = createAvroFileReader(seekableFileInput,
-        new SpecificDatumReader<T>(mReaderSchema));
+        new ReflectDatumReader<T>(mReaderSchema));
 
     // Initialize the start and end offsets into the file based on the 
boundaries of the
     // input split we're responsible for.  We will read the first block that 
begins

Modified: 
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java
URL: 
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- 
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java
 (original)
+++ 
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java
 Fri Jun 15 21:29:12 2012
@@ -29,6 +29,8 @@ import org.apache.avro.generic.GenericDa
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.FsInput;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,6 +59,18 @@ public class TestWordCount {
           + "\"fields\":[{\"name\":\"count\",\"type\":\"int\"},"
           + "{\"name\":\"name\",\"type\":\"string\"}]}");
 
+  public static class ReflectStats {
+    String name;
+    int count;
+  }
+
+  // permit data written as SpecificStats to be read as ReflectStats
+  private static Schema REFLECT_STATS_SCHEMA
+    = ReflectData.get().getSchema(ReflectStats.class);
+  static {
+    REFLECT_STATS_SCHEMA.addAlias(TextStats.SCHEMA$.getFullName());
+  }
+
   private static class LineCountMapper extends Mapper<LongWritable, Text, 
Text, IntWritable> {
     private IntWritable mOne;
 
@@ -92,6 +106,26 @@ public class TestWordCount {
     }
   }
 
+  private static class ReflectCountMapper
+      extends Mapper<AvroKey<ReflectStats>, NullWritable, Text, IntWritable> {
+    private IntWritable mCount;
+    private Text mText;
+
+    @Override
+    protected void setup(Context context) {
+      mCount = new IntWritable(0);
+      mText = new Text("");
+    }
+
+    @Override
+    protected void map(AvroKey<ReflectStats> record, NullWritable ignore, 
Context context)
+        throws IOException, InterruptedException {
+      mCount.set(record.datum().count);
+      mText.set(record.datum().name);
+      context.write(mText, mCount);
+    }
+  }
+
   private static class GenericStatsReducer
       extends Reducer<Text, IntWritable, AvroKey<GenericData.Record>, 
NullWritable> {
     private AvroKey<GenericData.Record> mStats;
@@ -139,6 +173,29 @@ public class TestWordCount {
     }
   }
 
+  private static class ReflectStatsReducer
+      extends Reducer<Text, IntWritable, AvroKey<ReflectStats>, NullWritable> {
+    private AvroKey<ReflectStats> mStats;
+
+    @Override
+    protected void setup(Context context) {
+      mStats = new AvroKey<ReflectStats>(null);
+    }
+
+    @Override
+    protected void reduce(Text line, Iterable<IntWritable> counts, Context 
context)
+        throws IOException, InterruptedException {
+      ReflectStats record = new ReflectStats();
+      record.count = 0;
+      for (IntWritable count : counts) {
+        record.count += count.get();
+      }
+      record.name = line.toString();
+      mStats.datum(record);
+      context.write(mStats, NullWritable.get());
+    }
+  }
+
   private static class SortMapper
       extends Mapper<AvroKey<TextStats>, NullWritable, AvroKey<TextStats>, 
NullWritable> {
     @Override
@@ -238,6 +295,46 @@ public class TestWordCount {
   }
 
   @Test
+  public void testAvroReflectOutput() throws Exception {
+    Job job = new Job();
+
+    FileInputFormat.setInputPaths(job, new Path(getClass()
+            .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.txt")
+            .toURI().toString()));
+    job.setInputFormatClass(TextInputFormat.class);
+
+    job.setMapperClass(LineCountMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    job.setReducerClass(ReflectStatsReducer.class);
+    AvroJob.setOutputKeySchema(job, REFLECT_STATS_SCHEMA);
+
+    job.setOutputFormatClass(AvroKeyOutputFormat.class);
+    Path outputPath = new Path(tmpFolder.getRoot().getPath() + "/out-reflect");
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    Assert.assertTrue(job.waitForCompletion(true));
+
+    // Check that the results from the MapReduce were as expected.
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    FileStatus[] outputFiles = 
fileSystem.globStatus(outputPath.suffix("/part-*"));
+    Assert.assertEquals(1, outputFiles.length);
+    DataFileReader<ReflectStats> reader = new DataFileReader<ReflectStats>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new ReflectDatumReader<ReflectStats>());
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+    for (ReflectStats record : reader) {
+      counts.put(record.name.toString(), record.count);
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+  }
+
+  @Test
   public void testAvroInput() throws Exception {
     Job job = new Job();
 
@@ -279,6 +376,46 @@ public class TestWordCount {
   }
 
   @Test
+  public void testReflectInput() throws Exception {
+    Job job = new Job();
+    FileInputFormat.setInputPaths(job, new Path(getClass()
+            
.getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro")
+            .toURI().toString()));
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    AvroJob.setInputKeySchema(job, REFLECT_STATS_SCHEMA);
+
+    job.setMapperClass(ReflectCountMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    job.setReducerClass(ReflectStatsReducer.class);
+    AvroJob.setOutputKeySchema(job, REFLECT_STATS_SCHEMA);
+
+    job.setOutputFormatClass(AvroKeyOutputFormat.class);
+    Path outputPath = new Path(tmpFolder.getRoot().getPath() + 
"/out-reflect-input");
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    Assert.assertTrue(job.waitForCompletion(true));
+
+    // Check that the results from the MapReduce were as expected.
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    FileStatus[] outputFiles = 
fileSystem.globStatus(outputPath.suffix("/part-*"));
+    Assert.assertEquals(1, outputFiles.length);
+    DataFileReader<ReflectStats> reader = new DataFileReader<ReflectStats>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new ReflectDatumReader<ReflectStats>());
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+    for (ReflectStats record : reader) {
+      counts.put(record.name.toString(), record.count);
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+  }
+
+  @Test
   public void testAvroMapOutput() throws Exception {
     Job job = new Job();
 


Reply via email to