Updated Branches: refs/heads/master 5df1ccd08 -> 32c33eb6c
CRUNCH-52: Let avro types be written to text files transparently. Contributed by Rahul Sharma. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/32c33eb6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/32c33eb6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/32c33eb6 Branch: refs/heads/master Commit: 32c33eb6cbc102947373fe9ddac79066c3abed03 Parents: 5df1ccd Author: Josh Wills <[email protected]> Authored: Fri Sep 7 05:14:32 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Fri Sep 7 05:14:32 2012 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/io/avro/AvroPipelineIT.java | 107 +++++++++++++++ .../org/apache/crunch/io/avro/AvroFileTarget.java | 3 +- .../org/apache/crunch/io/impl/FileTargetImpl.java | 5 +- .../org/apache/crunch/io/text/TextFileTarget.java | 27 ++++- .../crunch/types/avro/AvroTextOutputFormat.java | 60 ++++++++ 5 files changed, 196 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/32c33eb6/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java new file mode 100644 index 0000000..b096a42 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.FileUtils;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests that read Avro data into an {@link MRPipeline} and write it
+ * back out through other targets.
+ */
+public class AvroPipelineIT implements Serializable {
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = File.createTempFile("test", ".avro");
+  }
+
+  @After
+  public void tearDown() {
+    avroFile.delete();
+  }
+
+  /**
+   * Writes {@code genericRecords} to {@link #avroFile} as an Avro data file.
+   *
+   * @param genericRecords records to append, in order
+   * @param schema schema used both for the datum writer and the file header
+   * @throws IOException if the file cannot be created or written
+   */
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+    // The finally block guarantees the file handle is released even if writing fails.
+    try {
+      GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema);
+      DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
+      dataFileWriter.create(schema, outputStream);
+      for (GenericRecord record : genericRecords) {
+        dataFileWriter.append(record);
+      }
+      dataFileWriter.close();
+    } finally {
+      outputStream.close();
+    }
+  }
+
+  @Test
+  public void toTextShouldWriteAvroDataAsDatumText() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    // The pipeline is keyed to this test class.
+    Pipeline pipeline = new MRPipeline(AvroPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    File file = tmpDir.getRootFile();
+    Target textFile = To.textFile(file.getAbsolutePath());
+    pipeline.write(genericCollection, textFile);
+    pipeline.run();
+    Person person = genericCollection.materialize().iterator().next();
+
+    // The output directory may also contain non-data files (e.g. hidden checksum
+    // or marker files), so only consider the "part-" files holding the records.
+    Collection<File> listFiles = FileUtils.listFiles(file, null, false);
+    File outputFile = null;
+    for (File foundFile : listFiles) {
+      if (foundFile.getName().startsWith("part-")) {
+        outputFile = foundFile;
+      }
+    }
+    assertNotNull("No part file was written to " + file, outputFile);
+    String outputString = FileUtils.readFileToString(outputFile);
+    assertTrue(outputString.contains(person.toString()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/32c33eb6/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java index cc513c7..91deac4 100644 --- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java @@ -70,7 +70,8 @@ public class AvroFileTarget extends FileTargetImpl { throw new IllegalStateException("Avro targets must use the same output schema"); } Avros.configureReflectDataFactory(conf); - configureForMapReduce(job, AvroWrapper.class, NullWritable.class, outputPath, name); + configureForMapReduce(job, AvroWrapper.class, NullWritable.class, AvroOutputFormat.class, + outputPath, name); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/32c33eb6/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java index 22df7f8..ecae0de 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java @@ -43,10 +43,11 @@ public class FileTargetImpl implements PathTarget { Converter converter = ptype.getConverter(); Class keyClass = converter.getKeyClass(); Class valueClass = converter.getValueClass(); - configureForMapReduce(job, keyClass, valueClass, outputPath, name); + configureForMapReduce(job, keyClass, valueClass, outputFormatClass, outputPath, name); } - protected void configureForMapReduce(Job job, Class keyClass, Class valueClass, Path outputPath, String name) { + protected void configureForMapReduce(Job job, Class keyClass, Class valueClass, + Class outputFormatClass, Path outputPath, String name) { try { FileOutputFormat.setOutputPath(job, outputPath); } catch (Exception e) { 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/32c33eb6/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java index 1c41d97..aa2f8e8 100644 --- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java @@ -19,18 +19,31 @@ package org.apache.crunch.io.text; import org.apache.crunch.SourceTarget; import org.apache.crunch.io.impl.FileTargetImpl; +import org.apache.crunch.types.Converter; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; +import org.apache.crunch.types.avro.AvroTextOutputFormat; +import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class TextFileTarget extends FileTargetImpl { - public TextFileTarget(String path) { + private static Class<? 
extends FileOutputFormat> getOutputFormat(PType<?> ptype) {
+    if (ptype.getFamily().equals(AvroTypeFamily.getInstance())) {
+      return AvroTextOutputFormat.class;
+    } else {
+      return TextOutputFormat.class;
+    }
+  }
+
+  public <T> TextFileTarget(String path) {
     this(new Path(path));
   }
 
-  public TextFileTarget(Path path) {
-    super(path, TextOutputFormat.class);
+  public <T> TextFileTarget(Path path) {
+    super(path, null);
   }
 
   @Override
@@ -44,6 +57,14 @@ public class TextFileTarget extends FileTargetImpl {
   }
 
   @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    Converter converter = ptype.getConverter();
+    Class keyClass = converter.getKeyClass();
+    Class valueClass = converter.getValueClass();
+    configureForMapReduce(job, keyClass, valueClass, getOutputFormat(ptype), outputPath, name);
+  }
+
+  @Override
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
     if (ptype instanceof PTableType) {
       return null;
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/32c33eb6/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
new file mode 100644
index 0000000..4930235
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+/**
+ * A {@link TextOutputFormat} that writes the datum held by an {@link AvroWrapper}
+ * key or value instead of the wrapper object itself, so Avro-typed data can be
+ * written to text files. Keys and values that are not {@code AvroWrapper}s are
+ * passed through to the underlying text writer unchanged.
+ */
+public class AvroTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
+
+  /**
+   * Forwards each key/value to the writer created by {@link TextOutputFormat},
+   * first replacing any {@link AvroWrapper} with its datum. Static so it does
+   * not retain a hidden reference to the enclosing output format.
+   */
+  private static final class DatumRecordTextWriter<K, V> extends RecordWriter<K, V> {
+    // Raw on purpose: after unwrapping, the objects passed to the delegate are
+    // Avro datums, not instances of K and V.
+    @SuppressWarnings("rawtypes")
+    private final RecordWriter lineRecordWriter;
+
+    DatumRecordTextWriter(RecordWriter<K, V> recordWriter) {
+      this.lineRecordWriter = recordWriter;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+      lineRecordWriter.close(context);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void write(K key, V value) throws IOException, InterruptedException {
+      lineRecordWriter.write(unwrap(key), unwrap(value));
+    }
+
+    /**
+     * Returns the datum of {@code o} when it is an {@link AvroWrapper};
+     * otherwise returns {@code o} itself (including {@code null}).
+     */
+    private static Object unwrap(Object o) {
+      if (o instanceof AvroWrapper) {
+        return ((AvroWrapper<?>) o).datum();
+      }
+      return o;
+    }
+  }
+
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+    RecordWriter<K, V> recordWriter = super.getRecordWriter(context);
+    return new DatumRecordTextWriter<K, V>(recordWriter);
+  }
+}
