Updated Branches: refs/heads/master 910b6afbe -> f47e778b7
CRUNCH-277. Support Parquet. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f47e778b Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f47e778b Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f47e778b Branch: refs/heads/master Commit: f47e778b78e7d667556cfa0f9ff0c3a936e91e3e Parents: 910b6af Author: Tom White <[email protected]> Authored: Wed Oct 9 12:04:43 2013 +0100 Committer: Tom White <[email protected]> Committed: Wed Oct 9 12:04:43 2013 +0100 ---------------------------------------------------------------------- crunch-core/pom.xml | 5 + .../parquet/AvroParquetFileSourceTargetIT.java | 114 +++++++++ .../io/parquet/AvroParquetPipelineIT.java | 237 +++++++++++++++++++ .../crunch/io/parquet/AvroParquetConverter.java | 59 +++++ .../parquet/AvroParquetFileReaderFactory.java | 100 ++++++++ .../io/parquet/AvroParquetFileSource.java | 69 ++++++ .../io/parquet/AvroParquetFileSourceTarget.java | 41 ++++ .../io/parquet/AvroParquetFileTarget.java | 119 ++++++++++ .../AvroParquetFileReaderFactoryTest.java | 104 ++++++++ pom.xml | 13 + 10 files changed, 861 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml index 129990f..8a5beb6 100644 --- a/crunch-core/pom.xml +++ b/crunch-core/pom.xml @@ -54,6 +54,11 @@ under the License. 
</dependency> <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-avro</artifactId> + </dependency> + + <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java new file mode 100644 index 0000000..b6d51f2 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io.parquet; + +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericRecord; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.Person; +import org.apache.crunch.test.StringWrapper; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.AvroType; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.fs.Path; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import parquet.avro.AvroParquetWriter; + +import static org.junit.Assert.assertEquals; + +/** Integration tests that write Avro records to a Parquet file and read them back through {@link AvroParquetFileSource}. */ @SuppressWarnings("serial") +public class AvroParquetFileSourceTargetIT implements Serializable { + + private transient File avroFile; + @Rule + public transient TemporaryPath tmpDir = TemporaryPaths.create(); + + @Before + public void setUp() throws IOException { + avroFile = tmpDir.getFile("test.avro.parquet"); + } + + /** Writes {@code genericRecords} to {@code avroFile} as a Parquet file with the given Avro {@code schema}. */ private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException { + AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>( + new Path(avroFile.getPath()), schema); + + for (GenericRecord record : genericRecords) { + writer.write(record); + } + + writer.close(); + } + + /** Writes generic records with the Person schema, then reads them back as specific {@code Person} objects. */ @Test + public void testSpecific() throws IOException { + GenericRecord savedRecord = new Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class,
tmpDir.getDefaultConfiguration()); + PCollection<Person> genericCollection = pipeline.read(new AvroParquetFileSource<Person>(new Path(avroFile.getAbsolutePath()), + Avros.records(Person.class))); + + List<Person> personList = Lists.newArrayList(genericCollection.materialize()); + + Person expectedPerson = new Person(); + expectedPerson.name = "John Doe"; + expectedPerson.age = 42; + + List<CharSequence> siblingNames = Lists.newArrayList(); + siblingNames.add("Jimmy"); + siblingNames.add("Jane"); + expectedPerson.siblingnames = siblingNames; + + assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList)); + } + + /** Writes records with a copy of the Person schema renamed to GenericPerson and reads them back as {@code GenericData.Record}s. */ @Test + public void testGeneric() throws IOException { + String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson"); + Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson); + GenericRecord savedRecord = new Record(genericPersonSchema); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema); + + Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); + PCollection<Record> genericCollection = pipeline.read(new AvroParquetFileSource<Record>(new Path + (avroFile.getAbsolutePath()), + Avros.generics(genericPersonSchema))); + + List<Record> recordList = Lists.newArrayList(genericCollection.materialize()); + + assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList)); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java new file
mode 100644 index 0000000..055d0d7 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.crunch.io.parquet; + +import com.google.common.collect.Lists; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Target; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.At; +import org.apache.crunch.test.Employee; +import org.apache.crunch.test.Person; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.fs.Path; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import 
parquet.avro.AvroParquetReader; +import parquet.avro.AvroParquetWriter; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +/** Integration tests for MapReduce pipelines that write Avro records through the Parquet targets and check the output files with {@link AvroParquetReader}. */ public class AvroParquetPipelineIT implements Serializable { + + private transient File avroFile; + @Rule + public transient TemporaryPath tmpDir = TemporaryPaths.create(); + + @Before + public void setUp() throws IOException { + avroFile = tmpDir.getFile("test.avro.parquet"); + } + + /** Writes the records to {@code avroFile} as an Avro data file (not Parquet); the tests read it as pipeline input via At.avroFile. */ private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException { + FileOutputStream outputStream = new FileOutputStream(this.avroFile); + GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema); + + DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter); + dataFileWriter.create(schema, outputStream); + + for (GenericRecord record : genericRecords) { + dataFileWriter.append(record); + } + + dataFileWriter.close(); + outputStream.close(); + } + + /** Writes the records to {@code avroFile} as a Parquet file with the given Avro schema. */ private void populateGenericParquetFile(List<GenericRecord> genericRecords, Schema schema) throws IOException { + AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>( + new Path(avroFile.getPath()), schema); + + for (GenericRecord record : genericRecords) { + writer.write(record); + } + + writer.close(); + } + + /** Avro data file in, AvroParquetFileTarget out; the first Parquet record must equal the first materialized input record. */ @Test + public void toAvroParquetFileTarget() throws Exception { + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration()); + PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), + Avros.records(Person.class))); + File outputFile = tmpDir.getFile("output"); +
Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath()); + pipeline.write(genericCollection, parquetFileTarget); + pipeline.run(); + + Person person = genericCollection.materialize().iterator().next(); + + Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath()); + + AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile); + + try { + Person readPerson = reader.read(); + assertThat(readPerson, is(person)); + } finally { + reader.close(); + } + } + + /** Same round trip as toAvroParquetFileTarget, but the pipeline input is a Parquet file read with AvroParquetFileSource. */ @Test + public void toAvroParquetFileTargetFromParquet() throws Exception { + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration()); + PCollection<Person> genericCollection = pipeline.read( + new AvroParquetFileSource<Person>(new Path(avroFile.getAbsolutePath()), Avros.records(Person.class))); + File outputFile = tmpDir.getFile("output"); + Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath()); + pipeline.write(genericCollection, parquetFileTarget); + pipeline.run(); + + Person person = genericCollection.materialize().iterator().next(); + + Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath()); + + AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile); + + try { + Person readPerson = reader.read(); + assertThat(readPerson, is(person)); + } finally { + reader.close(); + } + } + + /** Writes Person records and derived Employee records to two separate Parquet outputs in one pipeline run. */ @Test + public void toAvroParquetFileMultipleTarget() throws Exception { + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames",
Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration()); + PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), + Avros.records(Person.class))); + + PCollection<Employee> employees = genericCollection.parallelDo(new DoFn<Person, Employee>() { + @Override + public void process(Person person, Emitter<Employee> emitter) { + emitter.emit(new Employee(person.getName(), 0, "Eng")); + } + }, Avros.records(Employee.class)); + + File output1File = tmpDir.getFile("output1"); + File output2File = tmpDir.getFile("output2"); + pipeline.write(genericCollection, new AvroParquetFileTarget(output1File.getAbsolutePath())); + pipeline.write(employees, new AvroParquetFileSourceTarget(new Path(output2File.getAbsolutePath()), + Avros.records(Employee.class))); + pipeline.run(); + + Person person = genericCollection.materialize().iterator().next(); + Employee employee = employees.materialize().iterator().next(); + + Path parquet1File = new Path(new File(output1File, "part-m-00000.parquet").getPath()); + Path parquet2File = new Path(new File(output2File, "part-m-00000.parquet").getPath()); + + AvroParquetReader<Person> personReader = new AvroParquetReader<Person>(parquet1File); + + try { + Person readPerson = personReader.read(); + assertThat(readPerson, is(person)); + } finally { + personReader.close(); + } + + AvroParquetReader<Employee> employeeReader = new AvroParquetReader<Employee>(parquet2File); + + try { + Employee readEmployee = employeeReader.read(); + assertThat(readEmployee, is(employee)); + } finally { + employeeReader.close(); + } + + } + + /** Writes through AvroParquetFileTarget, then reads the output directory back with AvroParquetFileSource. */ @Test + public void toAvroParquetFileTargetReadSource() throws Exception { + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); +
savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration()); + PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), + Avros.records(Person.class))); + File outputFile = tmpDir.getFile("output"); + Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath()); + pipeline.write(genericCollection, parquetFileTarget); + pipeline.run(); + + Person person = genericCollection.materialize().iterator().next(); + + PCollection<Person> retrievedPeople = pipeline.read(new AvroParquetFileSource<Person>( + new Path(outputFile.toURI()), Avros.records(Person.class))); + + Person retrievedPerson = retrievedPeople.materialize().iterator().next(); + + assertThat(retrievedPerson, is(person)); + + Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath()); + + AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile); + + try { + Person readPerson = reader.read(); + assertThat(readPerson, is(person)); + } finally { + reader.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetConverter.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetConverter.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetConverter.java new file mode 100644 index 0000000..5cb231f --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetConverter.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.parquet; + +import org.apache.crunch.types.Converter; +import org.apache.crunch.types.avro.AvroType; + +/** Converter for Parquet-backed Avro data: keys are always {@code null} of type {@code Void}, and values pass through unchanged. */ class AvroParquetConverter<T> implements Converter<Void, T, T, Iterable<T>> { + /* Only used to report the value class in getValueClass(). */ private AvroType<T> ptype; + + public AvroParquetConverter(AvroType<T> ptype) { + this.ptype = ptype; + } + + @Override + public T convertInput(Void key, T value) { + return value; + } + + @Override + public Iterable<T> convertIterableInput(Void key, Iterable<T> value) { + return value; + } + + @Override + public Void outputKey(T value) { + return null; + } + + @Override + public T outputValue(T value) { + return value; + } + + @Override + public Class<Void> getKeyClass() { + return Void.class; + } + + @Override + public Class<T> getValueClass() { + return ptype.getTypeClass(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactory.java new file mode 100644 index 0000000..c193563 --- /dev/null +++
b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactory.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.parquet; + +import com.google.common.collect.UnmodifiableIterator; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import org.apache.avro.generic.IndexedRecord; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.io.FileReaderFactory; +import org.apache.crunch.io.impl.AutoClosingIterator; +import org.apache.crunch.types.avro.AvroType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import parquet.avro.AvroReadSupport; +import parquet.hadoop.ParquetReader; +import parquet.schema.MessageType; + +/** {@link FileReaderFactory} that reads records of an {@link AvroType} from a Parquet file with a {@link ParquetReader}. */ class AvroParquetFileReaderFactory<T> implements FileReaderFactory<T> { + + private AvroType<T> avroType; + + public AvroParquetFileReaderFactory(AvroType<T> avroType) { + this.avroType = avroType; + } + + /** Opens {@code path} (qualified against {@code fs}) and returns a lazy iterator over its records, wrapped in an {@link AutoClosingIterator}; IOExceptions are rethrown as {@link CrunchRuntimeException}. */ @Override + @SuppressWarnings("unchecked") + public Iterator<T> read(FileSystem fs, Path path) { + Path p = fs.makeQualified(path); + final ParquetReader reader; +
try { + /* NOTE(review): raw ParquetReader and CrunchAvroReadSupport types; the cast in hasNext() is therefore unchecked. */ reader = new ParquetReader(p, new CrunchAvroReadSupport(avroType)); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() { + + /* Record read ahead by hasNext() and handed out by next(); null when nothing is buffered. */ private T next; + + @Override + public boolean hasNext() { + if (next != null) { + return true; + } + try { + next = (T) reader.read(); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + /* A null from reader.read() is treated as end of input. */ return next != null; + } + + @Override + public T next() { + if (hasNext()) { + T ret = next; + next = null; + return ret; + } + throw new NoSuchElementException(); + } + }); + + } + + /** {@link AvroReadSupport} that, when an {@link AvroType} is given, sets that type's schema as the requested projection before delegating to {@code super.init}. */ static class CrunchAvroReadSupport<T extends IndexedRecord> extends AvroReadSupport<T> { + private AvroType<T> avroType; + + public CrunchAvroReadSupport(AvroType<T> avroType) { + this.avroType = avroType; + } + + @Override + public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) { + if (avroType != null) { + setRequestedProjection(configuration, avroType.getSchema()); + } + return super.init(configuration, keyValueMetaData, fileSchema); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java new file mode 100644 index 0000000..81678d4 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.parquet; + +import java.io.IOException; +import java.util.List; +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.crunch.io.FormatBundle; +import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.io.impl.FileSourceImpl; +import org.apache.crunch.types.Converter; +import org.apache.crunch.types.avro.AvroType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import parquet.avro.AvroParquetInputFormat; +import parquet.avro.AvroReadSupport; + +/** Source that reads Avro records of the given {@link AvroType} from Parquet files via {@link AvroParquetInputFormat}. */ public class AvroParquetFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> { + + /** Builds the input bundle: the requested Avro projection is the type's schema, and combined file splits are disabled. */ private static <S> FormatBundle<AvroParquetInputFormat> getBundle(AvroType<S> ptype) { + return FormatBundle.forInput(AvroParquetInputFormat.class) + .set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, ptype.getSchema().toString()) + // ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it + // doesn't work with CombineFileInputFormat + .set(RuntimeParameters.DISABLE_COMBINE_FILE, "true"); + } + + public AvroParquetFileSource(Path path, AvroType<T> ptype) { + super(path, ptype, getBundle(ptype)); + } + + public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype) { + super(paths, ptype, getBundle(ptype)); + } + + /** Reads the records with the factory returned by {@link #getFileReaderFactory}. */ @Override + public Iterable<T> read(Configuration conf) throws IOException { + return read(conf,
getFileReaderFactory((AvroType<T>) ptype)); + } + + /** Creates the reader factory used by {@link #read(Configuration)}; protected so subclasses can supply their own. */ protected AvroParquetFileReaderFactory<T> getFileReaderFactory(AvroType<T> ptype){ + return new AvroParquetFileReaderFactory<T>(ptype); + } + + /** Returns an {@link AvroParquetConverter}. NOTE(review): the cast is unchecked and relies on the constructors only accepting an {@code AvroType<T>}. */ @Override + public Converter<?, ?, ?, ?> getConverter() { + return new AvroParquetConverter<T>((AvroType<T>) ptype); + } + + @Override + public String toString() { + return "Parquet(" + pathsAsString() + ")"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java new file mode 100644 index 0000000..8d93eba --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.crunch.io.parquet; + +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.SequentialFileNamingScheme; +import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl; +import org.apache.crunch.types.avro.AvroType; +import org.apache.hadoop.fs.Path; + +/** Parquet source and target for the same path: writes via {@link AvroParquetFileTarget} and reads back via {@link AvroParquetFileSource}. */ public class AvroParquetFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> { + + /** Uses {@link SequentialFileNamingScheme} for output file names. */ public AvroParquetFileSourceTarget(Path path, AvroType<T> atype) { + this(path, atype, SequentialFileNamingScheme.getInstance()); + } + + public AvroParquetFileSourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) { + super(new AvroParquetFileSource<T>(path, atype), new AvroParquetFileTarget(path), + fileNamingScheme); + } + + /** Delegates to the target's description. */ @Override + public String toString() { + return target.toString(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java new file mode 100644 index 0000000..c67b9f1 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.parquet; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.OutputHandler; +import org.apache.crunch.io.SequentialFileNamingScheme; +import org.apache.crunch.io.impl.FileTargetImpl; +import org.apache.crunch.types.Converter; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.avro.AvroType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import parquet.avro.AvroWriteSupport; +import parquet.hadoop.ParquetOutputFormat; + +/** Target that writes Avro records to Parquet files through {@link CrunchAvroParquetOutputFormat}, passing the Avro schema via the job configuration. */ public class AvroParquetFileTarget extends FileTargetImpl { + + /** Configuration key for the output Avro schema; for a named output the key is suffixed with a dot and the output name. */ private static final String PARQUET_AVRO_SCHEMA_PARAMETER = "parquet.avro.schema"; + + public AvroParquetFileTarget(String path) { + this(new Path(path)); + } + + public AvroParquetFileTarget(Path path) { + this(path, SequentialFileNamingScheme.getInstance()); + } + + public AvroParquetFileTarget(Path path, FileNamingScheme fileNamingScheme) { + super(path, CrunchAvroParquetOutputFormat.class, fileNamingScheme); + } + + @Override + public String toString() { + return "Parquet(" + path.toString() + ")"; + } + + /** Accepts only {@link AvroType} ptypes; any other ptype is rejected without configuring the handler. */ @Override + public boolean accept(OutputHandler handler, PType<?> ptype) { + if (!(ptype instanceof AvroType)) { + return false; + } + handler.configure(this, ptype); + return true; + } + + @Override + @SuppressWarnings("unchecked") + public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) { +
return new AvroParquetConverter<Object>((AvroType<Object>) ptype); + } + + /** Stores the Avro schema under the (possibly name-suffixed) schema key, throwing IllegalStateException if a different schema is already stored there, then configures the output with Void keys. */ @Override + public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) { + AvroType<?> atype = (AvroType<?>) ptype; + Configuration conf = job.getConfiguration(); + String schemaParam; + if (name == null) { + schemaParam = PARQUET_AVRO_SCHEMA_PARAMETER; + } else { + schemaParam = PARQUET_AVRO_SCHEMA_PARAMETER + "." + name; + } + String outputSchema = conf.get(schemaParam); + if (outputSchema == null) { + conf.set(schemaParam, atype.getSchema().toString()); + } else if (!outputSchema.equals(atype.getSchema().toString())) { + throw new IllegalStateException("Avro targets must use the same output schema"); + } + configureForMapReduce(job, Void.class, atype.getTypeClass(), + CrunchAvroParquetOutputFormat.class, outputPath, name); + } + + /** Returns an {@link AvroParquetFileSourceTarget} for Avro ptypes, or null for any other ptype. */ @Override + public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) { + if (ptype instanceof AvroType) { + return new AvroParquetFileSourceTarget<T>(path, (AvroType<T>) ptype); + } + return null; + } + + /** {@link AvroWriteSupport} that, when {@code crunch.namedoutput} is set, parses the schema stored under that output's suffixed key and installs it with setSchema before initializing. NOTE(review): assumes configureForMapReduce populated that key; a missing key would pass null to Schema.Parser.parse. */ static class CrunchAvroWriteSupport extends AvroWriteSupport { + @Override + public WriteContext init(Configuration conf) { + String outputName = conf.get("crunch.namedoutput"); + if (outputName != null && !outputName.isEmpty()) { + String schema = conf.get(PARQUET_AVRO_SCHEMA_PARAMETER + "."
+ outputName); + setSchema(conf, new Schema.Parser().parse(schema)); + } + return super.init(conf); + } + } + + /** {@link ParquetOutputFormat} that always uses {@link CrunchAvroWriteSupport}. */ static class CrunchAvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord> { + + public CrunchAvroParquetOutputFormat() { + super(new CrunchAvroWriteSupport()); + } + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/test/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactoryTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactoryTest.java b/crunch-core/src/test/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactoryTest.java new file mode 100644 index 0000000..9f5ff70 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactoryTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.crunch.io.parquet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericRecord; +import org.apache.crunch.test.Person; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.AvroType; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import parquet.avro.AvroParquetWriter; + +/** + * Tests for {@link AvroParquetFileReaderFactory}, reading Parquet files written with {@link AvroParquetWriter}. + */ +public class AvroParquetFileReaderFactoryTest { + + private File parquetFile; + + @Rule + public transient TemporaryPath tmpDir = TemporaryPaths.create(); + + @Before + public void setUp() throws IOException { + parquetFile = tmpDir.getFile("test.avro.parquet"); + } + + @After + public void tearDown() { + parquetFile.delete(); + } + + /** + * Writes {@code genericRecords} to {@link #parquetFile} using the given Avro {@code schema}. + */ + private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException { + AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>( + new Path(parquetFile.getPath()), schema); + + try { + for (GenericRecord record : genericRecords) { + writer.write(record); + } + } finally { + /* Close even if a write fails so the writer is never leaked. */ + writer.close(); + } + } + + /** + * Creates a reader factory for files whose records are described by {@code avroType}. + */ + private <T> AvroParquetFileReaderFactory<T> createFileReaderFactory(AvroType<T> avroType) { + return new AvroParquetFileReaderFactory<T>(avroType); + } + + /** + * Writes a single record, reads it back through a projection schema containing only the "name" field, + * and checks that "name" round-trips, that get("age") is null and that exactly one record is returned. + */ + @Test + public void testProjection() throws IOException { + String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson"); + Schema genericPersonSchema = new
Schema.Parser().parse(genericSchemaJson); + GenericRecord savedRecord = new Record(genericPersonSchema); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema); + + Schema projection = Schema.createRecord("projection", null, null, false); + projection.setFields(Lists.newArrayList(cloneField(genericPersonSchema.getField("name")))); + AvroParquetFileReaderFactory<Record> genericReader = createFileReaderFactory(Avros.generics(projection)); + Iterator<Record> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()), + new Path(this.parquetFile.getAbsolutePath())); + + assertTrue("expected one record to be read back", recordIterator.hasNext()); + GenericRecord genericRecord = recordIterator.next(); + assertEquals(savedRecord.get("name"), genericRecord.get("name")); + assertNull(genericRecord.get("age")); + assertFalse(recordIterator.hasNext()); + } + + /** + * Builds a new {@link Schema.Field} with the same name, schema, doc and default value as {@code field}. + * NOTE(review): presumably needed because Avro does not allow one Field instance to be added to two + * record schemas -- confirm against the Avro version in use. + */ + public static Schema.Field cloneField(Schema.Field field) { + return new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ba3258c..45283f3 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,7 @@ under the License. <commons-logging.version>1.1.1</commons-logging.version> <commons-cli.version>1.2</commons-cli.version> <avro.version>1.7.4</avro.version> + <parquet.version>1.2.0</parquet.version> <javassist.version>3.16.1-GA</javassist.version> <jackson.version>1.8.8</jackson.version> <protobuf-java.version>2.4.0a</protobuf-java.version> @@ -209,6 +210,18 @@ under the License.
</dependency> <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-avro</artifactId> + <version>${parquet.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> <version>${javassist.version}</version>
