aromanenko-dev commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1054585075
########## sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java: ########## @@ -0,0 +1,2026 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.avro.io; + +import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment; +import static org.apache.beam.sdk.io.ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.Serializable; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; +import org.apache.beam.sdk.io.DefaultFilenamePolicy; +import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.FileBasedSource; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileIO.MatchConfiguration; +import org.apache.beam.sdk.io.FileIO.ReadableFile; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.ReadAllViaFileBasedSource; +import org.apache.beam.sdk.io.ShardNameTemplate; +import org.apache.beam.sdk.io.WriteFiles; +import org.apache.beam.sdk.io.WriteFilesResult; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; +import 
org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +/** + * {@link PTransform}s for reading and writing Avro files. + * + * <h2>Reading Avro files</h2> + * + * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at + * pipeline construction time, use {@link #read}, using {@link Read#from} to specify the filename or + * filepattern to read from. If the filepatterns to be read are themselves in a {@link PCollection} + * you can use {@link FileIO} to match them and {@link AvroIO#readFiles} to read them. If the schema + * is unknown at pipeline construction time, use {@link #parseGenericRecords} or {@link + * #parseFilesGenericRecords}. + * + * <p>Many configuration options below apply to several or all of these transforms. + * + * <p>See {@link FileSystems} for information on supported file systems and filepatterns. + * + * <h3>Filepattern expansion and watching</h3> + * + * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the + * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link + * AvroIO#readFiles(Class)} allow streaming of new files matching the filepattern(s). + * + * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link + * AvroIO#readFiles(Class)} allows them in case the filepattern contains a glob wildcard character. + * Use {@link Read#withEmptyMatchTreatment} or {@link + * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link AvroIO#readFiles(Class)} + * to configure this behavior. + * + * <h3>Reading records of a known schema</h3> + * + * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read + * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a + * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a + * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified + * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link FileIO} matching + * plus {@link #readFilesGenericRecords}. 
+ * + * <p>For example: + * + * <pre>{@code + * Pipeline p = ...; + * + * // Read Avro-generated classes from files on GCS + * PCollection<AvroAutoGenClass> records = + * p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro")); + * + * // Read GenericRecord's of the given schema from files on GCS + * Schema schema = new Schema.Parser().parse(new File("schema.avsc")); + * PCollection<GenericRecord> records = + * p.apply(AvroIO.readGenericRecords(schema) + * .from("gs://my_bucket/path/to/records-*.avro")); + * }</pre> + * + * <h3>Reading records of an unknown schema</h3> + * + * <p>To read records from files whose schema is unknown at pipeline construction time or differs + * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a + * parsing function for converting each {@link GenericRecord} into a value of your custom type. + * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link FileIO} + * matching plus {@link #parseFilesGenericRecords(SerializableFunction)}. + * + * <p>For example: + * + * <pre>{@code + * Pipeline p = ...; + * + * PCollection<Foo> records = + * p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() { + * public Foo apply(GenericRecord record) { + * // If needed, access the schema of the record using record.getSchema() + * return ...; + * } + * })); + * }</pre> + * + * <h3>Reading from a {@link PCollection} of filepatterns</h3> + * + * <pre>{@code + * Pipeline p = ...; + * + * PCollection<String> filepatterns = p.apply(...); + * PCollection<AvroAutoGenClass> records = + * filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class)); + * PCollection<AvroAutoGenClass> records = + * filepatterns + * .apply(FileIO.matchAll()) + * .apply(FileIO.readMatches()) + * .apply(AvroIO.readFiles(AvroAutoGenClass.class)); + * PCollection<GenericRecord> genericRecords = + * filepatterns.apply(AvroIO.readGenericRecords(schema)); + * PCollection<Foo> records = + * filepatterns + * .apply(FileIO.matchAll()) + * .apply(FileIO.readMatches()) + * .apply(AvroIO.parseFilesGenericRecords(new SerializableFunction...); + * }</pre> + * + * <h3>Streaming new files matching a filepattern</h3> + * + * <pre>{@code + * Pipeline p = ...; + * + * PCollection<AvroAutoGenClass> lines = p.apply(AvroIO + * .read(AvroAutoGenClass.class) + * .from("gs://my_bucket/path/to/records-*.avro") + * .watchForNewFiles( + * // Check for new files every minute + * Duration.standardMinutes(1), + * // Stop watching the filepattern if no new files appear within an hour + * afterTimeSinceNewOutput(Duration.standardHours(1)))); + * }</pre> + * + * <h3>Reading a very large number of files</h3> + * + * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of + * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and + * scalability. Note that it may decrease performance if the filepattern matches only a small number + * of files. 
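(Editorial aside, not part of the file under review: a minimal usage sketch of the many-files hint described above. MyRecord stands in for a hypothetical Avro-generated class; the path is illustrative.)

Pipeline p = ...;

// MyRecord is a hypothetical Avro-generated class. The filepattern is expected
// to expand to tens of thousands of files, so hint this to the runner.
PCollection<MyRecord> records =
    p.apply(
        AvroIO.read(MyRecord.class)
            .from("gs://my_bucket/path/to/records-*.avro")
            .withHintMatchesManyFiles());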
+ * + * <h3>Inferring Beam schemas from Avro files</h3> + * + * <p>If you want to use SQL or schema based operations on an Avro-based PCollection, you must + * configure the read transform to infer the Beam schema and automatically setup the Beam related + * coders by doing: + * + * <pre>{@code + * PCollection<AvroAutoGenClass> records = + * p.apply(AvroIO.read(...).from(...).withBeamSchemas(true)); + * }</pre> + * + * <h3>Inferring Beam schemas from Avro PCollections</h3> + * + * <p>If you created an Avro-based PCollection by other means e.g. reading records from Kafka or as + * the output of another PTransform, you may be interested on making your PCollection schema-aware + * so you can use the Schema-based APIs or Beam's SqlTransform. + * + * <p>If you are using Avro specific records (generated classes from an Avro schema), you can + * register a schema provider for the specific Avro class to make any PCollection of these objects + * schema-aware. + * + * <pre>{@code + * pipeline.getSchemaRegistry().registerSchemaProvider(AvroAutoGenClass.class, AvroAutoGenClass.getClassSchema()); + * }</pre> + * + * You can also manually set an Avro-backed Schema coder for a PCollection using {@link + * AvroUtils#schemaCoder(Class, Schema)} to make it schema-aware. + * + * <pre>{@code + * PCollection<AvroAutoGenClass> records = ... + * AvroCoder<AvroAutoGenClass> coder = (AvroCoder<AvroAutoGenClass>) users.getCoder(); + * records.setCoder(AvroUtils.schemaCoder(coder.getType(), coder.getSchema())); + * }</pre> + * + * <p>If you are using GenericRecords you may need to set a specific Beam schema coder for each + * PCollection to match their internal Avro schema. + * + * <pre>{@code + * org.apache.avro.Schema avroSchema = ... + * PCollection<GenericRecord> records = ... + * records.setCoder(AvroUtils.schemaCoder(avroSchema)); + * }</pre> + * + * <h2>Writing Avro files</h2> + * + * <p>To write a {@link PCollection} to one or more Avro files, use {@link Write}, using {@code + * AvroIO.write().to(String)} to specify the output filename prefix. The default {@link + * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set + * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link + * Write#withSuffix(String)}, to generate output filenames in a sharded way. You can override this + * default write filename policy using {@link Write#to(FilenamePolicy)} to specify a custom file + * naming policy. + * + * <p>By default, {@link Write} produces output files that are compressed using the {@link + * org.apache.avro.file.Codec CodecFactory.snappyCodec()}. This default can be changed or overridden + * using {@link Write#withCodec}. + * + * <h3>Writing specific or generic records</h3> + * + * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write + * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes + * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a + * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified + * schema. 
+ * + * <p>For example: + * + * <pre>{@code + * // A simple Write to a local file (only runs locally): + * PCollection<AvroAutoGenClass> records = ...; + * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro")); + * + * // A Write to a sharded GCS file (runs locally and using remote execution): + * Schema schema = new Schema.Parser().parse(new File("schema.avsc")); + * PCollection<GenericRecord> records = ...; + * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema) + * .to("gs://my_bucket/path/to/numbers") + * .withSuffix(".avro")); + * }</pre> + * + * <h3>Writing windowed or unbounded data</h3> + * + * <p>By default, all input is put into the global window before writing. If per-window writes are + * desired - for example, when using a streaming runner - {@link Write#withWindowedWrites()} will + * cause windowing and triggering to be preserved. When producing windowed writes with a streaming + * runner that supports triggers, the number of output shards must be set explicitly using {@link + * Write#withNumShards(int)}; some runners may set this for you to a runner-chosen value, so you may + * need not set it yourself. A {@link FilenamePolicy} must be set, and unique windows and triggers + * must produce unique filenames. + * + * <h3>Writing data to multiple destinations</h3> + * + * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file + * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user + * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id + * as an integer field. We want events for each user to go into a specific directory for that user, + * and each user's data should be written with a specific schema for that user; a side input is + * used, so the schema can be calculated in a different stage. + * + * <pre>{@code + * // This is the user class that controls dynamic destinations for this avro write. The input to + * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order + * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type + * // of Integer. 
+ * class UserDynamicAvroDestinations + * extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> { + * private final PCollectionView<Map<Integer, String>> userToSchemaMap; + * public UserDynamicAvroDestinations( PCollectionView<Map<Integer, String>> userToSchemaMap) { + * this.userToSchemaMap = userToSchemaMap; + * } + * public GenericRecord formatRecord(UserEvent record) { + * return formatUserRecord(record, getSchema(record.getUserId())); + * } + * public Schema getSchema(Integer userId) { + * return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId)); + * } + * public Integer getDestination(UserEvent record) { + * return record.getUserId(); + * } + * public Integer getDefaultDestination() { + * return 0; + * } + * public FilenamePolicy getFilenamePolicy(Integer userId) { + * return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-" + * + userId + "/events")); + * } + * public List<PCollectionView<?>> getSideInputs() { + * return ImmutableList.<PCollectionView<?>>of(userToSchemaMap); + * } + * } + * PCollection<UserEvents> events = ...; + * PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply( + * "ComputePerUserSchemas", new ComputePerUserSchemas()); + * events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords() + * .to(new UserDynamicAvroDestinations(userToSchemaMap))); + * }</pre> + */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class AvroIO { + /** + * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern). + * + * <p>The schema must be specified using one of the {@code withSchema} functions. + */ + public static <T> Read<T> read(Class<T> recordClass) { + return new AutoValue_AvroIO_Read.Builder<T>() + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) + .setRecordClass(recordClass) + .setSchema(ReflectData.get().getSchema(recordClass)) + .setInferBeamSchema(false) + .setHintMatchesManyFiles(false) + .build(); + } + + /** + * Like {@link #read}, but reads each file in a {@link PCollection} of {@link ReadableFile}, + * returned by {@link FileIO#readMatches}. + * + * <p>You can read {@link GenericRecord} by using {@code #readFiles(GenericRecord.class)} or + * {@code #readFiles(new Schema.Parser().parse(schema))} if the schema is a String. + */ + public static <T> ReadFiles<T> readFiles(Class<T> recordClass) { + return new AutoValue_AvroIO_ReadFiles.Builder<T>() + .setRecordClass(recordClass) + .setSchema(ReflectData.get().getSchema(recordClass)) + .setInferBeamSchema(false) + .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) + .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE) + .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler()) + .build(); + } + + /** + * Like {@link #read}, but reads each filepattern in the input {@link PCollection}. + * + * @deprecated You can achieve The functionality of {@link #readAll} using {@link FileIO} matching + * plus {@link #readFiles(Class)}. This is the preferred method to make composition explicit. + * {@link ReadAll} will not receive upgrades and will be removed in a future version of Beam. 
+ */ + @Deprecated + public static <T> ReadAll<T> readAll(Class<T> recordClass) { + return new AutoValue_AvroIO_ReadAll.Builder<T>() + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) + .setRecordClass(recordClass) + .setSchema(ReflectData.get().getSchema(recordClass)) + .setInferBeamSchema(false) + .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) + .build(); + } + + /** Reads Avro file(s) containing records of the specified schema. */ + public static Read<GenericRecord> readGenericRecords(Schema schema) { + return new AutoValue_AvroIO_Read.Builder<GenericRecord>() + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) + .setRecordClass(GenericRecord.class) + .setSchema(schema) + .setInferBeamSchema(false) + .setHintMatchesManyFiles(false) + .build(); + } + + /** + * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link + * ReadableFile}, for example, returned by {@link FileIO#readMatches}. + */ + public static ReadFiles<GenericRecord> readFilesGenericRecords(Schema schema) { + return new AutoValue_AvroIO_ReadFiles.Builder<GenericRecord>() + .setRecordClass(GenericRecord.class) + .setSchema(schema) + .setInferBeamSchema(false) + .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) + .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE) + .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler()) + .build(); + } + + /** + * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link + * ReadableFile}, for example, returned by {@link FileIO#readMatches}. + * + * @deprecated You can achieve The functionality of {@link #readAllGenericRecords(Schema)} using + * {@link FileIO} matching plus {@link #readFilesGenericRecords(Schema)}. This is the + * preferred method to make composition explicit. {@link ReadAll} will not receive upgrades + * and will be removed in a future version of Beam. + */ + @Deprecated + public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) { + return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>() + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) + .setRecordClass(GenericRecord.class) + .setSchema(schema) + .setInferBeamSchema(false) + .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) + .build(); + } + + /** + * Reads Avro file(s) containing records of the specified schema. The schema is specified as a + * JSON-encoded string. + */ + public static Read<GenericRecord> readGenericRecords(String schema) { + return readGenericRecords(new Schema.Parser().parse(schema)); + } + + /** Like {@link #readGenericRecords(String)}, but for {@link ReadableFile} collections. */ + public static ReadFiles<GenericRecord> readFilesGenericRecords(String schema) { + return readFilesGenericRecords(new Schema.Parser().parse(schema)); + } + + /** + * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link + * PCollection}. + * + * @deprecated You can achieve The functionality of {@link #readAllGenericRecords(String)} using + * {@link FileIO} matching plus {@link #readFilesGenericRecords(String)}. This is the + * preferred method to make composition explicit. {@link ReadAll} will not receive upgrades + * and will be removed in a future version of Beam. 
+ */ + @Deprecated + public static ReadAll<GenericRecord> readAllGenericRecords(String schema) { + return readAllGenericRecords(new Schema.Parser().parse(schema)); + } + + /** + * Reads Avro file(s) containing records of an unspecified schema and converting each record to a + * custom type. + */ + public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) { + return new AutoValue_AvroIO_Parse.Builder<T>() + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) + .setParseFn(parseFn) + .setHintMatchesManyFiles(false) + .build(); + } + + /** + * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each {@link ReadableFile} in + * the input {@link PCollection}. + */ + public static <T> ParseFiles<T> parseFilesGenericRecords( + SerializableFunction<GenericRecord, T> parseFn) { + return new AutoValue_AvroIO_ParseFiles.Builder<T>() + .setParseFn(parseFn) + .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) + .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE) + .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler()) + .build(); + } + + /** + * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the + * input {@link PCollection}. + * + * @deprecated You can achieve The functionality of {@link + * #parseAllGenericRecords(SerializableFunction)} using {@link FileIO} matching plus {@link + * #parseFilesGenericRecords(SerializableFunction)} ()}. This is the preferred method to make + * composition explicit. {@link ParseAll} will not receive upgrades and will be removed in a + * future version of Beam. + */ + @Deprecated + public static <T> ParseAll<T> parseAllGenericRecords( + SerializableFunction<GenericRecord, T> parseFn) { + return new AutoValue_AvroIO_ParseAll.Builder<T>() + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) + .setParseFn(parseFn) + .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) + .build(); + } + + /** + * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding + * pattern). + */ + public static <T> Write<T> write(Class<T> recordClass) { + return new Write<>( + AvroIO.<T, T>defaultWriteBuilder() + .setGenericRecords(false) + .setSchema(ReflectData.get().getSchema(recordClass)) + .build()); + } + + /** Writes Avro records of the specified schema. */ + public static Write<GenericRecord> writeGenericRecords(Schema schema) { + return new Write<>( + AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder() + .setGenericRecords(true) + .setSchema(schema) + .build()); + } + + /** + * A {@link PTransform} that writes a {@link PCollection} to an avro file (or multiple avro files + * matching a sharding pattern), with each element of the input collection encoded into its own + * record of type OutputT. + * + * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type + * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type + * that will be written to the file must be specified. If using a custom {@link + * DynamicAvroDestinations} object this is done using {@link + * DynamicAvroDestinations#formatRecord}, otherwise the {@link TypedWrite#withFormatFunction} can + * be used to specify a format function. 
+ * + * <p>The advantage of using a custom type is that is it allows a user-provided {@link + * DynamicAvroDestinations} object, set via {@link Write#to(DynamicAvroDestinations)} to examine + * the custom type when choosing a destination. + * + * <p>If the output type is {@link GenericRecord} use {@link #writeCustomTypeToGenericRecords()} + * instead. + */ + public static <UserT, OutputT> TypedWrite<UserT, Void, OutputT> writeCustomType() { + return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build(); + } + + /** + * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is + * {@link GenericRecord}. A schema must be specified either in {@link + * DynamicAvroDestinations#getSchema} or if not using dynamic destinations, by using {@link + * TypedWrite#withSchema(Schema)}. + */ + public static <UserT> TypedWrite<UserT, Void, GenericRecord> writeCustomTypeToGenericRecords() { + return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build(); + } + + /** + * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string. + */ + public static Write<GenericRecord> writeGenericRecords(String schema) { + return writeGenericRecords(new Schema.Parser().parse(schema)); + } + + private static <UserT, OutputT> TypedWrite.Builder<UserT, Void, OutputT> defaultWriteBuilder() { + return new AutoValue_AvroIO_TypedWrite.Builder<UserT, Void, OutputT>() + .setFilenameSuffix(null) + .setShardTemplate(null) + .setNumShards(0) + .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC) + .setMetadata(ImmutableMap.of()) + .setWindowedWrites(false) + .setNoSpilling(false); + } + + @Experimental(Kind.SCHEMAS) + private static <T> PCollection<T> setBeamSchema( + PCollection<T> pc, Class<T> clazz, @Nullable Schema schema) { + return pc.setCoder(AvroUtils.schemaCoder(clazz, schema)); + } + + /** + * 64MB is a reasonable value that allows to amortize the cost of opening files, but is not so + * large as to exhaust a typical runner's maximum amount of output per ProcessElement call. + */ + private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L; + + /** Implementation of {@link #read} and {@link #readGenericRecords}. */ + @AutoValue + public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { + + abstract @Nullable ValueProvider<String> getFilepattern(); + + abstract MatchConfiguration getMatchConfiguration(); + + abstract @Nullable Class<T> getRecordClass(); + + abstract @Nullable Schema getSchema(); + + abstract boolean getInferBeamSchema(); + + abstract boolean getHintMatchesManyFiles(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setFilepattern(ValueProvider<String> filepattern); + + abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration); + + abstract Builder<T> setRecordClass(Class<T> recordClass); + + abstract Builder<T> setSchema(Schema schema); + + abstract Builder<T> setInferBeamSchema(boolean infer); + + abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles); + + abstract Read<T> build(); + } + + /** + * Reads from the given filename or filepattern. + * + * <p>If it is known that the filepattern will match a very large number of files (at least tens + * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability. 
+ */ + public Read<T> from(ValueProvider<String> filepattern) { + return toBuilder().setFilepattern(filepattern).build(); + } + + /** Like {@link #from(ValueProvider)}. */ + public Read<T> from(String filepattern) { + return from(StaticValueProvider.of(filepattern)); + } + + /** Sets the {@link MatchConfiguration}. */ + public Read<T> withMatchConfiguration(MatchConfiguration matchConfiguration) { + return toBuilder().setMatchConfiguration(matchConfiguration).build(); + } + + /** Configures whether or not a filepattern matching no files is allowed. */ + public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); + } + + /** + * Continuously watches for new files matching the filepattern, polling it at the given + * interval, until the given termination condition is reached. The returned {@link PCollection} + * is unbounded. If {@code matchUpdatedFiles} is set, also watches for files with timestamp + * change. + * + * <p>This works only in runners supporting splittable {@link + * org.apache.beam.sdk.transforms.DoFn}. + */ + public Read<T> watchForNewFiles( + Duration pollInterval, + TerminationCondition<String, ?> terminationCondition, + boolean matchUpdatedFiles) { + return withMatchConfiguration( + getMatchConfiguration() + .continuously(pollInterval, terminationCondition, matchUpdatedFiles)); + } + + /** + * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)} with {@code + * matchUpdatedFiles=false}. + */ + public Read<T> watchForNewFiles( + Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { + return watchForNewFiles(pollInterval, terminationCondition, false); + } + + /** + * Hints that the filepattern specified in {@link #from(String)} matches a very large number of + * files. + * + * <p>This hint may cause a runner to execute the transform differently, in a way that improves + * performance for this case, but it may worsen performance if the filepattern matches only a + * small number of files (e.g., in a runner that supports dynamic work rebalancing, it will + * happen less efficiently within individual files). + */ + public Read<T> withHintMatchesManyFiles() { + return toBuilder().setHintMatchesManyFiles(true).build(); + } + + /** + * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output + * to be used by SQL and by the schema-transform library. + */ + @Experimental(Kind.SCHEMAS) + public Read<T> withBeamSchemas(boolean withBeamSchemas) { + return toBuilder().setInferBeamSchema(withBeamSchemas).build(); + } + + @Override + @SuppressWarnings("unchecked") + public PCollection<T> expand(PBegin input) { + checkNotNull(getFilepattern(), "filepattern"); + checkNotNull(getSchema(), "schema"); + + if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) { + PCollection<T> read = + input.apply( + "Read", + org.apache.beam.sdk.io.Read.from( + createSource( + getFilepattern(), + getMatchConfiguration().getEmptyMatchTreatment(), + getRecordClass(), + getSchema(), + null))); + return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read; + } + + // All other cases go through FileIO + ReadFiles + ReadFiles<T> readFiles = + (getRecordClass() == GenericRecord.class) + ? 
(ReadFiles<T>) readFilesGenericRecords(getSchema()) + : readFiles(getRecordClass()); + return input + .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) + .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration())) + .apply( + "Read Matches", + FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) + .apply("Via ReadFiles", readFiles); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add( + DisplayData.item("inferBeamSchema", getInferBeamSchema()) + .withLabel("Infer Beam Schema")) + .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema()))) + .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class")) + .addIfNotNull( + DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern")) + .include("matchConfiguration", getMatchConfiguration()); + } + + @SuppressWarnings("unchecked") + private static <T> org.apache.beam.sdk.io.AvroSource<T> createSource( + ValueProvider<String> filepattern, + EmptyMatchTreatment emptyMatchTreatment, + Class<T> recordClass, + Schema schema, + org.apache.beam.sdk.io.AvroSource.@Nullable DatumReaderFactory<T> readerFactory) { + org.apache.beam.sdk.io.AvroSource<?> source = + org.apache.beam.sdk.io.AvroSource.from(filepattern) + .withEmptyMatchTreatment(emptyMatchTreatment); + + if (readerFactory != null) { + source = source.withDatumReaderFactory(readerFactory); + } + return recordClass == GenericRecord.class + ? (org.apache.beam.sdk.io.AvroSource<T>) source.withSchema(schema) + : source.withSchema(recordClass); + } + } + + ///////////////////////////////////////////////////////////////////////////// + + /** Implementation of {@link #readFiles}. */ + @AutoValue + public abstract static class ReadFiles<T> + extends PTransform<PCollection<ReadableFile>, PCollection<T>> { + + abstract @Nullable Class<T> getRecordClass(); + + abstract @Nullable Schema getSchema(); + + abstract boolean getUsesReshuffle(); + + abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler(); + + abstract long getDesiredBundleSizeBytes(); + + abstract boolean getInferBeamSchema(); + + abstract org.apache.beam.sdk.io.AvroSource.@Nullable DatumReaderFactory<T> + getDatumReaderFactory(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setRecordClass(Class<T> recordClass); + + abstract Builder<T> setSchema(Schema schema); + + abstract Builder<T> setUsesReshuffle(boolean usesReshuffle); + + abstract Builder<T> setFileExceptionHandler( + ReadFileRangesFnExceptionHandler exceptionHandler); + + abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes); + + abstract Builder<T> setInferBeamSchema(boolean infer); + + abstract Builder<T> setDatumReaderFactory( + org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> factory); + + abstract ReadFiles<T> build(); + } + + @VisibleForTesting + ReadFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { + return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); + } + + /** Specifies if a Reshuffle should run before file reads occur. */ + @Experimental(Kind.FILESYSTEM) + public ReadFiles<T> withUsesReshuffle(boolean usesReshuffle) { + return toBuilder().setUsesReshuffle(usesReshuffle).build(); + } + + /** Specifies if exceptions should be logged only for streaming pipelines. 
*/ + @Experimental(Kind.FILESYSTEM) + public ReadFiles<T> withFileExceptionHandler( + ReadFileRangesFnExceptionHandler exceptionHandler) { + return toBuilder().setFileExceptionHandler(exceptionHandler).build(); + } + + /** + * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output + * to be used by SQL and by the schema-transform library. + */ + @Experimental(Kind.SCHEMAS) + public ReadFiles<T> withBeamSchemas(boolean withBeamSchemas) { + return toBuilder().setInferBeamSchema(withBeamSchemas).build(); + } + + public ReadFiles<T> withDatumReaderFactory( + org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> factory) { + return toBuilder().setDatumReaderFactory(factory).build(); + } + + @Override + public PCollection<T> expand(PCollection<ReadableFile> input) { + checkNotNull(getSchema(), "schema"); + PCollection<T> read = + input.apply( + "Read all via FileBasedSource", + new ReadAllViaFileBasedSource<>( + getDesiredBundleSizeBytes(), + new CreateSourceFn<>( + getRecordClass(), getSchema().toString(), getDatumReaderFactory()), + AvroCoder.of(getRecordClass(), getSchema()), + getUsesReshuffle(), + getFileExceptionHandler())); + return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add( + DisplayData.item("inferBeamSchema", getInferBeamSchema()) + .withLabel("Infer Beam Schema")) + .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema()))) + .addIfNotNull( + DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class")); + } + } + + ///////////////////////////////////////////////////////////////////////////// + + /** + * Implementation of {@link #readAll}. + * + * @deprecated See {@link #readAll(Class)} for details. + */ + @Deprecated + @AutoValue + public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> { + abstract MatchConfiguration getMatchConfiguration(); + + abstract @Nullable Class<T> getRecordClass(); + + abstract @Nullable Schema getSchema(); + + abstract long getDesiredBundleSizeBytes(); + + abstract boolean getInferBeamSchema(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration); + + abstract Builder<T> setRecordClass(Class<T> recordClass); + + abstract Builder<T> setSchema(Schema schema); + + abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes); + + abstract Builder<T> setInferBeamSchema(boolean infer); + + abstract ReadAll<T> build(); + } + + /** Sets the {@link MatchConfiguration}. */ + public ReadAll<T> withMatchConfiguration(MatchConfiguration configuration) { + return toBuilder().setMatchConfiguration(configuration).build(); + } + + /** Like {@link Read#withEmptyMatchTreatment}. */ + public ReadAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); + } + + /** Like {@link Read#watchForNewFiles}. 
*/ + public ReadAll<T> watchForNewFiles( + Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { + return withMatchConfiguration( + getMatchConfiguration().continuously(pollInterval, terminationCondition)); + } + + @VisibleForTesting + ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { + return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); + } + + /** + * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output + * to be used by SQL and by the schema-transform library. + */ + @Experimental(Kind.SCHEMAS) + public ReadAll<T> withBeamSchemas(boolean withBeamSchemas) { + return toBuilder().setInferBeamSchema(withBeamSchemas).build(); + } + + @Override + public PCollection<T> expand(PCollection<String> input) { + checkNotNull(getSchema(), "schema"); + PCollection<T> read = + input + .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration())) + .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) + .apply(readFiles(getRecordClass())); + return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add( + DisplayData.item("inferBeamSchema", getInferBeamSchema()) + .withLabel("Infer Beam Schema")) + .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema()))) + .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class")) + .include("matchConfiguration", getMatchConfiguration()); + } + } + + private static class CreateSourceFn<T> + implements SerializableFunction<String, FileBasedSource<T>> { + private final Class<T> recordClass; + private final Supplier<Schema> schemaSupplier; + private final org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> readerFactory; + + CreateSourceFn( + Class<T> recordClass, + String jsonSchema, + org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> readerFactory) { + this.recordClass = recordClass; + this.schemaSupplier = + Suppliers.memoize( + Suppliers.compose(new JsonToSchema(), Suppliers.ofInstance(jsonSchema))); + this.readerFactory = readerFactory; + } + + @Override + public FileBasedSource<T> apply(String input) { + return Read.createSource( + StaticValueProvider.of(input), + EmptyMatchTreatment.DISALLOW, + recordClass, + schemaSupplier.get(), + readerFactory); + } + + private static class JsonToSchema implements Function<String, Schema>, Serializable { + @Override + public Schema apply(String input) { + return new Schema.Parser().parse(input); + } + } + } + + ///////////////////////////////////////////////////////////////////////////// + + /** Implementation of {@link #parseGenericRecords}. 
*/ + @AutoValue + public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> { + + abstract @Nullable ValueProvider<String> getFilepattern(); + + abstract MatchConfiguration getMatchConfiguration(); + + abstract SerializableFunction<GenericRecord, T> getParseFn(); + + abstract @Nullable Coder<T> getCoder(); + + abstract boolean getHintMatchesManyFiles(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setFilepattern(ValueProvider<String> filepattern); + + abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration); + + abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn); + + abstract Builder<T> setCoder(Coder<T> coder); + + abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles); + + abstract Parse<T> build(); + } + + /** Reads from the given filename or filepattern. */ + public Parse<T> from(String filepattern) { + return from(StaticValueProvider.of(filepattern)); + } + + /** Like {@link #from(String)}. */ + public Parse<T> from(ValueProvider<String> filepattern) { + return toBuilder().setFilepattern(filepattern).build(); + } + + /** Sets the {@link MatchConfiguration}. */ + public Parse<T> withMatchConfiguration(MatchConfiguration configuration) { + return toBuilder().setMatchConfiguration(configuration).build(); + } + + /** Like {@link Read#withEmptyMatchTreatment}. */ + public Parse<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); + } + + /** Like {@link Read#watchForNewFiles}. */ + public Parse<T> watchForNewFiles( + Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { + return withMatchConfiguration( + getMatchConfiguration().continuously(pollInterval, terminationCondition)); + } + + /** Sets a coder for the result of the parse function. */ + public Parse<T> withCoder(Coder<T> coder) { + return toBuilder().setCoder(coder).build(); + } + + /** Like {@link Read#withHintMatchesManyFiles()}. */ + public Parse<T> withHintMatchesManyFiles() { + return toBuilder().setHintMatchesManyFiles(true).build(); + } + + @Override + public PCollection<T> expand(PBegin input) { + checkNotNull(getFilepattern(), "filepattern"); + Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry()); + + if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) { + return input.apply( + org.apache.beam.sdk.io.Read.from( + org.apache.beam.sdk.io.AvroSource.from(getFilepattern()) + .withParseFn(getParseFn(), coder))); + } + + // All other cases go through FileIO + ParseFilesGenericRecords. + return input + .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) + .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration())) + .apply( + "Read Matches", + FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) + .apply("Via ParseFiles", parseFilesGenericRecords(getParseFn()).withCoder(coder)); + } + + private static <T> Coder<T> inferCoder( + @Nullable Coder<T> explicitCoder, + SerializableFunction<GenericRecord, T> parseFn, + CoderRegistry coderRegistry) { + if (explicitCoder != null) { + return explicitCoder; + } + // If a coder was not specified explicitly, infer it from parse fn. 
+ try { + return coderRegistry.getCoder(TypeDescriptors.outputOf(parseFn)); + } catch (CannotProvideCoderException e) { + throw new IllegalArgumentException( + "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().", + e); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull( + DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern")) + .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")) + .include("matchConfiguration", getMatchConfiguration()); + } + } + + ///////////////////////////////////////////////////////////////////////////// + + /** Implementation of {@link #parseFilesGenericRecords}. */ + @AutoValue + public abstract static class ParseFiles<T> + extends PTransform<PCollection<ReadableFile>, PCollection<T>> { + abstract SerializableFunction<GenericRecord, T> getParseFn(); + + abstract @Nullable Coder<T> getCoder(); + + abstract boolean getUsesReshuffle(); + + abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler(); + + abstract long getDesiredBundleSizeBytes(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn); + + abstract Builder<T> setCoder(Coder<T> coder); + + abstract Builder<T> setUsesReshuffle(boolean usesReshuffle); + + abstract Builder<T> setFileExceptionHandler( + ReadFileRangesFnExceptionHandler exceptionHandler); + + abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes); + + abstract ParseFiles<T> build(); + } + + /** Specifies the coder for the result of the {@code parseFn}. */ + public ParseFiles<T> withCoder(Coder<T> coder) { + return toBuilder().setCoder(coder).build(); + } + + /** Specifies if a Reshuffle should run before file reads occur. */ + @Experimental(Kind.FILESYSTEM) + public ParseFiles<T> withUsesReshuffle(boolean usesReshuffle) { + return toBuilder().setUsesReshuffle(usesReshuffle).build(); + } + + /** Specifies if exceptions should be logged only for streaming pipelines. 
*/ + @Experimental(Kind.FILESYSTEM) + public ParseFiles<T> withFileExceptionHandler( + ReadFileRangesFnExceptionHandler exceptionHandler) { + return toBuilder().setFileExceptionHandler(exceptionHandler).build(); + } + + @VisibleForTesting + ParseFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { + return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); + } + + @Override + public PCollection<T> expand(PCollection<ReadableFile> input) { + final Coder<T> coder = + Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry()); + final SerializableFunction<GenericRecord, T> parseFn = getParseFn(); + final SerializableFunction<String, FileBasedSource<T>> createSource = + new CreateParseSourceFn<>(parseFn, coder); + return input.apply( + "Parse Files via FileBasedSource", + new ReadAllViaFileBasedSource<>( + getDesiredBundleSizeBytes(), + createSource, + coder, + getUsesReshuffle(), + getFileExceptionHandler())); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")); + } + + private static class CreateParseSourceFn<T> + implements SerializableFunction<String, FileBasedSource<T>> { + private final SerializableFunction<GenericRecord, T> parseFn; + private final Coder<T> coder; + + CreateParseSourceFn(SerializableFunction<GenericRecord, T> parseFn, Coder<T> coder) { + this.parseFn = parseFn; + this.coder = coder; + } + + @Override + public FileBasedSource<T> apply(String input) { + return AvroSource.from(input).withParseFn(parseFn, coder); + } + } + } + + ///////////////////////////////////////////////////////////////////////////// + + /** + * Implementation of {@link #parseAllGenericRecords}. + * + * @deprecated See {@link #parseAllGenericRecords(SerializableFunction)} for details. + */ + @Deprecated + @AutoValue + public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> { + abstract MatchConfiguration getMatchConfiguration(); + + abstract SerializableFunction<GenericRecord, T> getParseFn(); + + abstract @Nullable Coder<T> getCoder(); + + abstract long getDesiredBundleSizeBytes(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration); + + abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn); + + abstract Builder<T> setCoder(Coder<T> coder); + + abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes); + + abstract ParseAll<T> build(); + } + + /** Sets the {@link MatchConfiguration}. */ + public ParseAll<T> withMatchConfiguration(MatchConfiguration configuration) { + return toBuilder().setMatchConfiguration(configuration).build(); + } + + /** Like {@link Read#withEmptyMatchTreatment}. */ + public ParseAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); + } + + /** Like {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)}. 
 */ + public ParseAll<T> watchForNewFiles( + Duration pollInterval, + TerminationCondition<String, ?> terminationCondition, + boolean matchUpdatedFiles) { + return withMatchConfiguration( + getMatchConfiguration() + .continuously(pollInterval, terminationCondition, matchUpdatedFiles)); + } + + /** Like {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */ + public ParseAll<T> watchForNewFiles( + Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { + return watchForNewFiles(pollInterval, terminationCondition, false); + } + + /** Specifies the coder for the result of the {@code parseFn}. */ + public ParseAll<T> withCoder(Coder<T> coder) { + return toBuilder().setCoder(coder).build(); + } + + @VisibleForTesting + ParseAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { + return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); + } + + @Override + public PCollection<T> expand(PCollection<String> input) { + return input + .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration())) + .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) + .apply( + "Parse all via FileBasedSource", + parseFilesGenericRecords(getParseFn()).withCoder(getCoder())); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")) + .include("matchConfiguration", getMatchConfiguration()); + } + } + + ///////////////////////////////////////////////////////////////////////////// + + /** Implementation of {@link #write}. */ + @AutoValue + public abstract static class TypedWrite<UserT, DestinationT, OutputT>

Review Comment:
I believe it was just added recently and was not synced to the new class. I'll check the other classes; we will have to do this before deprecating the "core" Avro classes.
