Repository: crunch
Updated Branches:
  refs/heads/master 253326148 -> 2c7821fd3
CRUNCH-480: Differentiate between read/projection schemas and selectively enable/disable the combine file format by default Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2c7821fd Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2c7821fd Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2c7821fd Branch: refs/heads/master Commit: 2c7821fd353dd1618af112d27cbc3a936142d61c Parents: 2533261 Author: Josh Wills <[email protected]> Authored: Wed Nov 12 11:08:13 2014 -0800 Committer: Josh Wills <[email protected]> Committed: Wed Nov 12 11:08:13 2014 -0800 ---------------------------------------------------------------------- .../parquet/AvroParquetFileSourceTargetIT.java | 139 +++++++++++++++++-- .../io/parquet/AvroParquetFileSource.java | 65 ++++++--- pom.xml | 2 +- 3 files changed, 180 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/2c7821fd/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java index 5c7d9e0..d75d9da 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java @@ -20,20 +20,26 @@ package org.apache.crunch.io.parquet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.List; +import com.google.common.collect.Iterables; import 
org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; import org.apache.crunch.FilterFn; import org.apache.crunch.PCollection; import org.apache.crunch.Pipeline; import org.apache.crunch.Target; import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; import org.apache.crunch.io.To; import org.apache.crunch.io.avro.AvroFileSource; import org.apache.crunch.test.Person; @@ -145,14 +151,13 @@ public class AvroParquetFileSourceTargetIT implements Serializable { PCollection<Person> ageOnly = pipeline2.read( new AvroFileSource<Person>(new Path(outputFile.getAbsolutePath()), Avros.specifics(Person.class))); - for (Person person : ageOnly.materialize()) { - assertNull(person.getName()); - assertEquals(person.getAge(), new Integer(42)); - assertNull(person.getSiblingnames()); - } + Person person = Iterables.getOnlyElement(ageOnly.materialize()); + assertNull(person.getName()); + assertEquals(person.getAge(), new Integer(42)); + assertNull(person.getSiblingnames()); } - @Test(expected = IndexOutOfBoundsException.class) + @Test public void testProjectionGeneric() throws IOException { GenericRecord savedRecord = new Record(Person.SCHEMA$); savedRecord.put("name", "John Doe"); @@ -176,13 +181,129 @@ public class AvroParquetFileSourceTargetIT implements Serializable { PCollection<Record> ageOnly = pipeline2.read( new AvroFileSource<Record>(new Path(outputFile.getAbsolutePath()), Avros.generics(src.getProjectedSchema()))); - for (Record person : ageOnly.materialize()) { - assertEquals(person.get(0), 42); - Object notAge = person.get(1); + Record person = Iterables.getOnlyElement(ageOnly.materialize()); + assertEquals(person.get(0), 42); + try { + person.get(1); + fail("Trying to get field outside of projection should fail"); + } catch (IndexOutOfBoundsException e) { + // 
Expected } } @Test + public void testCustomReadSchema_FieldSubset() throws IOException { + Schema readSchema = SchemaBuilder.record("PersonSubset") + .namespace("org.apache.crunch.test") + .fields() + .optionalString("name") + .endRecord(); + GenericRecord savedRecord = new Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); + PCollection<GenericRecord> genericCollection = pipeline.read( + AvroParquetFileSource.builder(readSchema) + .includeField("name") + .build(new Path(avroFile.getAbsolutePath()))); + + File outputFile = tmpDir.getFile("output"); + Target avroFile = To.avroFile(outputFile.getAbsolutePath()); + genericCollection.write(avroFile); + pipeline.done(); + + Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class, + tmpDir.getDefaultConfiguration()); + PCollection<GenericData.Record> namedPersonRecords = pipeline2.read( + From.avroFile(new Path(outputFile.getAbsolutePath()))); + + GenericRecord personSubset = Iterables.getOnlyElement(namedPersonRecords.materialize()); + + assertEquals(readSchema, personSubset.getSchema()); + assertEquals(new Utf8("John Doe"), personSubset.get("name")); + } + + @Test + public void testCustomReadSchemaGeneric_FieldSuperset() throws IOException { + Schema readSchema = SchemaBuilder.record("PersonSuperset") + .namespace("org.apache.crunch.test") + .fields() + .optionalString("name") + .optionalInt("age") + .name("siblingnames").type(Person.SCHEMA$.getField("siblingnames").schema()).withDefault(null) + .name("employer").type().stringType().stringDefault("Acme Corp") + .endRecord(); + GenericRecord savedRecord = new Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + 
savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); + PCollection<GenericRecord> genericCollection = pipeline.read( + AvroParquetFileSource.builder(readSchema) + .build(new Path(avroFile.getAbsolutePath()))); + + File outputFile = tmpDir.getFile("output"); + Target avroFile = To.avroFile(outputFile.getAbsolutePath()); + genericCollection.write(avroFile); + pipeline.done(); + + Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class, + tmpDir.getDefaultConfiguration()); + PCollection<GenericData.Record> namedPersonRecords = pipeline2.read( + From.avroFile(new Path(outputFile.getAbsolutePath()))); + + GenericRecord personSuperset = Iterables.getOnlyElement(namedPersonRecords.materialize()); + + assertEquals(readSchema, personSuperset.getSchema()); + assertEquals(new Utf8("John Doe"), personSuperset.get("name")); + assertEquals(42, personSuperset.get("age")); + assertEquals(Lists.newArrayList(new Utf8("Jimmy"), new Utf8("Jane")), personSuperset.get("siblingnames")); + assertEquals(new Utf8("Acme Corp"), personSuperset.get("employer")); + } + + @Test + public void testCustomReadSchemaWithProjection() throws IOException { + Schema readSchema = SchemaBuilder.record("PersonSubsetWithProjection") + .namespace("org.apache.crunch.test") + .fields() + .optionalString("name") + .optionalInt("age") + .endRecord(); + GenericRecord savedRecord = new Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); + PCollection<GenericRecord> genericCollection = pipeline.read( + 
AvroParquetFileSource.builder(readSchema) + .includeField("name") + .build(new Path(avroFile.getAbsolutePath()))); + + File outputFile = tmpDir.getFile("output"); + Target avroFile = To.avroFile(outputFile.getAbsolutePath()); + genericCollection.write(avroFile); + pipeline.done(); + + Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class, + tmpDir.getDefaultConfiguration()); + PCollection<GenericData.Record> namedPersonRecords = pipeline2.read( + From.avroFile(new Path(outputFile.getAbsolutePath()))); + + GenericRecord personSubset = Iterables.getOnlyElement(namedPersonRecords.materialize()); + + assertEquals(readSchema, personSubset.getSchema()); + assertEquals(new Utf8("John Doe"), personSubset.get("name")); + assertNull(personSubset.get("age")); + } + + @Test public void testProjectionFiltered() throws IOException { GenericRecord savedRecord = new Record(Person.SCHEMA$); savedRecord.put("name", "John Doe"); http://git-wip-us.apache.org/repos/asf/crunch/blob/2c7821fd/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java index 41e0d8e..ffca414 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java @@ -38,29 +38,37 @@ import org.apache.crunch.types.avro.Avros; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.ReflectionUtils; import parquet.avro.AvroParquetInputFormat; import parquet.avro.AvroReadSupport; import parquet.filter.UnboundRecordFilter; +import parquet.hadoop.ParquetInputSplit; public class AvroParquetFileSource<T extends 
IndexedRecord> extends FileSourceImpl<T> implements ReadableSource<T> { + private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema"; + private final String projSchema; private static <S> FormatBundle<AvroParquetInputFormat> getBundle( AvroType<S> ptype, - Schema extSchema, + Schema projSchema, Class<? extends UnboundRecordFilter> filterClass) { - Schema schema = extSchema == null ? ptype.getSchema() : extSchema; - // Need to check that all fields are accounted for in ptype schema... FormatBundle<AvroParquetInputFormat> fb = FormatBundle.forInput(AvroParquetInputFormat.class) - .set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, schema.toString()) - // ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it - // doesn't work with CombineFileInputFormat - .set(RuntimeParameters.DISABLE_COMBINE_FILE, "true"); + .set(AVRO_READ_SCHEMA, ptype.getSchema().toString()); + + if (projSchema != null) { + fb.set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, projSchema.toString()); + } if (filterClass != null) { fb.set("parquet.read.filter", filterClass.getName()); } + if (!FileSplit.class.isAssignableFrom(ParquetInputSplit.class)) { + // Older ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it + // doesn't work with CombineFileInputFormat + fb.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true"); + } return fb; } @@ -68,16 +76,32 @@ public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceIm this(ImmutableList.of(path), ptype); } - public AvroParquetFileSource(Path path, AvroType<T> ptype, Schema schema) { - this(ImmutableList.of(path), ptype, schema); + /** + * Read the Parquet data at the given path using the schema of the {@code AvroType}, and projecting + * a subset of the columns from this schema via the separately given {@code Schema}. 
+ * + * @param path the path of the file to read + * @param ptype the AvroType to use in reading the file + * @param projSchema the subset of columns from the input schema to read + */ + public AvroParquetFileSource(Path path, AvroType<T> ptype, Schema projSchema) { + this(ImmutableList.of(path), ptype, projSchema); } public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype) { this(paths, ptype, null, null); } - - public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema schema) { - this(paths, ptype, schema, null); + + /** + * Read the Parquet data at the given paths using the schema of the {@code AvroType}, and projecting + * a subset of the columns from this schema via the separately given {@code Schema}. + * + * @param paths the list of paths to read + * @param ptype the AvroType to use in reading the file + * @param projSchema the subset of columns from the input schema to read + */ + public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema projSchema) { + this(paths, ptype, projSchema, null); } public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, @@ -85,10 +109,19 @@ public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceIm this(paths, ptype, null, filterClass); } - public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema schema, + /** + * Read the Parquet data at the given paths using the schema of the {@code AvroType}, projecting + * a subset of the columns from this schema via the separately given {@code Schema}, and using + * the filter class to select the input records. + * + * @param paths the list of paths to read + * @param ptype the AvroType to use in reading the file + * @param projSchema the subset of columns from the input schema to read + */ + public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema projSchema, Class<? 
extends UnboundRecordFilter> filterClass) { - super(paths, ptype, getBundle(ptype, schema, filterClass)); - projSchema = schema == null ? null : schema.toString(); + super(paths, ptype, getBundle(ptype, projSchema, filterClass)); + this.projSchema = projSchema == null ? null : projSchema.toString(); } public Schema getProjectedSchema() { @@ -181,4 +214,4 @@ public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceIm } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/crunch/blob/2c7821fd/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 569c2e3..1237347 100644 --- a/pom.xml +++ b/pom.xml @@ -75,7 +75,7 @@ under the License. <commons-cli.version>1.2</commons-cli.version> <avro.version>1.7.7</avro.version> <hive.version>0.13.1</hive.version> - <parquet.version>1.3.2</parquet.version> + <parquet.version>1.4.3</parquet.version> <javassist.version>3.16.1-GA</javassist.version> <jackson.version>1.8.8</jackson.version> <protobuf-java.version>2.5.0</protobuf-java.version>
