Repository: beam Updated Branches: refs/heads/master 71196ec9c -> d919394c7
Introduces AvroIO.readAll() and readAllGenericRecords() Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee1bcbae Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee1bcbae Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee1bcbae Branch: refs/heads/master Commit: ee1bcbae08fb221e392175fbd0387594653d4a86 Parents: eaf0b36 Author: Eugene Kirpichov <[email protected]> Authored: Fri Jul 21 14:09:35 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Jul 25 17:36:49 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 132 +++++++++++++++++-- .../java/org/apache/beam/sdk/io/AvroUtils.java | 40 ++++++ .../java/org/apache/beam/sdk/io/AvroIOTest.java | 65 ++++++++- 3 files changed, 223 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ee1bcbae/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index d308c85..f201114 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.io.BaseEncoding; @@ -54,14 +56,17 @@ import org.apache.beam.sdk.values.PDone; * {@link PTransform}s for reading and writing Avro 
files. * * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using - * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. See {@link - * FileSystems} for information on supported file systems and filepatterns. + * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. Alternatively, if + * the filepatterns to be read are themselves in a {@link PCollection}, apply {@link #readAll}. + * + * <p>See {@link FileSystems} for information on supported file systems and filepatterns. * * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified - * schema. + * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link + * #readAllGenericRecords}. * * <p>For example: * @@ -79,6 +84,18 @@ import org.apache.beam.sdk.values.PDone; * .from("gs://my_bucket/path/to/records-*.avro")); * }</pre> * + * <p>Reading from a {@link PCollection} of filepatterns: + * + * <pre>{@code + * Pipeline p = ...; + * + * PCollection<String> filepatterns = p.apply(...); + * PCollection<AvroAutoGenClass> records = + * filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class)); + * PCollection<GenericRecord> genericRecords = + * filepatterns.apply(AvroIO.readAllGenericRecords(schema)); + * }</pre> + * + * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using + * {@code AvroIO.write().to(String)} to specify the output filename prefix. 
The default {@link * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set @@ -133,6 +150,18 @@ public class AvroIO { .build(); } + /** Like {@link #read}, but reads each filepattern in the input {@link PCollection}. */ + public static <T> ReadAll<T> readAll(Class<T> recordClass) { + return new AutoValue_AvroIO_ReadAll.Builder<T>() + .setRecordClass(recordClass) + .setSchema(ReflectData.get().getSchema(recordClass)) + // 64MB is a reasonable value that allows to amortize the cost of opening files, + // but is not so large as to exhaust a typical runner's maximum amount of output per + // ProcessElement call. + .setDesiredBundleSizeBytes(64 * 1024 * 1024L) + .build(); + } + /** Reads Avro file(s) containing records of the specified schema. */ public static Read<GenericRecord> readGenericRecords(Schema schema) { return new AutoValue_AvroIO_Read.Builder<GenericRecord>() @@ -142,6 +171,17 @@ public class AvroIO { } /** + * Like {@link #readGenericRecords(Schema)}, but reads each filepattern in the input {@link + * PCollection}. + */ + public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) { + return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>() + .setRecordClass(GenericRecord.class) + .setSchema(schema) + .setDesiredBundleSizeBytes(64 * 1024 * 1024L) + .build(); + } + + /** * Reads Avro file(s) containing records of the specified schema. The schema is specified as a * JSON-encoded string. */ @@ -150,6 +190,14 @@ public class AvroIO { } /** + * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link + * PCollection}. + */ + public static ReadAll<GenericRecord> readAllGenericRecords(String schema) { + return readAllGenericRecords(new Schema.Parser().parse(schema)); + } + + /** * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding * pattern). 
*/ @@ -217,14 +265,12 @@ public class AvroIO { public PCollection<T> expand(PBegin input) { checkNotNull(getFilepattern(), "filepattern"); checkNotNull(getSchema(), "schema"); - - @SuppressWarnings("unchecked") - AvroSource<T> source = - getRecordClass() == GenericRecord.class - ? (AvroSource<T>) AvroSource.from(getFilepattern()).withSchema(getSchema()) - : AvroSource.from(getFilepattern()).withSchema(getRecordClass()); - - return input.getPipeline().apply("Read", org.apache.beam.sdk.io.Read.from(source)); + return input + .getPipeline() + .apply( + "Read", + org.apache.beam.sdk.io.Read.from( + createSource(getFilepattern(), getRecordClass(), getSchema()))); } @Override @@ -233,6 +279,70 @@ public class AvroIO { builder.addIfNotNull( DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern")); } + + @SuppressWarnings("unchecked") + private static <T> AvroSource<T> createSource( + ValueProvider<String> filepattern, Class<T> recordClass, Schema schema) { + return recordClass == GenericRecord.class + ? (AvroSource<T>) AvroSource.from(filepattern).withSchema(schema) + : AvroSource.from(filepattern).withSchema(recordClass); + } + } + + ///////////////////////////////////////////////////////////////////////////// + + /** Implementation of {@link #readAll}. 
*/ + @AutoValue + public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> { + @Nullable abstract Class<T> getRecordClass(); + @Nullable abstract Schema getSchema(); + abstract long getDesiredBundleSizeBytes(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setRecordClass(Class<T> recordClass); + abstract Builder<T> setSchema(Schema schema); + abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes); + + abstract ReadAll<T> build(); + } + + @VisibleForTesting + ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { + return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); + } + + @Override + public PCollection<T> expand(PCollection<String> input) { + checkNotNull(getSchema(), "schema"); + return input + .apply( + "Read all via FileBasedSource", + new ReadAllViaFileBasedSource<>( + SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */, + getDesiredBundleSizeBytes(), + new CreateSourceFn<>(getRecordClass(), getSchema().toString()))) + .setCoder(AvroCoder.of(getRecordClass(), getSchema())); + } + } + + private static class CreateSourceFn<T> + implements SerializableFunction<String, FileBasedSource<T>> { + private final Class<T> recordClass; + private final Supplier<Schema> schemaSupplier; + + public CreateSourceFn(Class<T> recordClass, String jsonSchema) { + this.recordClass = recordClass; + this.schemaSupplier = AvroUtils.serializableSchemaSupplier(jsonSchema); + } + + @Override + public FileBasedSource<T> apply(String input) { + return Read.createSource( + StaticValueProvider.of(input), recordClass, schemaSupplier.get()); + } } ///////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/beam/blob/ee1bcbae/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java new file mode 100644 index 0000000..65c5bf1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import java.io.Serializable; +import org.apache.avro.Schema; + +/** Helpers for working with Avro. */ +class AvroUtils { + /** Helper to get around the fact that {@link Schema} itself is not serializable. 
*/ + public static Supplier<Schema> serializableSchemaSupplier(String jsonSchema) { + return Suppliers.memoize( + Suppliers.compose(new JsonToSchema(), Suppliers.ofInstance(jsonSchema))); + } + + private static class JsonToSchema implements Function<String, Schema>, Serializable { + @Override + public Schema apply(String input) { + return new Schema.Parser().parse(input); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ee1bcbae/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 4380c57..df5d26c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -30,6 +30,7 @@ import static org.junit.Assert.assertTrue; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import java.io.File; @@ -152,10 +153,68 @@ public class AvroIOTest { .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding()); writePipeline.run().waitUntilFinish(); - PCollection<GenericClass> input = - readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())); + // Test both read() and readAll() + PAssert.that( + readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()))) + .containsInAnyOrder(values); + PAssert.that( + readPipeline + .apply(Create.of(outputFile.getAbsolutePath())) + .apply(AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10))) + .containsInAnyOrder(values); + + readPipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + 
public void testAvroIOWriteAndReadMultipleFilepatterns() throws Throwable { + List<GenericClass> firstValues = Lists.newArrayList(); + List<GenericClass> secondValues = Lists.newArrayList(); + for (int i = 0; i < 10; ++i) { + firstValues.add(new GenericClass(i, "a" + i)); + secondValues.add(new GenericClass(i, "b" + i)); + } + writePipeline + .apply("Create first", Create.of(firstValues)) + .apply( + "Write first", + AvroIO.write(GenericClass.class) + .to(tmpFolder.getRoot().getAbsolutePath() + "/first") + .withNumShards(2)); + writePipeline + .apply("Create second", Create.of(secondValues)) + .apply( + "Write second", + AvroIO.write(GenericClass.class) + .to(tmpFolder.getRoot().getAbsolutePath() + "/second") + .withNumShards(3)); + writePipeline.run().waitUntilFinish(); + + // Test both read() and readAll() + PAssert.that( + readPipeline.apply( + "Read first", + AvroIO.read(GenericClass.class) + .from(tmpFolder.getRoot().getAbsolutePath() + "/first*"))) + .containsInAnyOrder(firstValues); + PAssert.that( + readPipeline.apply( + "Read second", + AvroIO.read(GenericClass.class) + .from(tmpFolder.getRoot().getAbsolutePath() + "/second*"))) + .containsInAnyOrder(secondValues); + PAssert.that( + readPipeline + .apply( + "Create paths", + Create.of( + tmpFolder.getRoot().getAbsolutePath() + "/first*", + tmpFolder.getRoot().getAbsolutePath() + "/second*")) + .apply( + "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10))) + .containsInAnyOrder(Iterables.concat(firstValues, secondValues)); - PAssert.that(input).containsInAnyOrder(values); readPipeline.run(); }
