Updated Branches: refs/heads/master 3eb5f0a8a -> ebacb54c6
CRUNCH-243: Support easy extensibility for custom reading of Avro Datum Signed-off-by: Micah Whitacre <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ebacb54c Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ebacb54c Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ebacb54c Branch: refs/heads/master Commit: ebacb54c65be596392fbeb890856b64d6f2949b9 Parents: 3eb5f0a Author: Micah Whitacre <[email protected]> Authored: Tue Jul 23 11:55:55 2013 -0500 Committer: Micah Whitacre <[email protected]> Committed: Tue Jul 23 15:33:50 2013 -0500 ---------------------------------------------------------------------- .../crunch/io/avro/AvroFileReaderFactory.java | 6 +++++- .../apache/crunch/io/avro/AvroFileSource.java | 19 ++++++++++++++++++- .../crunch/io/avro/AvroFileSourceTarget.java | 9 +++++++++ 3 files changed, 32 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/ebacb54c/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java index c8fe23a..becde73 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java @@ -49,7 +49,11 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> { private final MapFn<T, T> mapFn; public AvroFileReaderFactory(AvroType<T> atype) { - this.recordReader = createDatumReader(atype); + this(createDatumReader(atype), atype); + } + + public AvroFileReaderFactory(DatumReader<T> reader, AvroType<T> atype) { + this.recordReader = reader != null ? 
reader : createDatumReader(atype); this.mapFn = (MapFn<T, T>) atype.getInputMapFn(); } http://git-wip-us.apache.org/repos/asf/crunch/blob/ebacb54c/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java index 3e1e933..8415d12 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java @@ -20,6 +20,7 @@ package org.apache.crunch.io.avro; import java.io.IOException; import java.util.List; +import org.apache.avro.io.DatumReader; import org.apache.avro.mapred.AvroJob; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.ReadableSource; @@ -39,14 +40,26 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()); return bundle; } + + private DatumReader<T> reader; public AvroFileSource(Path path, AvroType<T> ptype) { super(path, ptype, getBundle(ptype)); } + public AvroFileSource(Path path, AvroType<T> ptype, DatumReader<T> reader) { + super(path, ptype, getBundle(ptype)); + this.reader = reader; + } + public AvroFileSource(List<Path> paths, AvroType<T> ptype) { super(paths, ptype, getBundle(ptype)); } + + public AvroFileSource(List<Path> paths, AvroType<T> ptype, DatumReader<T> reader) { + super(paths, ptype, getBundle(ptype)); + this.reader = reader; + } @Override public String toString() { @@ -55,6 +68,10 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour @Override public Iterable<T> read(Configuration conf) throws IOException { - return read(conf, new AvroFileReaderFactory<T>((AvroType<T>) ptype)); + return read(conf, getFileReaderFactory((AvroType<T>) ptype)); + } 
+ + protected AvroFileReaderFactory<T> getFileReaderFactory(AvroType<T> ptype){ + return new AvroFileReaderFactory(reader, ptype); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/ebacb54c/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java index 76103e5..9aa650a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.io.avro; +import org.apache.avro.io.DatumReader; import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl; @@ -28,10 +29,18 @@ public class AvroFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> { this(path, atype, new SequentialFileNamingScheme()); } + public AvroFileSourceTarget(Path path, AvroType<T> atype, DatumReader<T> reader) { + this(path, atype, reader, new SequentialFileNamingScheme()); + } + public AvroFileSourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) { super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path), fileNamingScheme); } + public AvroFileSourceTarget(Path path, AvroType<T> atype, DatumReader<T> reader, FileNamingScheme fileNamingScheme) { + super(new AvroFileSource<T>(path, atype, reader), new AvroFileTarget(path), fileNamingScheme); + } + @Override public String toString() { return target.toString();
