Updated Branches: refs/heads/master d864f2fd4 -> 70da18c54
Extend MemPipeline.write to support Avro types (CRUNCH-204) Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2276ee05 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2276ee05 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2276ee05 Branch: refs/heads/master Commit: 2276ee050d2a22ef74c75d462fa7ddf22a38a67b Parents: d864f2f Author: tzolov <[email protected]> Authored: Thu May 9 20:35:42 2013 +0200 Committer: tzolov <[email protected]> Committed: Thu May 9 20:35:42 2013 +0200 ---------------------------------------------------------------------- .../crunch/impl/mr/collect/UnionCollectionIT.java | 2 +- .../org/apache/crunch/impl/mem/MemPipeline.java | 36 ++++++++++++++- 2 files changed, 36 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/2276ee05/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java index f9f73b2..2832437 100644 --- a/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java @@ -142,7 +142,7 @@ public class UnionCollectionIT { private void checkFileContents(String filePath) throws IOException { - List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance() || !(pipeline instanceof MRPipeline)) ? Lists + List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance())? 
Lists .newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize().iterator()) : Lists .newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings())).materialize().iterator()); http://git-wip-us.apache.org/repos/asf/crunch/blob/2276ee05/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 272b2af..80b0543 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -22,6 +22,10 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericContainer; +import org.apache.avro.generic.GenericDatumWriter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CrunchRuntimeException; @@ -40,8 +44,10 @@ import org.apache.crunch.impl.mem.collect.MemTable; import org.apache.crunch.io.At; import org.apache.crunch.io.PathTarget; import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.io.avro.AvroFileTarget; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; +import org.apache.crunch.types.avro.ReflectDataFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -180,7 +186,9 @@ public class MemPipeline implements Pipeline { FileSystem fs = path.getFileSystem(conf); FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex)); outputIndex++; - if (collection instanceof PTable) { + if (target instanceof AvroFileTarget) { + writeAvroFile(os, collection.materialize()); + } 
else if (collection instanceof PTable) { for (Object o : collection.materialize()) { Pair p = (Pair) o; os.writeBytes(p.first().toString()); @@ -202,6 +210,32 @@ public class MemPipeline implements Pipeline { } } + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void writeAvroFile(FSDataOutputStream outputStream, Iterable genericRecords) throws IOException { + + Object r = genericRecords.iterator().next(); + + Schema schema = null; + + if (r instanceof GenericContainer) { + schema = ((GenericContainer) r).getSchema(); + } else { + schema = new ReflectDataFactory().getReflectData().getSchema(r.getClass()); + } + + GenericDatumWriter genericDatumWriter = new GenericDatumWriter(schema); + + DataFileWriter dataFileWriter = new DataFileWriter(genericDatumWriter); + dataFileWriter.create(schema, outputStream); + + for (Object record : genericRecords) { + dataFileWriter.append(record); + } + + dataFileWriter.close(); + outputStream.close(); + } + @Override public PCollection<String> readTextFile(String pathName) { return read(At.textFile(pathName));
