Updated Branches: refs/heads/master 22965cf87 -> b46c2b8b4
CRUNCH-310: AvroParquetFileSource with builder interface for selecting fields and specifying a filter class. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b46c2b8b Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b46c2b8b Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b46c2b8b Branch: refs/heads/master Commit: b46c2b8b4decdf6825f91f68a296e7671b13a76b Parents: 22965cf Author: Josh Wills <[email protected]> Authored: Tue Dec 10 17:50:39 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Tue Dec 10 22:30:57 2013 -0800 ---------------------------------------------------------------------- .../parquet/AvroParquetFileSourceTargetIT.java | 122 +++++++++++++++++- .../io/parquet/AvroParquetFileSource.java | 125 +++++++++++++++++-- .../io/parquet/AvroParquetFileSourceTarget.java | 3 +- .../io/parquet/AvroParquetFileTarget.java | 4 +- crunch-core/src/test/avro/person.avsc | 2 +- 5 files changed, 240 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java index b6d51f2..5c7d9e0 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java @@ -17,30 +17,40 @@ */ package org.apache.crunch.io.parquet; -import com.google.common.collect.Lists; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.List; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericRecord; +import org.apache.crunch.FilterFn; import org.apache.crunch.PCollection; import org.apache.crunch.Pipeline; +import org.apache.crunch.Target; import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.To; +import org.apache.crunch.io.avro.AvroFileSource; import org.apache.crunch.test.Person; -import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; -import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.Avros; import org.apache.hadoop.fs.Path; import org.junit.Before; import org.junit.Rule; import org.junit.Test; + import parquet.avro.AvroParquetWriter; -import static org.junit.Assert.assertEquals; +import com.google.common.collect.Lists; +import parquet.column.ColumnReader; +import parquet.filter.RecordFilter; +import parquet.filter.UnboundRecordFilter; @SuppressWarnings("serial") public class AvroParquetFileSourceTargetIT implements Serializable { @@ -110,5 +120,109 @@ public class AvroParquetFileSourceTargetIT implements Serializable { assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList)); } + + @Test + public void testProjectionSpecific() throws IOException { + GenericRecord savedRecord = new Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); + PCollection<Person> genericCollection = pipeline.read( + AvroParquetFileSource.builder(Person.class) + .includeField("age") + .build(new Path(avroFile.getAbsolutePath()))); + + File outputFile = tmpDir.getFile("output"); + Target avroFile = To.avroFile(outputFile.getAbsolutePath()); + genericCollection.write(avroFile); + pipeline.done(); + + Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class, + tmpDir.getDefaultConfiguration()); + PCollection<Person> ageOnly = pipeline2.read( + new AvroFileSource<Person>(new Path(outputFile.getAbsolutePath()), Avros.specifics(Person.class))); + + for (Person person : ageOnly.materialize()) { + assertNull(person.getName()); + assertEquals(person.getAge(), new Integer(42)); + assertNull(person.getSiblingnames()); + } + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testProjectionGeneric() throws IOException { + GenericRecord savedRecord = new Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + AvroParquetFileSource<GenericRecord> src = AvroParquetFileSource.builder(Person.SCHEMA$) + .includeField("age") + .build(new Path(avroFile.getAbsolutePath())); + Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); + PCollection<GenericRecord> genericCollection = pipeline.read(src); + + File outputFile = tmpDir.getFile("output"); + Target avroFile = To.avroFile(outputFile.getAbsolutePath()); + genericCollection.write(avroFile); + pipeline.done(); + + Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class, + tmpDir.getDefaultConfiguration()); + PCollection<Record> ageOnly = pipeline2.read( + new AvroFileSource<Record>(new Path(outputFile.getAbsolutePath()), Avros.generics(src.getProjectedSchema()))); + + for (Record person : ageOnly.materialize()) { + assertEquals(person.get(0), 42); + Object notAge = person.get(1); + } + } + @Test + public void testProjectionFiltered() throws IOException { + GenericRecord savedRecord = new Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); + PCollection<Person> genericCollection = pipeline.read( + AvroParquetFileSource.builder(Person.class) + .includeField("age") + .filterClass(RejectAllFilter.class) + .build(new Path(avroFile.getAbsolutePath()))); + + File outputFile = tmpDir.getFile("output"); + Target avroFile = To.avroFile(outputFile.getAbsolutePath()); + genericCollection.filter(new FilterFn<Person>() { + @Override + public boolean accept(Person input) { + return input != null; + } + }).write(avroFile); + pipeline.done(); + + Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class, + tmpDir.getDefaultConfiguration()); + PCollection<Person> ageOnly = pipeline2.read( + new AvroFileSource<Person>(new Path(outputFile.getAbsolutePath()), Avros.specifics(Person.class))); + assertTrue(Lists.newArrayList(ageOnly.materialize()).isEmpty()); + } + + public static class RejectAllFilter implements UnboundRecordFilter { + @Override + public RecordFilter bind(Iterable<ColumnReader> readers) { + return new RecordFilter() { + @Override + public boolean isMatch() { + return false; + } + }; + } + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java index 76e80ef..41e0d8e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java @@ -19,34 +19,80 @@ package org.apache.crunch.io.parquet; import java.io.IOException; import java.util.List; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.specific.SpecificRecord; +import org.apache.crunch.ReadableData; import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.ReadableSource; -import org.apache.crunch.ReadableData; import org.apache.crunch.io.impl.FileSourceImpl; import org.apache.crunch.types.Converter; import org.apache.crunch.types.avro.AvroType; +import org.apache.crunch.types.avro.Avros; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.util.ReflectionUtils; import parquet.avro.AvroParquetInputFormat; import parquet.avro.AvroReadSupport; +import parquet.filter.UnboundRecordFilter; -public class AvroParquetFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> { +public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceImpl<T> implements ReadableSource<T> { - private static <S> FormatBundle<AvroParquetInputFormat> getBundle(AvroType<S> ptype) { - return FormatBundle.forInput(AvroParquetInputFormat.class) - .set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, ptype.getSchema().toString()) + private final String projSchema; + + private static <S> FormatBundle<AvroParquetInputFormat> getBundle( + AvroType<S> ptype, + Schema extSchema, + Class<? extends UnboundRecordFilter> filterClass) { + Schema schema = extSchema == null ? ptype.getSchema() : extSchema; + // Need to check that all fields are accounted for in ptype schema... + FormatBundle<AvroParquetInputFormat> fb = FormatBundle.forInput(AvroParquetInputFormat.class) + .set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, schema.toString()) // ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it // doesn't work with CombineFileInputFormat .set(RuntimeParameters.DISABLE_COMBINE_FILE, "true"); + if (filterClass != null) { + fb.set("parquet.read.filter", filterClass.getName()); + } + return fb; } public AvroParquetFileSource(Path path, AvroType<T> ptype) { - super(path, ptype, getBundle(ptype)); + this(ImmutableList.of(path), ptype); + } + + public AvroParquetFileSource(Path path, AvroType<T> ptype, Schema schema) { + this(ImmutableList.of(path), ptype, schema); } public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype) { - super(paths, ptype, getBundle(ptype)); + this(paths, ptype, null, null); + } + + public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema schema) { + this(paths, ptype, schema, null); + } + + public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, + Class<? extends UnboundRecordFilter> filterClass) { + this(paths, ptype, null, filterClass); + } + + public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema schema, + Class<? extends UnboundRecordFilter> filterClass) { + super(paths, ptype, getBundle(ptype, schema, filterClass)); + projSchema = schema == null ? null : schema.toString(); + } + + public Schema getProjectedSchema() { + return (new Schema.Parser()).parse(projSchema); } @Override @@ -70,6 +116,69 @@ public class AvroParquetFileSource<T> extends FileSourceImpl<T> implements Reada @Override public String toString() { - return "Parquet(" + pathsAsString() + ")"; + return "Parquet(" + pathsAsString() + ((projSchema == null) ? ")" : ") -> " + projSchema); + } + + public static <T extends SpecificRecord> Builder<T> builder(Class<T> clazz) { + return new Builder<T>(Preconditions.checkNotNull(clazz)); + } + + public static Builder<GenericRecord> builder(Schema schema) { + Preconditions.checkNotNull(schema); + Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType())); + return new Builder(schema); + } + + /** + * Helper class for constructing an {@code AvroParquetFileSource} that only reads a subset of the + * fields defined in an Avro schema. + */ + public static class Builder<T extends IndexedRecord> { + private Class<T> clazz; + private Schema baseSchema; + private List<Schema.Field> fields = Lists.newArrayList(); + private Class<? extends UnboundRecordFilter> filterClass; + + private Builder(Class<T> clazz) { + this.clazz = clazz; + this.baseSchema = ReflectionUtils.newInstance(clazz, null).getSchema(); + } + + private Builder(Schema baseSchema) { + this.baseSchema = baseSchema; + } + + public Builder includeField(String fieldName) { + Schema.Field field = baseSchema.getField(fieldName); + if (field == null) { + throw new IllegalArgumentException("No field " + fieldName + " in schema: " + baseSchema.getName()); + } + fields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order())); + return this; + } + + public Builder filterClass(Class<? extends UnboundRecordFilter> filterClass) { + this.filterClass = filterClass; + return this; + } + + public AvroParquetFileSource<T> build(Path path) { + return build(ImmutableList.of(path)); + } + + public AvroParquetFileSource<T> build(List<Path> paths) { + AvroType<T> at = clazz == null ? Avros.generics(baseSchema) : Avros.specifics((Class) clazz); + if (fields.isEmpty()) { + return new AvroParquetFileSource<T>(paths, at, filterClass); + } else { + Schema projected = Schema.createRecord( + baseSchema.getName(), + baseSchema.getDoc(), + baseSchema.getNamespace(), + baseSchema.isError()); + projected.setFields(fields); + return new AvroParquetFileSource<T>(paths, at, projected, filterClass); + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java index 8d93eba..359a484 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java @@ -17,13 +17,14 @@ */ package org.apache.crunch.io.parquet; +import org.apache.avro.generic.IndexedRecord; import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl; import org.apache.crunch.types.avro.AvroType; import org.apache.hadoop.fs.Path; -public class AvroParquetFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> { +public class AvroParquetFileSourceTarget<T extends IndexedRecord> extends ReadableSourcePathTargetImpl<T> { public AvroParquetFileSourceTarget(Path path, AvroType<T> atype) { this(path, atype, SequentialFileNamingScheme.getInstance()); http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java index c67b9f1..a6a34cd 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java @@ -91,8 +91,8 @@ public class AvroParquetFileTarget extends FileTargetImpl { @Override public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) { - if (ptype instanceof AvroType) { - return new AvroParquetFileSourceTarget<T>(path, (AvroType<T>) ptype); + if (ptype instanceof AvroType && IndexedRecord.class.isAssignableFrom(((AvroType) ptype).getTypeClass())) { + return new AvroParquetFileSourceTarget(path, (AvroType<T>) ptype); } return null; } http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/test/avro/person.avsc ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/avro/person.avsc b/crunch-core/src/test/avro/person.avsc index babd808..e0fff30 100644 --- a/crunch-core/src/test/avro/person.avsc +++ b/crunch-core/src/test/avro/person.avsc @@ -22,5 +22,5 @@ "fields": [ {"name": "name", "type": ["string", "null"] }, {"name": "age", "type": "int"}, - {"name": "siblingnames", "type": {"type": "array", "items": "string"}} ] + {"name": "siblingnames", "type" : [{ "type": "array", "items": "string" }, "null"], "default": null } ] }
