Updated Branches:
  refs/heads/master 3eb5f0a8a -> ebacb54c6

CRUNCH-243: Support easy extensibility for custom reading of Avro Datum

Signed-off-by: Micah Whitacre <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ebacb54c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ebacb54c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ebacb54c

Branch: refs/heads/master
Commit: ebacb54c65be596392fbeb890856b64d6f2949b9
Parents: 3eb5f0a
Author: Micah Whitacre <[email protected]>
Authored: Tue Jul 23 11:55:55 2013 -0500
Committer: Micah Whitacre <[email protected]>
Committed: Tue Jul 23 15:33:50 2013 -0500

----------------------------------------------------------------------
 .../crunch/io/avro/AvroFileReaderFactory.java    |  6 +++++-
 .../apache/crunch/io/avro/AvroFileSource.java    | 19 ++++++++++++++++++-
 .../crunch/io/avro/AvroFileSourceTarget.java     |  9 +++++++++
 3 files changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ebacb54c/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
 
b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index c8fe23a..becde73 100644
--- 
a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ 
b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -49,7 +49,11 @@ public class AvroFileReaderFactory<T> implements 
FileReaderFactory<T> {
   private final MapFn<T, T> mapFn;
 
   public AvroFileReaderFactory(AvroType<T> atype) {
-    this.recordReader = createDatumReader(atype);
+    this(createDatumReader(atype), atype);
+  }
+
+  public AvroFileReaderFactory(DatumReader<T> reader, AvroType<T> atype) {
+    this.recordReader = reader != null ? reader : createDatumReader(atype);
     this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/ebacb54c/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git 
a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java 
b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index 3e1e933..8415d12 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io.avro;
 import java.io.IOException;
 
 import java.util.List;
+import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.AvroJob;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
@@ -39,14 +40,26 @@ public class AvroFileSource<T> extends FileSourceImpl<T> 
implements ReadableSour
         .set(Avros.REFLECT_DATA_FACTORY_CLASS, 
Avros.REFLECT_DATA_FACTORY.getClass().getName());
     return bundle;
   }
+
+  private DatumReader<T> reader;
   
   public AvroFileSource(Path path, AvroType<T> ptype) {
     super(path, ptype, getBundle(ptype));
   }
 
+  public AvroFileSource(Path path, AvroType<T> ptype, DatumReader<T> reader) {
+    super(path, ptype, getBundle(ptype));
+    this.reader = reader;
+  }
+
   public AvroFileSource(List<Path> paths, AvroType<T> ptype) {
     super(paths, ptype, getBundle(ptype));
   }
+  
+  public AvroFileSource(List<Path> paths, AvroType<T> ptype, DatumReader<T> 
reader) {
+    super(paths, ptype, getBundle(ptype));
+    this.reader = reader;
+  }  
 
   @Override
   public String toString() {
@@ -55,6 +68,10 @@ public class AvroFileSource<T> extends FileSourceImpl<T> 
implements ReadableSour
 
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-    return read(conf, new AvroFileReaderFactory<T>((AvroType<T>) ptype));
+    return read(conf, getFileReaderFactory((AvroType<T>) ptype));
+  }
+
+  protected AvroFileReaderFactory<T> getFileReaderFactory(AvroType<T> ptype){
+    return new AvroFileReaderFactory(reader, ptype);
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/ebacb54c/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
----------------------------------------------------------------------
diff --git 
a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java 
b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
index 76103e5..9aa650a 100644
--- 
a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
+++ 
b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.io.avro;
 
+import org.apache.avro.io.DatumReader;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
@@ -28,10 +29,18 @@ public class AvroFileSourceTarget<T> extends 
ReadableSourcePathTargetImpl<T> {
     this(path, atype, new SequentialFileNamingScheme());
   }
 
+  public AvroFileSourceTarget(Path path, AvroType<T> atype, DatumReader<T> 
reader) {
+    this(path, atype, reader, new SequentialFileNamingScheme());
+  }
+
   public AvroFileSourceTarget(Path path, AvroType<T> atype, FileNamingScheme 
fileNamingScheme) {
     super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path), 
fileNamingScheme);
   }
 
+  public AvroFileSourceTarget(Path path, AvroType<T> atype, DatumReader<T> 
reader, FileNamingScheme fileNamingScheme) {
+    super(new AvroFileSource<T>(path, atype, reader), new 
AvroFileTarget(path), fileNamingScheme);
+  }  
+
   @Override
   public String toString() {
     return target.toString();

Reply via email to