Repository: crunch Updated Branches: refs/heads/master 95e92fc89 -> 29d1ce4cd
CRUNCH-552: Add support/tests for Parquet files w/Crunch on Spark Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/29d1ce4c Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/29d1ce4c Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/29d1ce4c Branch: refs/heads/master Commit: 29d1ce4cddcddc86cda36bd464c26df4bdff3ae4 Parents: 95e92fc Author: Josh Wills <[email protected]> Authored: Mon Jul 27 17:13:30 2015 -0700 Committer: Josh Wills <[email protected]> Committed: Mon Jul 27 19:07:12 2015 -0700 ---------------------------------------------------------------------- crunch-core/pom.xml | 17 -- .../io/parquet/AvroParquetFileTarget.java | 5 +- crunch-core/src/test/avro/employee.avsc | 26 -- crunch-core/src/test/avro/person.avsc | 26 -- .../crunch/SparkAvroParquetPipelineIT.java | 242 +++++++++++++++++++ .../apache/crunch/impl/spark/SparkRuntime.java | 1 + crunch-test/pom.xml | 17 ++ crunch-test/src/main/avro/employee.avsc | 26 ++ crunch-test/src/main/avro/person.avsc | 26 ++ 9 files changed, 314 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-core/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml index 59794f0..d88a652 100644 --- a/crunch-core/pom.xml +++ b/crunch-core/pom.xml @@ -176,23 +176,6 @@ under the License. <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-failsafe-plugin</artifactId> </plugin> - <plugin> - <groupId>org.apache.avro</groupId> - <artifactId>avro-maven-plugin</artifactId> - <executions> - <execution> - <id>schemas</id> - <phase>generate-sources</phase> - <goals> - <goal>schema</goal> - </goals> - <configuration> - <testSourceDirectory>${project.basedir}/src/test/avro/</testSourceDirectory> - <testOutputDirectory>target/generated-test-sources/</testOutputDirectory> - </configuration> - </execution> - </executions> - </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java index 3c2847d..5fb4c53 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java @@ -107,7 +107,7 @@ public class AvroParquetFileTarget extends FileTargetImpl { return null; } - static class CrunchAvroWriteSupport extends AvroWriteSupport { + public static class CrunchAvroWriteSupport extends AvroWriteSupport { @Override public WriteContext init(Configuration conf) { String outputName = conf.get("crunch.namedoutput"); @@ -119,8 +119,7 @@ public class AvroParquetFileTarget extends FileTargetImpl { } } - static class CrunchAvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord> { - + public static class CrunchAvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord> { public CrunchAvroParquetOutputFormat() { super(new CrunchAvroWriteSupport()); } http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-core/src/test/avro/employee.avsc ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/avro/employee.avsc b/crunch-core/src/test/avro/employee.avsc deleted file mode 100644 index 35726e1..0000000 --- a/crunch-core/src/test/avro/employee.avsc +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -{ -"namespace": "org.apache.crunch.test", -"name": "Employee", -"type": "record", -"fields": [ - {"name": "name", "type": ["string", "null"] }, - {"name": "salary", "type": "int"}, - {"name": "department", "type": ["string", "null"] } ] -} http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-core/src/test/avro/person.avsc ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/avro/person.avsc b/crunch-core/src/test/avro/person.avsc deleted file mode 100644 index eb24071..0000000 --- a/crunch-core/src/test/avro/person.avsc +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -{ -"namespace": "org.apache.crunch.test", -"name": "Person", -"type": "record", -"fields": [ - {"name": "name", "type": ["string", "null"] }, - {"name": "age", "type": "int"}, - {"name": "siblingnames", "type" : ["null", { "type": "array", "items": "string" }], "default": null } ] -} http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-spark/src/it/java/org/apache/crunch/SparkAvroParquetPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkAvroParquetPipelineIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkAvroParquetPipelineIT.java new file mode 100644 index 0000000..f5e2c25 --- /dev/null +++ b/crunch-spark/src/it/java/org/apache/crunch/SparkAvroParquetPipelineIT.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import com.google.common.collect.Lists; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.crunch.impl.spark.SparkPipeline; +import org.apache.crunch.io.At; +import org.apache.crunch.io.parquet.AvroParquetFileSource; +import org.apache.crunch.io.parquet.AvroParquetFileSourceTarget; +import org.apache.crunch.io.parquet.AvroParquetFileTarget; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.Employee; +import org.apache.crunch.test.Person; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.fs.Path; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import parquet.avro.AvroParquetReader; +import parquet.avro.AvroParquetWriter; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +public class SparkAvroParquetPipelineIT implements Serializable { + + private transient File avroFile; + + @Rule + public transient TemporaryPath tmpDir = new TemporaryPath(RuntimeParameters.TMP_DIR, "hadoop.tmp.dir"); + + @Before + public void setUp() throws IOException { + avroFile = tmpDir.getFile("test.avro.parquet"); + } + + private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException { + FileOutputStream outputStream = new FileOutputStream(this.avroFile); + GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema); + + DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter); + dataFileWriter.create(schema, outputStream); + + for (GenericRecord record : genericRecords) { + dataFileWriter.append(record); + } + + dataFileWriter.close(); + outputStream.close(); + } + + private void populateGenericParquetFile(List<GenericRecord> genericRecords, Schema schema) throws IOException { + AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>( + new Path(avroFile.getPath()), schema); + + for (GenericRecord record : genericRecords) { + writer.write(record); + } + + writer.close(); + } + + @Test + public void toAvroParquetFileTarget() throws Exception { + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new SparkPipeline("local", "avroparq"); + PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), + Avros.records(Person.class))); + File outputFile = tmpDir.getFile("output"); + Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath()); + pipeline.write(genericCollection, parquetFileTarget); + pipeline.run(); + + Person person = genericCollection.materialize().iterator().next(); + + Path parquetFile = new Path(new File(outputFile, "part-r-00000.parquet").getPath()); + + AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile); + + try { + Person readPerson = reader.read(); + assertThat(readPerson, is(person)); + } finally { + reader.close(); + pipeline.done(); + } + } + + @Test + public void toAvroParquetFileTargetFromParquet() throws Exception { + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new SparkPipeline("local", "avroparq"); + PCollection<Person> genericCollection = pipeline.read( + new AvroParquetFileSource<Person>(new Path(avroFile.getAbsolutePath()), Avros.records(Person.class))); + File outputFile = tmpDir.getFile("output"); + Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath()); + pipeline.write(genericCollection, parquetFileTarget); + pipeline.run(); + + Person person = genericCollection.materialize().iterator().next(); + + Path parquetFile = new Path(new File(outputFile, "part-r-00000.parquet").getPath()); + + AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile); + + try { + Person readPerson = reader.read(); + assertThat(readPerson, is(person)); + } finally { + reader.close(); + pipeline.done(); + } + } + + @Test + public void toAvroParquetFileMultipleTarget() throws Exception { + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new SparkPipeline("local", "avroparq"); + PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), + Avros.records(Person.class))); + + PCollection<Employee> employees = genericCollection.parallelDo(new DoFn<Person, Employee>() { + @Override + public void process(Person person, Emitter<Employee> emitter) { + emitter.emit(new Employee(person.getName(), 0, "Eng")); + } + }, Avros.records(Employee.class)); + + File output1File = tmpDir.getFile("output1"); + File output2File = tmpDir.getFile("output2"); + pipeline.write(genericCollection, new AvroParquetFileTarget(output1File.getAbsolutePath())); + pipeline.write(employees, new AvroParquetFileSourceTarget(new Path(output2File.getAbsolutePath()), + Avros.records(Employee.class))); + pipeline.run(); + + Person person = genericCollection.materialize().iterator().next(); + Employee employee = employees.materialize().iterator().next(); + + Path parquet1File = new Path(new File(output1File, "part-r-00000.parquet").getPath()); + Path parquet2File = new Path(new File(output2File, "part-r-00000.parquet").getPath()); + + AvroParquetReader<Person> personReader = new AvroParquetReader<Person>(parquet1File); + + try { + Person readPerson = personReader.read(); + assertThat(readPerson, is(person)); + } finally { + personReader.close(); + pipeline.done(); + } + + AvroParquetReader<Employee> employeeReader = new AvroParquetReader<Employee>(parquet2File); + + try { + Employee readEmployee = employeeReader.read(); + assertThat(readEmployee, is(employee)); + } finally { + employeeReader.close(); + } + + } + + @Test + public void toAvroParquetFileTargetReadSource() throws Exception { + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new SparkPipeline("local", "avroparq"); + PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), + Avros.records(Person.class))); + File outputFile = tmpDir.getFile("output"); + Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath()); + pipeline.write(genericCollection, parquetFileTarget); + pipeline.run(); + + Person person = genericCollection.materialize().iterator().next(); + + PCollection<Person> retrievedPeople = pipeline.read(new AvroParquetFileSource<Person>( + new Path(outputFile.toURI()), Avros.records(Person.class))); + + Person retrievedPerson = retrievedPeople.materialize().iterator().next(); + + assertThat(retrievedPerson, is(person)); + + Path parquetFile = new Path(new File(outputFile, "part-r-00000.parquet").getPath()); + + AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile); + + try { + Person readPerson = reader.read(); + assertThat(readPerson, is(person)); + } finally { + reader.close(); + pipeline.done(); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java index 5798e4c..f1dce0b 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java @@ -331,6 +331,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe job.setOutputKeyClass(outConfig.keyClass); job.setOutputValueClass(outConfig.valueClass); outConfig.bundle.configure(job.getConfiguration()); + job.getConfiguration().set("crunch.namedoutput", "out0"); Path tmpPath = pipeline.createTempPath(); outRDD.saveAsNewAPIHadoopFile( tmpPath.toString(), http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-test/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-test/pom.xml b/crunch-test/pom.xml index bb6f493..a023066 100644 --- a/crunch-test/pom.xml +++ b/crunch-test/pom.xml @@ -89,6 +89,23 @@ under the License. <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-failsafe-plugin</artifactId> </plugin> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <executions> + <execution> + <id>schemas</id> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> + <outputDirectory>target/generated-sources/</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-test/src/main/avro/employee.avsc ---------------------------------------------------------------------- diff --git a/crunch-test/src/main/avro/employee.avsc b/crunch-test/src/main/avro/employee.avsc new file mode 100644 index 0000000..35726e1 --- /dev/null +++ b/crunch-test/src/main/avro/employee.avsc @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ +"namespace": "org.apache.crunch.test", +"name": "Employee", +"type": "record", +"fields": [ + {"name": "name", "type": ["string", "null"] }, + {"name": "salary", "type": "int"}, + {"name": "department", "type": ["string", "null"] } ] +} http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-test/src/main/avro/person.avsc ---------------------------------------------------------------------- diff --git a/crunch-test/src/main/avro/person.avsc b/crunch-test/src/main/avro/person.avsc new file mode 100644 index 0000000..eb24071 --- /dev/null +++ b/crunch-test/src/main/avro/person.avsc @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ +"namespace": "org.apache.crunch.test", +"name": "Person", +"type": "record", +"fields": [ + {"name": "name", "type": ["string", "null"] }, + {"name": "age", "type": "int"}, + {"name": "siblingnames", "type" : ["null", { "type": "array", "items": "string" }], "default": null } ] +}
