Updated Branches: refs/heads/master b9e9672d9 -> 2a8b6c149
CRUNCH-302: Initialize PType input/output functions when writing data in the MemPipeline.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2a8b6c14
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2a8b6c14
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2a8b6c14

Branch: refs/heads/master
Commit: 2a8b6c1498f0a0eff3f595652f1745098beca8d5
Parents: b9e9672
Author: Josh Wills <[email protected]>
Authored: Thu Nov 21 17:40:36 2013 -0800
Committer: Josh Wills <[email protected]>
Committed: Thu Nov 21 17:40:36 2013 -0800

----------------------------------------------------------------------
 .../impl/mem/MemPipelineFileReadingWritingIT.java | 15 +++++++++++++++
 .../java/org/apache/crunch/impl/mem/MemPipeline.java | 7 ++++---
 2 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/2a8b6c14/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
index bb75681..5cefc2d 100644
--- a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
@@ -27,7 +27,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
@@ -248,6 +251,18 @@ public class
MemPipelineFileReadingWritingIT { } + @Test + public void testMemPipelineWriteAvroFile_Tuples() throws IOException { + AvroType<Pair<String, Long>> at = Avros.pairs(Avros.strings(), Avros.longs()); + Set<Pair<String, Long>> data = ImmutableSet.of(Pair.of("a", 1L), Pair.of("b", 2L), Pair.of("c", 3L)); + PCollection < Pair < String, Long >> pc = MemPipeline.typedCollectionOf(at, data); + pc.write(To.avroFile(outputDir.getPath())); + + Iterable<Pair<String, Long>> it = MemPipeline.getInstance().read( + at.getDefaultFileSource(new Path(outputDir.getPath()))).materialize(); + assertEquals(data, Sets.newHashSet(it)); + } + static class SimpleBean { public int value; http://git-wip-us.apache.org/repos/asf/crunch/blob/2a8b6c14/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 5e6dfa0..ce411ca 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericContainer; import org.apache.avro.io.DatumWriter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -51,7 +50,6 @@ import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.Avros; -import org.apache.crunch.types.avro.ReflectDataFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -188,6 +186,9 @@ public class MemPipeline implements Pipeline { } 
activeTargets.add(target); if (target instanceof PathTarget) { + if (collection.getPType() != null) { + collection.getPType().initialize(getConfiguration()); + } Path path = ((PathTarget) target).getPath(); try { FileSystem fs = path.getFileSystem(conf); @@ -245,7 +246,7 @@ public class MemPipeline implements Pipeline { dataFileWriter.create(avroType.getSchema(), outputStream); for (Object record : recordCollection.materialize()) { - dataFileWriter.append(record); + dataFileWriter.append(avroType.getOutputMapFn().map(record)); } dataFileWriter.close();
