aromanenko-dev commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1054585075


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java:
##########
@@ -0,0 +1,2026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
+import static org.apache.beam.sdk.io.ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileIO.MatchConfiguration;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.ReadAllViaFileBasedSource;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * {@link PTransform}s for reading and writing Avro files.
+ *
+ * <h2>Reading Avro files</h2>
+ *
+ * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
+ * pipeline construction time, use {@link #read}, using {@link Read#from} to specify the filename or
+ * filepattern to read from. If the filepatterns to be read are themselves in a {@link PCollection}
+ * you can use {@link FileIO} to match them and {@link AvroIO#readFiles} to read them. If the schema
+ * is unknown at pipeline construction time, use {@link #parseGenericRecords} or {@link
+ * #parseFilesGenericRecords}.
+ *
+ * <p>Many configuration options below apply to several or all of these transforms.
+ *
+ * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
+ *
+ * <h3>Filepattern expansion and watching</h3>
+ *
+ * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the
+ * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link
+ * AvroIO#readFiles(Class)} allow streaming of new files matching the filepattern(s).
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link
+ * AvroIO#readFiles(Class)} allows them in case the filepattern contains a glob wildcard character.
+ * Use {@link Read#withEmptyMatchTreatment} or {@link
+ * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link AvroIO#readFiles(Class)}
+ * to configure this behavior.
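+ *
+ * <p>For example, a minimal sketch (the bucket path is illustrative) that allows an empty match
+ * for a direct {@link #read}:
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(AvroAutoGenClass.class)
+ *         .from("gs://my_bucket/path/to/records-*.avro")
+ *         .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW));
+ * }</pre>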
+ *
+ * <h3>Reading records of a known schema</h3>
+ *
+ * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
+ * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
+ * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link FileIO} matching
+ * plus {@link #readFilesGenericRecords}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // Read Avro-generated classes from files on GCS
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));
+ *
+ * // Read GenericRecord's of the given schema from files on GCS
+ * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
+ * PCollection<GenericRecord> records =
+ *     p.apply(AvroIO.readGenericRecords(schema)
+ *                .from("gs://my_bucket/path/to/records-*.avro"));
+ * }</pre>
+ *
+ * <h3>Reading records of an unknown schema</h3>
+ *
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link FileIO}
+ * matching plus {@link #parseFilesGenericRecords(SerializableFunction)}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<Foo> records =
+ *     p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
+ *       public Foo apply(GenericRecord record) {
+ *         // If needed, access the schema of the record using record.getSchema()
+ *         return ...;
+ *       }
+ *     }));
+ * }</pre>
+ *
+ * <h3>Reading from a {@link PCollection} of filepatterns</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> filepatterns = p.apply(...);
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns
+ *         .apply(FileIO.matchAll())
+ *         .apply(FileIO.readMatches())
+ *         .apply(AvroIO.readFiles(AvroAutoGenClass.class));
+ * PCollection<GenericRecord> genericRecords =
+ *     filepatterns.apply(AvroIO.readGenericRecords(schema));
+ * PCollection<Foo> records =
+ *     filepatterns
+ *         .apply(FileIO.matchAll())
+ *         .apply(FileIO.readMatches())
+ *         .apply(AvroIO.parseFilesGenericRecords(new SerializableFunction...);
+ * }</pre>
+ *
+ * <h3>Streaming new files matching a filepattern</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<AvroAutoGenClass> lines = p.apply(AvroIO
+ *     .read(AvroAutoGenClass.class)
+ *     .from("gs://my_bucket/path/to/records-*.avro")
+ *     .watchForNewFiles(
+ *       // Check for new files every minute
+ *       Duration.standardMinutes(1),
+ *       // Stop watching the filepattern if no new files appear within an hour
+ *       afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
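+ *
+ * <p>A minimal sketch (the filepattern is illustrative):
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(AvroAutoGenClass.class)
+ *         .from("gs://my_bucket/path/to/records-*.avro")
+ *         .withHintMatchesManyFiles());
+ * }</pre>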
+ *
+ * <h3>Inferring Beam schemas from Avro files</h3>
+ *
+ * <p>If you want to use SQL or schema-based operations on an Avro-based PCollection, you must
+ * configure the read transform to infer the Beam schema and automatically set up the Beam-related
+ * coders by doing:
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(...).from(...).withBeamSchemas(true));
+ * }</pre>
+ *
+ * <h3>Inferring Beam schemas from Avro PCollections</h3>
+ *
+ * <p>If you created an Avro-based PCollection by other means, e.g. reading records from Kafka or as
+ * the output of another PTransform, you may be interested in making your PCollection schema-aware
+ * so you can use the Schema-based APIs or Beam's SqlTransform.
+ *
+ * <p>If you are using Avro specific records (generated classes from an Avro schema), you can
+ * register a schema provider for the specific Avro class to make any PCollection of these objects
+ * schema-aware.
+ *
+ * <pre>{@code
+ * pipeline.getSchemaRegistry().registerSchemaProvider(AvroAutoGenClass.class, AvroAutoGenClass.getClassSchema());
+ * }</pre>
+ *
+ * You can also manually set an Avro-backed Schema coder for a PCollection using {@link
+ * AvroUtils#schemaCoder(Class, Schema)} to make it schema-aware.
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records = ...
+ * AvroCoder<AvroAutoGenClass> coder = (AvroCoder<AvroAutoGenClass>) records.getCoder();
+ * records.setCoder(AvroUtils.schemaCoder(coder.getType(), coder.getSchema()));
+ * }</pre>
+ *
+ * <p>If you are using GenericRecords, you may need to set a specific Beam schema coder for each
+ * PCollection to match their internal Avro schema.
+ *
+ * <pre>{@code
+ * org.apache.avro.Schema avroSchema = ...
+ * PCollection<GenericRecord> records = ...
+ * records.setCoder(AvroUtils.schemaCoder(avroSchema));
+ * }</pre>
+ *
+ * <h2>Writing Avro files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more Avro files, use {@link Write}, using {@code
+ * AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
+ * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
+ * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link
+ * Write#withSuffix(String)}), to generate output filenames in a sharded way. You can override this
+ * default write filename policy using {@link Write#to(FilenamePolicy)} to specify a custom file
+ * naming policy.
+ *
+ * <p>By default, {@link Write} produces output files that are compressed using the {@link
+ * org.apache.avro.file.Codec CodecFactory.snappyCodec()}. This default can be changed or overridden
+ * using {@link Write#withCodec}.
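+ *
+ * <p>For example, a minimal sketch that switches the codec to deflate (the output path and
+ * compression level are illustrative):
+ *
+ * <pre>{@code
+ * records.apply(AvroIO.write(AvroAutoGenClass.class)
+ *     .to("gs://my_bucket/path/to/numbers")
+ *     .withSuffix(".avro")
+ *     .withCodec(CodecFactory.deflateCodec(9)));
+ * }</pre>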
+ *
+ * <h3>Writing specific or generic records</h3>
+ *
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
+ * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
+ * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * // A simple Write to a local file (only runs locally):
+ * PCollection<AvroAutoGenClass> records = ...;
+ * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
+ *
+ * // A Write to a sharded GCS file (runs locally and using remote execution):
+ * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
+ * PCollection<GenericRecord> records = ...;
+ * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
+ *     .to("gs://my_bucket/path/to/numbers")
+ *     .withSuffix(".avro"));
+ * }</pre>
+ *
+ * <h3>Writing windowed or unbounded data</h3>
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner - {@link Write#withWindowedWrites()} will
+ * cause windowing and triggering to be preserved. When producing windowed writes with a streaming
+ * runner that supports triggers, the number of output shards must be set explicitly using {@link
+ * Write#withNumShards(int)}; some runners may set this for you to a runner-chosen value, so you may
+ * not need to set it yourself. A {@link FilenamePolicy} must be set, and unique windows and triggers
+ * must produce unique filenames.
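+ *
+ * <p>A minimal sketch for a streaming pipeline (the shard count and output path are illustrative):
+ *
+ * <pre>{@code
+ * windowedRecords.apply(AvroIO.write(AvroAutoGenClass.class)
+ *     .to("gs://my_bucket/path/to/events")
+ *     .withWindowedWrites()
+ *     .withNumShards(10));
+ * }</pre>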
+ *
+ * <h3>Writing data to multiple destinations</h3>
+ *
+ * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file
+ * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
+ * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
+ * as an integer field. We want events for each user to go into a specific directory for that user,
+ * and each user's data should be written with a specific schema for that user; a side input is
+ * used, so the schema can be calculated in a different stage.
+ *
+ * <pre>{@code
+ * // This is the user class that controls dynamic destinations for this avro write. The input to
+ * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
+ * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
+ * // of Integer.
+ * class UserDynamicAvroDestinations
+ *     extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
+ *   private final PCollectionView<Map<Integer, String>> userToSchemaMap;
+ *   public UserDynamicAvroDestinations(PCollectionView<Map<Integer, String>> userToSchemaMap) {
+ *     this.userToSchemaMap = userToSchemaMap;
+ *   }
+ *   public GenericRecord formatRecord(UserEvent record) {
+ *     return formatUserRecord(record, getSchema(record.getUserId()));
+ *   }
+ *   public Schema getSchema(Integer userId) {
+ *     return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
+ *   }
+ *   public Integer getDestination(UserEvent record) {
+ *     return record.getUserId();
+ *   }
+ *   public Integer getDefaultDestination() {
+ *     return 0;
+ *   }
+ *   public FilenamePolicy getFilenamePolicy(Integer userId) {
+ *     return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
+ *     + userId + "/events"));
+ *   }
+ *   public List<PCollectionView<?>> getSideInputs() {
+ *     return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
+ *   }
+ * }
+ * PCollection<UserEvent> events = ...;
+ * PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply(
+ *     "ComputePerUserSchemas", new ComputePerUserSchemas());
+ * events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords()
+ *     .to(new UserDynamicAvroDestinations(userToSchemaMap)));
+ * }</pre>
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class AvroIO {
+  /**
+   * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern).
+   *
+   * <p>The schema must be specified using one of the {@code withSchema} functions.
+   */
+  public static <T> Read<T> read(Class<T> recordClass) {
+    return new AutoValue_AvroIO_Read.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each file in a {@link PCollection} of {@link ReadableFile},
+   * returned by {@link FileIO#readMatches}.
+   *
+   * <p>You can read {@link GenericRecord} by using {@code #readFiles(GenericRecord.class)} or
+   * {@code #readFilesGenericRecords(new Schema.Parser().parse(schema))} if the schema is a String.
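+   *
+   * <p>A minimal sketch (assuming an Avro-generated {@code AvroAutoGenClass}):
+   *
+   * <pre>{@code
+   * PCollection<AvroAutoGenClass> records =
+   *     filepatterns
+   *         .apply(FileIO.matchAll())
+   *         .apply(FileIO.readMatches())
+   *         .apply(AvroIO.readFiles(AvroAutoGenClass.class));
+   * }</pre>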
+   */
+  public static <T> ReadFiles<T> readFiles(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadFiles.Builder<T>()
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each filepattern in the input {@link PCollection}.
+   *
+   * @deprecated You can achieve the functionality of {@link #readAll} using {@link FileIO} matching
+   *     plus {@link #readFiles(Class)}. This is the preferred method to make composition explicit.
+   *     {@link ReadAll} will not receive upgrades and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static <T> ReadAll<T> readAll(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadAll.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /** Reads Avro file(s) containing records of the specified schema. */
+  public static Read<GenericRecord> readGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
+   * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
+   */
+  public static ReadFiles<GenericRecord> readFilesGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadFiles.Builder<GenericRecord>()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)}, but reads each filepattern in the input {@link
+   * PCollection}.
+   *
+   * @deprecated You can achieve the functionality of {@link #readAllGenericRecords(Schema)} using
+   *     {@link FileIO} matching plus {@link #readFilesGenericRecords(Schema)}. This is the
+   *     preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
+   *     and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /**
+   * Reads Avro file(s) containing records of the specified schema. The schema is specified as a
+   * JSON-encoded string.
+   */
+  public static Read<GenericRecord> readGenericRecords(String schema) {
+    return readGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /** Like {@link #readGenericRecords(String)}, but for {@link ReadableFile} collections. */
+  public static ReadFiles<GenericRecord> readFilesGenericRecords(String schema) {
+    return readFilesGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
+   * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link
+   * PCollection}.
+   *
+   * @deprecated You can achieve the functionality of {@link #readAllGenericRecords(String)} using
+   *     {@link FileIO} matching plus {@link #readFilesGenericRecords(String)}. This is the
+   *     preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
+   *     and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static ReadAll<GenericRecord> readAllGenericRecords(String schema) {
+    return readAllGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
+   * Reads Avro file(s) containing records of an unspecified schema and converts each record to a
+   * custom type.
+   */
+  public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_Parse.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setParseFn(parseFn)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each {@link ReadableFile} in
+   * the input {@link PCollection}.
+   */
+  public static <T> ParseFiles<T> parseFilesGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseFiles.Builder<T>()
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the
+   * input {@link PCollection}.
+   *
+   * @deprecated You can achieve the functionality of {@link
+   *     #parseAllGenericRecords(SerializableFunction)} using {@link FileIO} matching plus {@link
+   *     #parseFilesGenericRecords(SerializableFunction)}. This is the preferred method to make
+   *     composition explicit. {@link ParseAll} will not receive upgrades and will be removed in a
+   *     future version of Beam.
+   */
+  @Deprecated
+  public static <T> ParseAll<T> parseAllGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseAll.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /**
+   * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
+   * pattern).
+   */
+  public static <T> Write<T> write(Class<T> recordClass) {
+    return new Write<>(
+        AvroIO.<T, T>defaultWriteBuilder()
+            .setGenericRecords(false)
+            .setSchema(ReflectData.get().getSchema(recordClass))
+            .build());
+  }
+
+  /** Writes Avro records of the specified schema. */
+  public static Write<GenericRecord> writeGenericRecords(Schema schema) {
+    return new Write<>(
+        AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder()
+            .setGenericRecords(true)
+            .setSchema(schema)
+            .build());
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} to an Avro file (or multiple Avro files
+   * matching a sharding pattern), with each element of the input collection encoded into its own
+   * record of type OutputT.
+   *
+   * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type
+   * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type
+   * that will be written to the file must be specified. If using a custom {@link
+   * DynamicAvroDestinations} object this is done using {@link
+   * DynamicAvroDestinations#formatRecord}, otherwise the {@link TypedWrite#withFormatFunction} can
+   * be used to specify a format function.
+   *
+   * <p>The advantage of using a custom type is that it allows a user-provided {@link
+   * DynamicAvroDestinations} object, set via {@link Write#to(DynamicAvroDestinations)}, to examine
+   * the custom type when choosing a destination.
+   *
+   * <p>If the output type is {@link GenericRecord}, use {@link #writeCustomTypeToGenericRecords()}
+   * instead.
+   */
+  public static <UserT, OutputT> TypedWrite<UserT, Void, OutputT> writeCustomType() {
+    return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build();
+  }
+
+  /**
+   * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is
+   * {@link GenericRecord}. A schema must be specified either in {@link
+   * DynamicAvroDestinations#getSchema} or, if not using dynamic destinations, by using {@link
+   * TypedWrite#withSchema(Schema)}.
+   */
+  public static <UserT> TypedWrite<UserT, Void, GenericRecord> writeCustomTypeToGenericRecords() {
+    return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build();
+  }
+
+  /**
+   * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string.
+   */
+  public static Write<GenericRecord> writeGenericRecords(String schema) {
+    return writeGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  private static <UserT, OutputT> TypedWrite.Builder<UserT, Void, OutputT> defaultWriteBuilder() {
+    return new AutoValue_AvroIO_TypedWrite.Builder<UserT, Void, OutputT>()
+        .setFilenameSuffix(null)
+        .setShardTemplate(null)
+        .setNumShards(0)
+        .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
+        .setMetadata(ImmutableMap.of())
+        .setWindowedWrites(false)
+        .setNoSpilling(false);
+  }
+
+  @Experimental(Kind.SCHEMAS)
+  private static <T> PCollection<T> setBeamSchema(
+      PCollection<T> pc, Class<T> clazz, @Nullable Schema schema) {
+    return pc.setCoder(AvroUtils.schemaCoder(clazz, schema));
+  }
+
+  /**
+   * 64MB is a reasonable value that allows amortizing the cost of opening files, but is not so
+   * large as to exhaust a typical runner's maximum amount of output per ProcessElement call.
+   */
+  private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
+
+  /** Implementation of {@link #read} and {@link #readGenericRecords}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract boolean getHintMatchesManyFiles();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
+
+      abstract Read<T> build();
+    }
+
+    /**
+     * Reads from the given filename or filepattern.
+     *
+     * <p>If it is known that the filepattern will match a very large number of files (at least tens
+     * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability.
+     */
+    public Read<T> from(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Like {@link #from(ValueProvider)}. */
+    public Read<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Read<T> withMatchConfiguration(MatchConfiguration matchConfiguration) {
+      return toBuilder().setMatchConfiguration(matchConfiguration).build();
+    }
+
+    /** Configures whether or not a filepattern matching no files is allowed. */
+    public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /**
+     * Continuously watches for new files matching the filepattern, polling it at the given
+     * interval, until the given termination condition is reached. The returned {@link PCollection}
+     * is unbounded. If {@code matchUpdatedFiles} is set, also watches for files with timestamp
+     * change.
+     *
+     * <p>This works only in runners supporting splittable {@link
+     * org.apache.beam.sdk.transforms.DoFn}.
+     */
+    public Read<T> watchForNewFiles(
+        Duration pollInterval,
+        TerminationCondition<String, ?> terminationCondition,
+        boolean matchUpdatedFiles) {
+      return withMatchConfiguration(
+          getMatchConfiguration()
+              .continuously(pollInterval, terminationCondition, matchUpdatedFiles));
+    }
+
+    /**
+     * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)} with {@code
+     * matchUpdatedFiles=false}.
+     */
+    public Read<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return watchForNewFiles(pollInterval, terminationCondition, false);
+    }
+
+    /**
+     * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
+     * files.
+     *
+     * <p>This hint may cause a runner to execute the transform differently, in a way that improves
+     * performance for this case, but it may worsen performance if the filepattern matches only a
+     * small number of files (e.g., in a runner that supports dynamic work rebalancing, it will
+     * happen less efficiently within individual files).
+     */
+    public Read<T> withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the Avro schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public Read<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "filepattern");
+      checkNotNull(getSchema(), "schema");
+
+      if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
+        PCollection<T> read =
+            input.apply(
+                "Read",
+                org.apache.beam.sdk.io.Read.from(
+                    createSource(
+                        getFilepattern(),
+                        getMatchConfiguration().getEmptyMatchTreatment(),
+                        getRecordClass(),
+                        getSchema(),
+                        null)));
+        return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
+      }
+
+      // All other cases go through FileIO + ReadFiles
+      ReadFiles<T> readFiles =
+          (getRecordClass() == GenericRecord.class)
+              ? (ReadFiles<T>) readFilesGenericRecords(getSchema())
+              : readFiles(getRecordClass());
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
+          .apply("Match All", 
FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(
+              "Read Matches",
+              
FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply("Via ReadFiles", readFiles);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("inferBeamSchema", getInferBeamSchema())
+                  .withLabel("Infer Beam Schema"))
+          .addIfNotNull(DisplayData.item("schema", 
String.valueOf(getSchema())))
+          .addIfNotNull(DisplayData.item("recordClass", 
getRecordClass()).withLabel("Record Class"))
+          .addIfNotNull(
+              DisplayData.item("filePattern", 
getFilepattern()).withLabel("Input File Pattern"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> org.apache.beam.sdk.io.AvroSource<T> createSource(
+        ValueProvider<String> filepattern,
+        EmptyMatchTreatment emptyMatchTreatment,
+        Class<T> recordClass,
+        Schema schema,
+        org.apache.beam.sdk.io.AvroSource.@Nullable DatumReaderFactory<T> readerFactory) {
+      org.apache.beam.sdk.io.AvroSource<?> source =
+          org.apache.beam.sdk.io.AvroSource.from(filepattern)
+              .withEmptyMatchTreatment(emptyMatchTreatment);
+
+      if (readerFactory != null) {
+        source = source.withDatumReaderFactory(readerFactory);
+      }
+      return recordClass == GenericRecord.class
+          ? (org.apache.beam.sdk.io.AvroSource<T>) source.withSchema(schema)
+          : source.withSchema(recordClass);
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #readFiles}. */
+  @AutoValue
+  public abstract static class ReadFiles<T>
+      extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract boolean getUsesReshuffle();
+
+    abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract org.apache.beam.sdk.io.AvroSource.@Nullable DatumReaderFactory<T>
+        getDatumReaderFactory();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
+
+      abstract Builder<T> setFileExceptionHandler(
+          ReadFileRangesFnExceptionHandler exceptionHandler);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract Builder<T> setDatumReaderFactory(
+          org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> factory);
+
+      abstract ReadFiles<T> build();
+    }
+
+    @VisibleForTesting
+    ReadFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    /** Specifies if a Reshuffle should run before file reads occur. */
+    @Experimental(Kind.FILESYSTEM)
+    public ReadFiles<T> withUsesReshuffle(boolean usesReshuffle) {
+      return toBuilder().setUsesReshuffle(usesReshuffle).build();
+    }
+
+    /** Specifies if exceptions should be logged only for streaming pipelines. */
+    @Experimental(Kind.FILESYSTEM)
+    public ReadFiles<T> withFileExceptionHandler(
+        ReadFileRangesFnExceptionHandler exceptionHandler) {
+      return toBuilder().setFileExceptionHandler(exceptionHandler).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the Avro schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public ReadFiles<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
+    public ReadFiles<T> withDatumReaderFactory(
+        org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> factory) {
+      return toBuilder().setDatumReaderFactory(factory).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<ReadableFile> input) {
+      checkNotNull(getSchema(), "schema");
+      PCollection<T> read =
+          input.apply(
+              "Read all via FileBasedSource",
+              new ReadAllViaFileBasedSource<>(
+                  getDesiredBundleSizeBytes(),
+                  new CreateSourceFn<>(
+                      getRecordClass(), getSchema().toString(), getDatumReaderFactory()),
+                  AvroCoder.of(getRecordClass(), getSchema()),
+                  getUsesReshuffle(),
+                  getFileExceptionHandler()));
+      return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("inferBeamSchema", getInferBeamSchema())
+                  .withLabel("Infer Beam Schema"))
+          .addIfNotNull(DisplayData.item("schema", 
String.valueOf(getSchema())))
+          .addIfNotNull(
+              DisplayData.item("recordClass", 
getRecordClass()).withLabel("Record Class"));
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Implementation of {@link #readAll}.
+   *
+   * @deprecated See {@link #readAll(Class)} for details.
+   */
+  @Deprecated
+  @AutoValue
+  public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract ReadAll<T> build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public ReadAll<T> withMatchConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public ReadAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Read#watchForNewFiles}. */
+    public ReadAll<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withMatchConfiguration(
+          getMatchConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @VisibleForTesting
+    ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the Avro schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public ReadAll<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<String> input) {
+      checkNotNull(getSchema(), "schema");
+      PCollection<T> read =
+          input
+              .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+              .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+              .apply(readFiles(getRecordClass()));
+      return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("inferBeamSchema", getInferBeamSchema())
+                  .withLabel("Infer Beam Schema"))
+          .addIfNotNull(DisplayData.item("schema", 
String.valueOf(getSchema())))
+          .addIfNotNull(DisplayData.item("recordClass", 
getRecordClass()).withLabel("Record Class"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+  }
+
+  private static class CreateSourceFn<T>
+      implements SerializableFunction<String, FileBasedSource<T>> {
+    private final Class<T> recordClass;
+    private final Supplier<Schema> schemaSupplier;
+    private final org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> readerFactory;
+
+    CreateSourceFn(
+        Class<T> recordClass,
+        String jsonSchema,
+        org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> readerFactory) {
+      this.recordClass = recordClass;
+      this.schemaSupplier =
+          Suppliers.memoize(
+              Suppliers.compose(new JsonToSchema(), Suppliers.ofInstance(jsonSchema)));
+      this.readerFactory = readerFactory;
+    }
+
+    @Override
+    public FileBasedSource<T> apply(String input) {
+      return Read.createSource(
+          StaticValueProvider.of(input),
+          EmptyMatchTreatment.DISALLOW,
+          recordClass,
+          schemaSupplier.get(),
+          readerFactory);
+    }
+
+    private static class JsonToSchema implements Function<String, Schema>, Serializable {
+      @Override
+      public Schema apply(String input) {
+        return new Schema.Parser().parse(input);
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #parseGenericRecords}. */
+  @AutoValue
+  public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract boolean getHintMatchesManyFiles();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
+
+      abstract Parse<T> build();
+    }
+
+    /** Reads from the given filename or filepattern. */
+    public Parse<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /** Like {@link #from(String)}. */
+    public Parse<T> from(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Parse<T> withMatchConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public Parse<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Read#watchForNewFiles}. */
+    public Parse<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withMatchConfiguration(
+          getMatchConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    /** Sets a coder for the result of the parse function. */
+    public Parse<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    /** Like {@link Read#withHintMatchesManyFiles()}. */
+    public Parse<T> withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "filepattern");
+      Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+
+      if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
+        return input.apply(
+            org.apache.beam.sdk.io.Read.from(
+                org.apache.beam.sdk.io.AvroSource.from(getFilepattern())
+                    .withParseFn(getParseFn(), coder)));
+      }
+
+      // All other cases go through FileIO + ParseFilesGenericRecords.
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
+          .apply("Match All", 
FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(
+              "Read Matches",
+              
FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply("Via ParseFiles", 
parseFilesGenericRecords(getParseFn()).withCoder(coder));
+    }
+
+    private static <T> Coder<T> inferCoder(
+        @Nullable Coder<T> explicitCoder,
+        SerializableFunction<GenericRecord, T> parseFn,
+        CoderRegistry coderRegistry) {
+      if (explicitCoder != null) {
+        return explicitCoder;
+      }
+      // If a coder was not specified explicitly, infer it from parse fn.
+      try {
+        return coderRegistry.getCoder(TypeDescriptors.outputOf(parseFn));
+      } catch (CannotProvideCoderException e) {
+        throw new IllegalArgumentException(
+            "Unable to infer coder for output of parseFn. Specify it 
explicitly using withCoder().",
+            e);
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", 
getFilepattern()).withLabel("Input File Pattern"))
+          .add(DisplayData.item("parseFn", 
getParseFn().getClass()).withLabel("Parse function"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #parseFilesGenericRecords}. */
+  @AutoValue
+  public abstract static class ParseFiles<T>
+      extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract boolean getUsesReshuffle();
+
+    abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
+
+      abstract Builder<T> setFileExceptionHandler(
+          ReadFileRangesFnExceptionHandler exceptionHandler);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract ParseFiles<T> build();
+    }
+
+    /** Specifies the coder for the result of the {@code parseFn}. */
+    public ParseFiles<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    /** Specifies if a Reshuffle should run before file reads occur. */
+    @Experimental(Kind.FILESYSTEM)
+    public ParseFiles<T> withUsesReshuffle(boolean usesReshuffle) {
+      return toBuilder().setUsesReshuffle(usesReshuffle).build();
+    }
+
+    /** Specifies if exceptions should be logged only for streaming pipelines. */
+    @Experimental(Kind.FILESYSTEM)
+    public ParseFiles<T> withFileExceptionHandler(
+        ReadFileRangesFnExceptionHandler exceptionHandler) {
+      return toBuilder().setFileExceptionHandler(exceptionHandler).build();
+    }
+
+    @VisibleForTesting
+    ParseFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<ReadableFile> input) {
+      final Coder<T> coder =
+          Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+      final SerializableFunction<GenericRecord, T> parseFn = getParseFn();
+      final SerializableFunction<String, FileBasedSource<T>> createSource =
+          new CreateParseSourceFn<>(parseFn, coder);
+      return input.apply(
+          "Parse Files via FileBasedSource",
+          new ReadAllViaFileBasedSource<>(
+              getDesiredBundleSizeBytes(),
+              createSource,
+              coder,
+              getUsesReshuffle(),
+              getFileExceptionHandler()));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("parseFn", 
getParseFn().getClass()).withLabel("Parse function"));
+    }
+
+    private static class CreateParseSourceFn<T>
+        implements SerializableFunction<String, FileBasedSource<T>> {
+      private final SerializableFunction<GenericRecord, T> parseFn;
+      private final Coder<T> coder;
+
+      CreateParseSourceFn(SerializableFunction<GenericRecord, T> parseFn, Coder<T> coder) {
+        this.parseFn = parseFn;
+        this.coder = coder;
+      }
+
+      @Override
+      public FileBasedSource<T> apply(String input) {
+        return AvroSource.from(input).withParseFn(parseFn, coder);
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Implementation of {@link #parseAllGenericRecords}.
+   *
+   * @deprecated See {@link #parseAllGenericRecords(SerializableFunction)} for details.
+   */
+  @Deprecated
+  @AutoValue
+  public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract ParseAll<T> build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public ParseAll<T> withMatchConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public ParseAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)}. */
+    public ParseAll<T> watchForNewFiles(
+        Duration pollInterval,
+        TerminationCondition<String, ?> terminationCondition,
+        boolean matchUpdatedFiles) {
+      return withMatchConfiguration(
+          getMatchConfiguration()
+              .continuously(pollInterval, terminationCondition, matchUpdatedFiles));
+    }
+
+    /** Like {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */
+    public ParseAll<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return watchForNewFiles(pollInterval, terminationCondition, false);
+    }
+
+    /** Specifies the coder for the result of the {@code parseFn}. */
+    public ParseAll<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @VisibleForTesting
+    ParseAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<String> input) {
+      return input
+          .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply(
+              "Parse all via FileBasedSource",
+              parseFilesGenericRecords(getParseFn()).withCoder(getCoder()));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(DisplayData.item("parseFn", 
getParseFn().getClass()).withLabel("Parse function"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class TypedWrite<UserT, DestinationT, OutputT>

Review Comment:
   I believe it was just added recently and was not synced to the new class. I'll check the other classes; we need to do this before deprecating the "core" Avro classes.


