Updated Branches: refs/heads/master fb172fd84 -> b9e9672d9
CRUNCH-300: Allow MemPipeline to write Avro files by reflection and add more
tests for writes done from MemPipeline.

Signed-off-by: Josh Wills <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b9e9672d
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b9e9672d
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b9e9672d

Branch: refs/heads/master
Commit: b9e9672d9ced2a48db95028b48407b5b2d2a830b
Parents: fb172fd
Author: David Whiting <[email protected]>
Authored: Wed Nov 20 15:53:21 2013 +0100
Committer: Josh Wills <[email protected]>
Committed: Thu Nov 21 16:39:32 2013 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/TermFrequencyIT.java |   4 +-
 .../mem/MemPipelineFileReadingWritingIT.java    | 152 +++++++++++++++----
 .../crunch/io/avro/AvroMemPipelineIT.java       |  58 +++----
 .../org/apache/crunch/impl/mem/MemPipeline.java |  59 ++++---
 4 files changed, 183 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
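In practical terms, the change lets an in-memory pipeline persist a collection of plain Java objects as a real Avro data file through a reflect-based AvroType, which the new tests in the diffs below exercise. A minimal sketch of that usage, assuming the commit as applied; the RecordBean class and the output path here are made up for illustration:

    import org.apache.crunch.PCollection;
    import org.apache.crunch.impl.mem.MemPipeline;
    import org.apache.crunch.io.To;
    import org.apache.crunch.types.avro.Avros;

    public class ReflectAvroWriteExample {
      // Hypothetical POJO; Avro reflection wants a public no-arg constructor.
      public static class RecordBean {
        public int value;
        public RecordBean() { }
        public RecordBean(int value) { this.value = value; }
      }

      public static void main(String[] args) {
        // typedCollectionOf attaches the reflect-based AvroType, which is what the
        // reworked MemPipeline write path uses to obtain the schema.
        PCollection<RecordBean> beans = MemPipeline.typedCollectionOf(
            Avros.reflects(RecordBean.class), new RecordBean(1), new RecordBean(2));
        // Writes an out<N>.avro file under the (illustrative) output directory.
        MemPipeline.getInstance().write(beans, To.avroFile("/tmp/reflect-avro"));
        MemPipeline.getInstance().done();
      }
    }

As the MemPipeline hunks below show, each output file is now named out<N> with an extension matching the target type (.seq, .avro, or .txt), so the result can be located with a simple wildcard.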
http://git-wip-us.apache.org/repos/asf/crunch/blob/b9e9672d/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java b/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java
index 2abdb8c..7ede881 100644
--- a/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/TermFrequencyIT.java
@@ -74,7 +74,7 @@ public class TermFrequencyIT implements Serializable {
   /*
    * Input: String Input title text
-   * 
+   *
    * Output: PTable<Pair<String, String>, Long> Pair<Pair<word, title>, count
    * in title>
    */
@@ -98,7 +98,7 @@ public class TermFrequencyIT implements Serializable {
   /*
    * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count
    * in title>
-   * 
+   *
    * Output: PTable<String, Pair<String, Long>> PTable<word, Pair<title,
    * count in title>>
    */

http://git-wip-us.apache.org/repos/asf/crunch/blob/b9e9672d/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
index 2d66672..bb75681 100644
--- a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java
@@ -18,23 +18,32 @@
 package org.apache.crunch.impl.mem;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
-import org.apache.crunch.impl.mem.collect.MemCollection;
 import org.apache.crunch.impl.mem.collect.MemTable;
 import org.apache.crunch.io.From;
 import org.apache.crunch.io.To;
+import org.apache.crunch.io.avro.AvroFileReaderFactory;
+import org.apache.crunch.test.Person;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,53 +59,57 @@
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 public class MemPipelineFileReadingWritingIT {
   @Rule
   public TemporaryPath baseTmpDir = TemporaryPaths.create();
-  
+
   private File inputFile;
-  private File outputFile;
-  
-  
+  private File outputDir;
+
+
   private static final Collection<String> EXPECTED_COLLECTION = Lists.newArrayList("hello", "world");
   @SuppressWarnings("unchecked")
   private static final Collection<Pair<Integer, String>> EXPECTED_TABLE = Lists.newArrayList(
-      Pair.of(1, "hello"), 
+      Pair.of(1, "hello"),
      Pair.of(2, "world"));
-  
+
   @Before
   public void setUp() throws IOException {
-    inputFile = baseTmpDir.getFile("test-read.seq");
-    outputFile = baseTmpDir.getFile("test-write.seq");
+    inputFile = baseTmpDir.getFile("test-read");
+    outputDir = baseTmpDir.getFile("test-write");
+  }
+
+  private File getOutputFile(File outputDir, String wildcardFilter) {
+
+    File[] files = outputDir.listFiles((FilenameFilter) new WildcardFileFilter(wildcardFilter));
+    System.out.println(Arrays.asList(files));
+    assertEquals(1, files.length);
+    return files[0];
   }
 
   @Test
   public void testMemPipelineFileWriter() throws Exception {
-    File tmpDir = baseTmpDir.getFile("mempipe");
+    File outputDir = baseTmpDir.getFile("mempipe");
     Pipeline p = MemPipeline.getInstance();
     PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
-    p.writeTextFile(lines, tmpDir.toString());
+    p.writeTextFile(lines, outputDir.toString());
     p.done();
-    assertTrue(tmpDir.exists());
-    File[] files = tmpDir.listFiles();
-    assertTrue(files != null && files.length > 0);
-    for (File f : files) {
-      if (!f.getName().startsWith(".")) {
-        List<String> txt = Files.readLines(f, Charsets.UTF_8);
-        assertEquals(ImmutableList.of("hello", "world"), txt);
-      }
-    }
+    File outputFile = getOutputFile(outputDir, "*.txt");
+
+    List<String> txt = Files.readLines(outputFile, Charsets.UTF_8);
+    assertEquals(ImmutableList.of("hello", "world"), txt);
   }
 
   private void createTestSequenceFile(final File seqFile) throws IOException {
     SequenceFile.Writer writer = null;
     writer = new Writer(FileSystem.getLocal(baseTmpDir.getDefaultConfiguration()),
-        baseTmpDir.getDefaultConfiguration(), 
-        new Path(seqFile.toString()), 
+        baseTmpDir.getDefaultConfiguration(),
+        new Path(seqFile.toString()),
         IntWritable.class, Text.class);
     writer.append(new IntWritable(1), new Text("hello"));
     writer.append(new IntWritable(2), new Text("world"));
@@ -110,9 +123,9 @@ public class MemPipelineFileReadingWritingIT {
     // read from sequence file
     final PCollection<Pair<Integer, String>> readCollection = MemPipeline.getInstance().read(
-        From.sequenceFile(inputFile.toString(), 
+        From.sequenceFile(inputFile.toString(),
             Writables.tableOf(
-                Writables.ints(), 
+                Writables.ints(),
                 Writables.strings())));
 
     // assert read same as written.
@@ -123,12 +136,13 @@ public class MemPipelineFileReadingWritingIT {
   public void testMemPipelineWriteSequenceFile_PCollection() throws IOException {
     // write
     PCollection<String> collection = MemPipeline.typedCollectionOf(Writables.strings(), EXPECTED_COLLECTION);
-    final Target target = To.sequenceFile(outputFile.toString());
+    final Target target = To.sequenceFile(outputDir.toString());
     MemPipeline.getInstance().write(collection, target);
 
     // read
     final SequenceFile.Reader reader = new Reader(FileSystem.getLocal(
-        baseTmpDir.getDefaultConfiguration()), new Path(outputFile.toString()),
+        baseTmpDir.getDefaultConfiguration()),
+        new Path(getOutputFile(outputDir, "*.seq").toString()),
         baseTmpDir.getDefaultConfiguration());
     final List<String> actual = Lists.newArrayList();
     final NullWritable key = NullWritable.get();
@@ -147,14 +161,14 @@ public class MemPipelineFileReadingWritingIT {
     // write
     final MemTable<Integer, String> collection = new MemTable<Integer, String>(EXPECTED_TABLE, //
         Writables.tableOf(
-            Writables.ints(), 
+            Writables.ints(),
             Writables.strings()), "test input");
-    final Target target = To.sequenceFile(outputFile.toString());
+    final Target target = To.sequenceFile(outputDir.toString());
     MemPipeline.getInstance().write(collection, target);
 
     // read
     final SequenceFile.Reader reader = new Reader(FileSystem.getLocal(baseTmpDir
-        .getDefaultConfiguration()), new Path(outputFile.toString()),
+        .getDefaultConfiguration()), new Path(getOutputFile(outputDir, "*.seq").toString()),
         baseTmpDir.getDefaultConfiguration());
     final List<Pair<Integer, String>> actual = Lists.newArrayList();
     final IntWritable key = new IntWritable();
@@ -167,4 +181,82 @@ public class MemPipelineFileReadingWritingIT {
     // assert read same as written
     assertEquals(EXPECTED_TABLE, actual);
   }
+
+  @Test
+  public void testMemPipelineWriteAvroFile_SpecificRecords() throws IOException {
+    AvroType<Person> ptype = Avros.specifics(Person.class);
+    PCollection<Person> collection = MemPipeline.typedCollectionOf(
+        ptype,
+        Person.newBuilder()
+            .setName("A")
+            .setAge(1)
+            .setSiblingnames(ImmutableList.<CharSequence>of())
+            .build(),
+        Person.newBuilder()
+            .setName("B")
+            .setAge(2)
+            .setSiblingnames(ImmutableList.<CharSequence>of())
+            .build());
+
+    MemPipeline.getInstance().write(collection, To.avroFile(outputDir.getPath()));
+
+    Iterator<Person> itr = new AvroFileReaderFactory<Person>(ptype).read(
+        FileSystem.getLocal(baseTmpDir.getDefaultConfiguration()),
+        new Path(getOutputFile(outputDir, "*.avro").getPath()));
+
+    assertEquals(2, Iterators.size(itr));
+
+  }
+
+  @Test
+  public void testMemPipelineWriteAvroFile_ReflectRecords() throws IOException {
+    AvroType<SimpleBean> ptype = Avros.reflects(SimpleBean.class);
+    PCollection<SimpleBean> collection = MemPipeline.typedCollectionOf(
+        ptype,
+        new SimpleBean(1),
+        new SimpleBean(2));
+
+    MemPipeline.getInstance().write(collection, To.avroFile(outputDir.getPath()));
+
+    Iterator<SimpleBean> itr = new AvroFileReaderFactory<SimpleBean>(ptype).read(
+        FileSystem.getLocal(baseTmpDir.getDefaultConfiguration()),
+        new Path(getOutputFile(outputDir, "*.avro").getPath()));
+
+    assertEquals(2, Iterators.size(itr));
+
+  }
+
+  @Test
+  public void testMemPipelineWriteAvroFile_GenericRecords() throws IOException {
+    AvroType<GenericData.Record> ptype = Avros.generics(Person.SCHEMA$);
+    GenericData.Record record = new GenericRecordBuilder(ptype.getSchema())
+        .set("name", "A")
+        .set("age", 1)
+        .set("siblingnames", ImmutableList.of())
+        .build();
+    PCollection<GenericData.Record> collection = MemPipeline.typedCollectionOf(
+        ptype, record);
+
+    MemPipeline.getInstance().write(collection, To.avroFile(outputDir.getPath()));
+
+    Iterator<GenericData.Record> itr = new AvroFileReaderFactory<GenericData.Record>(ptype).read(
+        FileSystem.getLocal(baseTmpDir.getDefaultConfiguration()),
+        new Path(getOutputFile(outputDir, "*.avro").getPath()));
+
+    assertEquals(record, itr.next());
+    assertFalse(itr.hasNext());
+
+  }
+
+  static class SimpleBean {
+    public int value;
+
+    public SimpleBean() {
+      this(0);
+    }
+
+    public SimpleBean(int value) {
+      this.value = value;
+    }
+  }
 }
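The tests above locate the single out<N>.avro output with a WildcardFileFilter and read it back through AvroFileReaderFactory. The same read-back works outside a test; a rough sketch, assuming a reflect-based bean like the SimpleBean above and an illustrative local output directory:

    import java.io.File;
    import java.io.FilenameFilter;
    import java.util.Iterator;

    import org.apache.commons.io.filefilter.WildcardFileFilter;
    import org.apache.crunch.io.avro.AvroFileReaderFactory;
    import org.apache.crunch.types.avro.AvroType;
    import org.apache.crunch.types.avro.Avros;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AvroReadBackExample {
      // Hypothetical bean mirroring the SimpleBean used in the test above.
      public static class SimpleBean {
        public int value;
        public SimpleBean() { }
      }

      public static void main(String[] args) throws Exception {
        File outputDir = new File("/tmp/mem-avro-out");  // illustrative path
        // MemPipeline names its Avro outputs out<N>.avro, so a wildcard finds them.
        File[] parts = outputDir.listFiles((FilenameFilter) new WildcardFileFilter("*.avro"));
        AvroType<SimpleBean> ptype = Avros.reflects(SimpleBean.class);
        FileSystem fs = FileSystem.getLocal(new Configuration());
        for (File part : parts) {
          Iterator<SimpleBean> records =
              new AvroFileReaderFactory<SimpleBean>(ptype).read(fs, new Path(part.getPath()));
          while (records.hasNext()) {
            System.out.println(records.next().value);
          }
        }
      }
    }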
http://git-wip-us.apache.org/repos/asf/crunch/blob/b9e9672d/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
index cfb669e..40224e7 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,14 +18,13 @@
 package org.apache.crunch.io.avro;
 
 import static org.junit.Assert.assertEquals;
-import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
-
 import java.util.Set;
+
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericRecord;
@@ -36,6 +35,7 @@
 import org.apache.crunch.io.To;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
@@ -43,6 +43,7 @@
 import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class AvroMemPipelineIT implements Serializable {
 
@@ -56,11 +57,13 @@ public class AvroMemPipelineIT implements Serializable {
   }
 
   @Test
-  public void testMemPipelienWithSpecificRecord() {
+  public void testMemPipelineWithSpecificRecord() {
 
     Person writeRecord = createSpecificRecord();
 
-    final PCollection<Person> writeCollection = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+    final PCollection<Person> writeCollection = MemPipeline.typedCollectionOf(
+        Avros.specifics(Person.class),
+        writeRecord);
 
     writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
 
@@ -78,11 +81,15 @@ public class AvroMemPipelineIT implements Serializable {
   }
 
   @Test
-  public void testMemPipelienWithGenericRecord() {
+  public void testMemPipelineWithGenericRecord() {
+
+    PType<GenericData.Record> ptype = Avros.generics(Person.SCHEMA$);
 
-    GenericRecord writeRecord = createGenericRecord();
+    GenericData.Record writeRecord = createGenericRecord("John Doe");
 
-    final PCollection<GenericRecord> writeCollection = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+    final PCollection<GenericData.Record> writeCollection = MemPipeline.typedCollectionOf(
+        ptype,
+        writeRecord);
 
     writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
 
@@ -94,22 +101,14 @@
     assertEquals(writeRecord, readRecord);
   }
 
-  private GenericRecord createGenericRecord() {
-
-    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
-    savedRecord.put("name", "John Doe");
-    savedRecord.put("age", 42);
-    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-
-    return savedRecord;
-  }
-
-  @Test
-  public void testMemPipelienWithReflectionRecord() {
+  @Test
+  public void testMemPipelineWithReflectionRecord() {
 
     String writeRecord = "John Doe";
 
-    final PCollection<String> writeCollection = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+    final PCollection<String> writeCollection = MemPipeline.typedCollectionOf(
+        Avros.strings(),
+        writeRecord);
 
     writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
 
@@ -124,13 +123,18 @@
   @Test
   public void testMemPipelineWithMultiplePaths() {
 
-    GenericRecord writeRecord1 = createGenericRecord("John Doe");
-    final PCollection<GenericRecord> writeCollection1 = MemPipeline.collectionOf(Collections.singleton(writeRecord1));
+    PType<GenericData.Record> ptype = Avros.generics(Person.SCHEMA$);
+    GenericData.Record writeRecord1 = createGenericRecord("John Doe");
+    final PCollection<GenericData.Record> writeCollection1 = MemPipeline.typedCollectionOf(
+        ptype,
+        writeRecord1);
     writeCollection1.write(To.avroFile(avroFile.getAbsolutePath()));
 
     File avroFile2 = tmpDir.getFile("test2.avro");
-    GenericRecord writeRecord2 = createGenericRecord("Jane Doe");
-    final PCollection<GenericRecord> writeCollection2 = MemPipeline.collectionOf(Collections.singleton(writeRecord2));
+    GenericData.Record writeRecord2 = createGenericRecord("Jane Doe");
+    final PCollection<GenericData.Record> writeCollection2 = MemPipeline.typedCollectionOf(
+        ptype,
+        writeRecord2);
     writeCollection2.write(To.avroFile(avroFile2.getAbsolutePath()));
 
     List<Path> paths = Lists.newArrayList(new Path(avroFile.getAbsolutePath()),
@@ -143,9 +147,9 @@
     assertEquals(Sets.newHashSet(writeRecord1, writeRecord2), readSet);
   }
 
-  private GenericRecord createGenericRecord(String name) {
+  private GenericData.Record createGenericRecord(String name) {
 
-    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    GenericData.Record savedRecord = new GenericData.Record(Person.SCHEMA$);
     savedRecord.put("name", name);
     savedRecord.put("age", 42);
     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy"));
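The recurring change in this test class is the switch from MemPipeline.collectionOf(...) to MemPipeline.typedCollectionOf(...): the reworked write path in MemPipeline.java (further down) takes the Avro schema from the collection's PType, and per that hunk an untyped collection now fails with an IllegalStateException instead of having its first record inspected. A small sketch of the distinction, using a made-up inline schema in place of the Person test record:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecordBuilder;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.impl.mem.MemPipeline;
    import org.apache.crunch.io.To;
    import org.apache.crunch.types.avro.AvroType;
    import org.apache.crunch.types.avro.Avros;

    public class TypedVsUntypedExample {
      // Hypothetical schema standing in for Person.SCHEMA$ from the test sources.
      private static final Schema USER_SCHEMA = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
              + "{\"name\":\"name\",\"type\":\"string\"}]}");

      public static void main(String[] args) {
        AvroType<GenericData.Record> ptype = Avros.generics(USER_SCHEMA);
        GenericData.Record record = new GenericRecordBuilder(USER_SCHEMA)
            .set("name", "Jane")
            .build();

        // Untyped: no PType is attached, so an Avro target has no schema to write with.
        PCollection<GenericData.Record> untyped = MemPipeline.collectionOf(record);

        // Typed: carries the AvroType, which is what the new write path needs.
        PCollection<GenericData.Record> typed = MemPipeline.typedCollectionOf(ptype, record);
        MemPipeline.getInstance().write(typed, To.avroFile("/tmp/typed-out"));  // illustrative path
        MemPipeline.getInstance().done();
      }
    }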
http://git-wip-us.apache.org/repos/asf/crunch/blob/b9e9672d/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 362763b..5e6dfa0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -24,8 +24,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import com.google.common.util.concurrent.AbstractFuture;
-import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.io.DatumWriter;
@@ -51,6 +49,7 @@
 import org.apache.crunch.io.seq.SeqFileTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.avro.ReflectDataFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -64,6 +63,7 @@
 import org.apache.hadoop.mapreduce.Counters;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
 
 public class MemPipeline implements Pipeline {
 
@@ -72,11 +72,11 @@
   private static final MemPipeline INSTANCE = new MemPipeline();
 
   private int outputIndex = 0;
-  
+
   public static Counters getCounters() {
     return COUNTERS;
   }
-  
+
   public static void clearCounters() {
     COUNTERS = new CountersWrapper();
   }
@@ -129,7 +129,7 @@
   private Configuration conf = new Configuration();
 
   private Set<Target> activeTargets = Sets.newHashSet();
-  
+
   private MemPipeline() {
   }
@@ -177,7 +177,7 @@
   public void write(PCollection<?> collection, Target target) {
     write(collection, target, Target.WriteMode.DEFAULT);
   }
-  
+
   @Override
   public void write(PCollection<?> collection, Target target, Target.WriteMode writeMode) {
     target.handleExisting(writeMode, -1, getConfiguration());
@@ -193,18 +193,22 @@
       FileSystem fs = path.getFileSystem(conf);
       outputIndex++;
       if (target instanceof SeqFileTarget) {
+        Path outputPath = new Path(path, "out" + outputIndex + ".seq");
         if (collection instanceof PTable) {
-          writeSequenceFileFromPTable(fs, path, (PTable<?, ?>) collection);
+          writeSequenceFileFromPTable(fs, outputPath, (PTable<?, ?>) collection);
         } else {
-          writeSequenceFileFromPCollection(fs, path, collection);
+          writeSequenceFileFromPCollection(fs, outputPath, collection);
         }
       } else {
-        FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex));
         if (target instanceof AvroFileTarget && !(collection instanceof PTable)) {
-
-          writeAvroFile(os, collection.materialize());
+          Path outputPath = new Path(path, "out" + outputIndex + ".avro");
+          FSDataOutputStream os = fs.create(outputPath);
+          writeAvroFile(os, collection);
+          os.close();
         } else {
           LOG.warn("Defaulting to write to a text file from MemPipeline");
+          Path outputPath = new Path(path, "out" + outputIndex + ".txt");
+          FSDataOutputStream os = fs.create(outputPath);
           if (collection instanceof PTable) {
             for (Object o : collection.materialize()) {
               Pair p = (Pair) o;
@@ -218,8 +222,8 @@
               os.writeBytes(o.toString() + "\r\n");
             }
           }
+          os.close();
         }
-        os.close();
       }
     } catch (IOException e) {
       LOG.error("Exception writing target: " + target, e);
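The hunk that follows replaces the old writeAvroFile, which pulled a schema off the first materialized record (GenericContainer.getSchema() for generic and specific records, ReflectData for everything else) and therefore could not cope with an empty collection, with a version that derives both the schema and the DatumWriter from the collection's AvroType. Restated outside the diff as a generic helper, roughly; the class name, generics, and try/finally here are illustrative:

    import java.io.IOException;

    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.io.DatumWriter;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.types.avro.AvroType;
    import org.apache.crunch.types.avro.Avros;
    import org.apache.hadoop.fs.FSDataOutputStream;

    public final class AvroWriteSketch {

      // Roughly what the new writeAvroFile does: schema and writer both come from
      // the collection's AvroType rather than from the first record.
      @SuppressWarnings("unchecked")
      public static <T> void writeAvroFile(FSDataOutputStream out, PCollection<T> collection)
          throws IOException {
        AvroType<T> avroType = (AvroType<T>) collection.getPType();
        if (avroType == null) {
          throw new IllegalStateException("Can't write a non-typed Avro collection");
        }
        DatumWriter<T> datumWriter = Avros.newWriter(avroType);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<T>(datumWriter);
        dataFileWriter.create(avroType.getSchema(), out);
        try {
          for (T record : collection.materialize()) {
            dataFileWriter.append(record);
          }
        } finally {
          dataFileWriter.close();
        }
      }

      private AvroWriteSketch() { }
    }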
@@ -230,31 +234,24 @@
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
-  private void writeAvroFile(FSDataOutputStream outputStream, Iterable genericRecords) throws IOException {
-
-    Object r = genericRecords.iterator().next();
-
-    Schema schema = null;
-
-    if (r instanceof GenericContainer) {
-      schema = ((GenericContainer) r).getSchema();
-    } else {
-      schema = new ReflectDataFactory().getReflectData().getSchema(r.getClass());
-    }
-
-    DatumWriter datumWriter = Avros.newWriter(schema);
+  private void writeAvroFile(FSDataOutputStream outputStream, PCollection recordCollection) throws IOException {
+    AvroType avroType = (AvroType)recordCollection.getPType();
+    if (avroType == null) {
+      throw new IllegalStateException("Can't write a non-typed Avro collection");
+    }
+    DatumWriter datumWriter = Avros.newWriter((AvroType)recordCollection.getPType());
     DataFileWriter dataFileWriter = new DataFileWriter(datumWriter);
-    dataFileWriter.create(schema, outputStream);
+    dataFileWriter.create(avroType.getSchema(), outputStream);
 
-    for (Object record : genericRecords) {
+    for (Object record : recordCollection.materialize()) {
       dataFileWriter.append(record);
     }
 
     dataFileWriter.close();
     outputStream.close();
   }
-  
+
   @SuppressWarnings({ "rawtypes", "unchecked" })
   private void writeSequenceFileFromPTable(final FileSystem fs, final Path path, final PTable table)
       throws IOException {
@@ -274,7 +271,7 @@
 
     writer.close();
   }
-  
+
   private void writeSequenceFileFromPCollection(final FileSystem fs, final Path path,
       final PCollection collection) throws IOException {
     final PType pType = collection.getPType();
@@ -291,7 +288,7 @@
 
     writer.close();
   }
-  
+
   @Override
   public PCollection<String> readTextFile(String pathName) {
     return read(At.textFile(pathName));
@@ -312,7 +309,7 @@
     activeTargets.clear();
     return new MemExecution();
   }
-  
+
   @Override
   public PipelineResult run() {
     try {
