damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1174144957


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProvider.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.avro.io.AvroIO;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(FileReadSchemaTransformFormatProvider.class)
+public class AvroReadSchemaTransformFormatProvider
+    implements FileReadSchemaTransformFormatProvider {
+  @Override
+  public String identifier() {
+    return "avro";
+  }
+
+  @Override
+  public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform(
+      FileReadSchemaTransformConfiguration configuration) {
+
+    return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() {
+      @Override
+      public PCollection<Row> expand(PCollection<ReadableFile> input) {
+        Schema beamSchema =
+            AvroUtils.toBeamSchema(
+                new org.apache.avro.Schema.Parser().parse(configuration.getSchema()));

Review Comment:
   Consider `configuration.getSafeSchema()` from 
[comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095).
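   For readers without the linked thread: a minimal sketch of what such an accessor might look like on the configuration class; the name comes from the linked discussion, while the body here is an assumption:
   ```
   // Hypothetical accessor: fail fast with a clear message instead of
   // returning a nullable value that trips the checker framework.
   public String getSafeSchema() {
     String schema = getSchema();
     if (Strings.isNullOrEmpty(schema)) {
       throw new IllegalStateException("A schema is required for this format but was not provided.");
     }
     return schema;
   }
   ```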



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProvider.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(FileReadSchemaTransformFormatProvider.class)
+public class ParquetReadSchemaTransformFormatProvider
+    implements FileReadSchemaTransformFormatProvider {
+  @Override
+  public String identifier() {
+    return "parquet";
+  }
+
+  @Override
+  public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform(
+      FileReadSchemaTransformConfiguration configuration) {
+
+    return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() {
+      @Override
+      public PCollection<Row> expand(PCollection<ReadableFile> input) {
+        org.apache.avro.Schema avroSchema =
+            new org.apache.avro.Schema.Parser().parse(configuration.getSchema());

Review Comment:
   In the context of this [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095), may we consider `configuration.getSafeSchema()`?



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set,"
+              + " but not both.");
+
+      // Input schema can be a schema String or a path to a file containing the schema
+      // Resolve to get a schema String
+      String schema = configuration.getSchema();
+      if (!Strings.isNullOrEmpty(schema)) {
+        schema = resolveSchemaStringOrFilePath(schema);
+        configuration = configuration.toBuilder().setSchema(schema).build();
+      }
+
+      PCollection<MatchResult.Metadata> files;
+      if (!Strings.isNullOrEmpty(configuration.getFilepattern())) {
+        Pipeline p = input.getPipeline();
+        FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern());
+        // Handle streaming case
+        matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles);
+
+        files = p.apply(matchFiles);
+      } else {
+        FileIO.MatchAll matchAllFiles = FileIO.matchAll();
+        // Handle streaming case
+        matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles);
+
+        files =
+            input
+                .get(INPUT_TAG)
+                .apply(
+                    "Get filepatterns",
+                    MapElements.into(TypeDescriptors.strings())
+                        .via((row) -> row.getString("filepattern")))
+                .apply("Match files", matchAllFiles);
+      }
+
+      // Pass readable files to the appropriate source and output rows.
+      PCollection<Row> output =
+          files
+              .apply(FileIO.readMatches())
+              .apply("Read files", getProvider().buildTransform(configuration));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+    }
+
+    public PTransform<?, PCollection<MatchResult.Metadata>> maybeApplyStreaming(
+        PTransform<?, PCollection<MatchResult.Metadata>> matchTransform) {
+      // Two parameters are provided to configure watching for new files.
+      Long terminateAfterSeconds = configuration.getTerminateAfterSecondsSinceNewOutput();
+      Long pollIntervalMillis = configuration.getPollIntervalMillis();
+
+      // Streaming is enabled when a poll interval is provided
+      if (pollIntervalMillis != null && pollIntervalMillis > 0L) {
+        Duration pollDuration = Duration.millis(pollIntervalMillis);
+
+        // By default, the file match transform never terminates
+        TerminationCondition<String, ?> terminationCondition = Growth.never();
+
+        // If provided, will terminate after this many seconds since seeing a new file
+        if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) {
+          terminationCondition =
+              Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds));
+        }
+
+        // Apply watch for new files
+        if (matchTransform instanceof FileIO.Match) {
+          matchTransform =
+              ((FileIO.Match) matchTransform).continuously(pollDuration, terminationCondition);
+        } else if (matchTransform instanceof FileIO.MatchAll) {
+          matchTransform =
+              ((FileIO.MatchAll) matchTransform).continuously(pollDuration, terminationCondition);
+        }
+      }
+      return matchTransform;
+    }
+
+    public String resolveSchemaStringOrFilePath(String schema) {

Review Comment:
   Instead of this method, may we consider an additional configuration field, throwing an error when the user provides both the Schema and the Schema file source property? It took me some time to realize that the configuration's "Schema" property could be either a Schema or a file path. Having the user declare explicitly where the Schema derives from would simplify the code.
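   A rough sketch of that shape, assuming hypothetical accessor names on the AutoValue configuration (`getSchema` for an inline schema, `getSchemaFilePath` for a path to one):
   ```
   // Hypothetical: two separate fields declare intent, and validate() rejects ambiguity early.
   public abstract @Nullable String getSchema();

   public abstract @Nullable String getSchemaFilePath();

   public void validate() {
     if (!Strings.isNullOrEmpty(getSchema()) && !Strings.isNullOrEmpty(getSchemaFilePath())) {
       throw new IllegalArgumentException(
           "Only one of schema or schemaFilePath may be set, not both.");
     }
   }
   ```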



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProvider.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.avro.io.AvroIO;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(FileReadSchemaTransformFormatProvider.class)
+public class AvroReadSchemaTransformFormatProvider
+    implements FileReadSchemaTransformFormatProvider {
+  @Override
+  public String identifier() {
+    return "avro";
+  }
+
+  @Override
+  public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform(
+      FileReadSchemaTransformConfiguration configuration) {
+
+    return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() {
+      @Override
+      public PCollection<Row> expand(PCollection<ReadableFile> input) {
+        Schema beamSchema =
+            AvroUtils.toBeamSchema(
+                new org.apache.avro.Schema.Parser().parse(configuration.getSchema()));
+
+        return input
+            .apply(AvroIO.readFilesGenericRecords(configuration.getSchema()).withBeamSchemas(true))

Review Comment:
   Consider `configuration.getSafeSchema()` from 
[comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095).



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProvider.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import com.google.auto.service.AutoService;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.schemas.utils.JsonUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** A {@link FileReadSchemaTransformFormatProvider} that reads newline-delimited JSON records. */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(FileReadSchemaTransformFormatProvider.class)
+public class JsonReadSchemaTransformFormatProvider
+    implements FileReadSchemaTransformFormatProvider {
+  @Override
+  public String identifier() {
+    return "json";
+  }
+
+  @Override
+  public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform(
+      FileReadSchemaTransformConfiguration configuration) {
+
+    return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() {
+      @Override
+      public PCollection<Row> expand(PCollection<ReadableFile> input) {
+        Schema beamSchema = JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema());

Review Comment:
   In the context of this [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095), may we consider `configuration.getSafeSchema()`?



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set,"
+              + " but not both.");
+
+      // Input schema can be a schema String or a path to a file containing the schema
+      // Resolve to get a schema String
+      String schema = configuration.getSchema();
+      if (!Strings.isNullOrEmpty(schema)) {
+        schema = resolveSchemaStringOrFilePath(schema);
+        configuration = configuration.toBuilder().setSchema(schema).build();
+      }
+
+      PCollection<MatchResult.Metadata> files;
+      if (!Strings.isNullOrEmpty(configuration.getFilepattern())) {
+        Pipeline p = input.getPipeline();
+        FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern());
+        // Handle streaming case
+        matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles);
+
+        files = p.apply(matchFiles);
+      } else {
+        FileIO.MatchAll matchAllFiles = FileIO.matchAll();
+        // Handle streaming case
+        matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles);
+
+        files =
+            input
+                .get(INPUT_TAG)
+                .apply(
+                    "Get filepatterns",
+                    MapElements.into(TypeDescriptors.strings())
+                        .via((row) -> row.getString("filepattern")))

Review Comment:
   1) In the context of this [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095), this could change to `.via((row) -> Objects.requireNonNull(row.getString("filepattern"))))`
   
   2) Instead of hardcoding things like "filepattern", could we instead declare these constants just below the class declaration as `static final String ...`?
   
   3) Should we check that the incoming row's Schema adheres to having a `FieldType.STRING` field named "filepattern", with tests showing that meaningful errors display to the user about this expectation?
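   Putting these three points together, a rough sketch (the constant name and the schema check are illustrative, not the PR's code):
   ```
   // Hypothetical constant near the top of the class:
   static final String FILEPATTERN_FIELD = "filepattern";

   // In expand(), validate the incoming schema before mapping:
   Schema inputSchema = input.get(INPUT_TAG).getSchema();
   checkArgument(
       inputSchema.hasField(FILEPATTERN_FIELD)
           && inputSchema.getField(FILEPATTERN_FIELD).getType().equals(Schema.FieldType.STRING),
       "Input rows must contain a STRING field named '" + FILEPATTERN_FIELD + "'.");

   // Then map with an explicit null check:
   MapElements.into(TypeDescriptors.strings())
       .via((row) -> Objects.requireNonNull(row.getString(FILEPATTERN_FIELD)))
   ```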



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set,"
+              + " but not both.");
+
+      // Input schema can be a schema String or a path to a file containing the schema
+      // Resolve to get a schema String
+      String schema = configuration.getSchema();
+      if (!Strings.isNullOrEmpty(schema)) {
+        schema = resolveSchemaStringOrFilePath(schema);
+        configuration = configuration.toBuilder().setSchema(schema).build();
+      }
+
+      PCollection<MatchResult.Metadata> files;
+      if (!Strings.isNullOrEmpty(configuration.getFilepattern())) {
+        Pipeline p = input.getPipeline();
+        FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern());
+        // Handle streaming case
+        matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles);
+
+        files = p.apply(matchFiles);
+      } else {
+        FileIO.MatchAll matchAllFiles = FileIO.matchAll();
+        // Handle streaming case
+        matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles);
+
+        files =
+            input
+                .get(INPUT_TAG)
+                .apply(
+                    "Get filepatterns",
+                    MapElements.into(TypeDescriptors.strings())
+                        .via((row) -> row.getString("filepattern")))
+                .apply("Match files", matchAllFiles);
+      }
+
+      // Pass readable files to the appropriate source and output rows.
+      PCollection<Row> output =
+          files
+              .apply(FileIO.readMatches())
+              .apply("Read files", getProvider().buildTransform(configuration));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+    }
+
+    public PTransform<?, PCollection<MatchResult.Metadata>> maybeApplyStreaming(
+        PTransform<?, PCollection<MatchResult.Metadata>> matchTransform) {
+      // Two parameters are provided to configure watching for new files.
+      Long terminateAfterSeconds = configuration.getTerminateAfterSecondsSinceNewOutput();
+      Long pollIntervalMillis = configuration.getPollIntervalMillis();
+
+      // Streaming is enabled when a poll interval is provided
+      if (pollIntervalMillis != null && pollIntervalMillis > 0L) {
+        Duration pollDuration = Duration.millis(pollIntervalMillis);
+
+        // By default, the file match transform never terminates
+        TerminationCondition<String, ?> terminationCondition = Growth.never();
+
+        // If provided, will terminate after this many seconds since seeing a new file
+        if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) {
+          terminationCondition =
+              Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds));
+        }
+
+        // Apply watch for new files
+        if (matchTransform instanceof FileIO.Match) {
+          matchTransform =
+              ((FileIO.Match) matchTransform).continuously(pollDuration, terminationCondition);
+        } else if (matchTransform instanceof FileIO.MatchAll) {
+          matchTransform =
+              ((FileIO.MatchAll) matchTransform).continuously(pollDuration, terminationCondition);
+        }

Review Comment:
   Instead of these two conditionals, plus the conditionals above and the conditional at the call site, may we consider a `buildMatchTransform` method that performs these checks and returns a `PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>`? The conditionals could be flattened, making them easier to troubleshoot and test. On the subject of testing, whether for this method or the `buildMatchTransform` pattern, may we see more coverage? Finally, does the method need to be public rather than package-private?
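   For illustration, one possible shape of that method; the name, signature, and placement are suggestions rather than the PR's API:
   ```
   // Hypothetical: build the MatchAll transform with streaming applied in one
   // place, flattening the conditionals for easier testing.
   static PTransform<PCollection<String>, PCollection<MatchResult.Metadata>> buildMatchTransform(
       @Nullable Long pollIntervalMillis, @Nullable Long terminateAfterSeconds) {
     FileIO.MatchAll match = FileIO.matchAll();
     if (pollIntervalMillis == null || pollIntervalMillis <= 0L) {
       return match;
     }
     // Streaming: poll for new files, terminating per the configuration.
     TerminationCondition<String, ?> termination = Growth.never();
     if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) {
       termination = Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds));
     }
     return match.continuously(Duration.millis(pollIntervalMillis), termination);
   }
   ```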



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set,"
+              + " but not both.");
+
+      // Input schema can be a schema String or a path to a file containing the schema
+      // Resolve to get a schema String
+      String schema = configuration.getSchema();
+      if (!Strings.isNullOrEmpty(schema)) {
+        schema = resolveSchemaStringOrFilePath(schema);
+        configuration = configuration.toBuilder().setSchema(schema).build();
+      }
+
+      PCollection<MatchResult.Metadata> files;
+      if (!Strings.isNullOrEmpty(configuration.getFilepattern())) {
+        Pipeline p = input.getPipeline();
+        FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern());
+        // Handle streaming case
+        matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles);
+
+        files = p.apply(matchFiles);
+      } else {
+        FileIO.MatchAll matchAllFiles = FileIO.matchAll();
+        // Handle streaming case
+        matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles);
+
+        files =
+            input
+                .get(INPUT_TAG)
+                .apply(
+                    "Get filepatterns",
+                    MapElements.into(TypeDescriptors.strings())
+                        .via((row) -> row.getString("filepattern")))
+                .apply("Match files", matchAllFiles);
+      }
+
+      // Pass readable files to the appropriate source and output rows.
+      PCollection<Row> output =
+          files
+              .apply(FileIO.readMatches())
+              .apply("Read files", getProvider().buildTransform(configuration));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+    }
+
+    public PTransform<?, PCollection<MatchResult.Metadata>> maybeApplyStreaming(
+        PTransform<?, PCollection<MatchResult.Metadata>> matchTransform) {
+      // Two parameters are provided to configure watching for new files.
+      Long terminateAfterSeconds = configuration.getTerminateAfterSecondsSinceNewOutput();
+      Long pollIntervalMillis = configuration.getPollIntervalMillis();
+
+      // Streaming is enabled when a poll interval is provided
+      if (pollIntervalMillis != null && pollIntervalMillis > 0L) {
+        Duration pollDuration = Duration.millis(pollIntervalMillis);
+
+        // By default, the file match transform never terminates
+        TerminationCondition<String, ?> terminationCondition = Growth.never();
+
+        // If provided, will terminate after this many seconds since seeing a new file
+        if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) {
+          terminationCondition =
+              Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds));
+        }
+
+        // Apply watch for new files
+        if (matchTransform instanceof FileIO.Match) {
+          matchTransform =
+              ((FileIO.Match) matchTransform).continuously(pollDuration, terminationCondition);
+        } else if (matchTransform instanceof FileIO.MatchAll) {
+          matchTransform =
+              ((FileIO.MatchAll) matchTransform).continuously(pollDuration, terminationCondition);
+        }
+      }
+      return matchTransform;
+    }
+
+    public String resolveSchemaStringOrFilePath(String schema) {
+      try {
+        MatchResult result;
+        try {
+          LOG.info("Attempting to locate input schema as a file path.");
+          result = FileSystems.match(schema);
+          // While some FileSystem implementations throw an IllegalArgumentException when matching
+          // invalid file paths, others will return a status ERROR or NOT_FOUND. The following check
+          // will throw an IllegalArgumentException and take us to the catch block.
+          checkArgument(result.status() == MatchResult.Status.OK);
+        } catch (IllegalArgumentException e) {
+          LOG.info(
+              "Input schema is not a valid file path. Will attempt to use it as a schema string.");
+          return schema;
+        }
+        checkArgument(
+            !result.metadata().isEmpty(),
+            "Failed to match any files for the input schema file path.");
+        List<ResourceId> resource =
+            result.metadata().stream()
+                .map(MatchResult.Metadata::resourceId)
+                .collect(Collectors.toList());
+
+        checkArgument(
+            resource.size() == 1,
+            "Expected exactly 1 schema file, but got " + resource.size() + " files.");
+
+        ReadableByteChannel byteChannel = FileSystems.open(resource.get(0));
+        Reader reader = Channels.newReader(byteChannel, UTF_8.name());
+        return CharStreams.toString(reader);
+      } catch (IOException e) {
+        throw new RuntimeException("Error when parsing input schema file: ", e);
+      }
+    }
+
+    private FileReadSchemaTransformFormatProvider getProvider() {
+      String format = configuration.getFormat();
+      Map<String, FileReadSchemaTransformFormatProvider> providers =
+          Providers.loadProviders(FileReadSchemaTransformFormatProvider.class);
+      checkArgument(
+          providers.containsKey(format),
+          String.format(
+              "Received unsupported file format: %s. Supported formats are %s",
+              format, providers.keySet()));
+
+      return providers.get(format);

Review Comment:
   After removing the SuppressWarnings annotation (see [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095)), I was able to get the check to pass with this:
   ```
   Optional<FileReadSchemaTransformFormatProvider> provider =
       Optional.ofNullable(providers.get(format));
   
   checkState(
       provider.isPresent(),
       String.format(
           "Received unsupported file format: %s. Supported formats are %s",
           format, providers.keySet()));
   
   return provider.get();
   ```
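   A possible alternative, if it fits this module's conventions: `org.apache.beam.sdk.util.Preconditions.checkStateNotNull(providers.get(format), ...)` returns the non-null value directly, which might read more compactly than going through `Optional`.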



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProvider.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+/** A {@link FileReadSchemaTransformFormatProvider} that reads lines as Strings. */
+@AutoService(FileReadSchemaTransformFormatProvider.class)
+public class LineReadSchemaTransformFormatProvider
+    implements FileReadSchemaTransformFormatProvider {
+  @Override
+  public String identifier() {
+    return "line";
+  }
+
+  @Override
+  public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform(
+      FileReadSchemaTransformConfiguration configuration) {
+
+    return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() {
+      @Override
+      public PCollection<Row> expand(PCollection<ReadableFile> input) {
+        Schema lineSchema = Schema.builder().addStringField("line").build();

Review Comment:
   Could we move this to a `static final Schema LINE_SCHEMA = ...` between 
lines 33 and 34?
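   i.e., a minimal sketch of the hoisted constant:
   ```
   // Built once instead of on every expansion:
   private static final Schema LINE_SCHEMA = Schema.builder().addStringField("line").build();
   ```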



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set,"
+              + " but not both.");
+
+      // Input schema can be a schema String or a path to a file containing the schema
+      // Resolve to get a schema String
+      String schema = configuration.getSchema();

Review Comment:
   In this [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095)'s context, could we do this? I agree it looks like a game with the checker framework, but I'd argue the benefit of keeping the checker framework and removing the SuppressWarnings outweighs the cost. Even if one checks `isNullOrEmpty` in a prior step and still calls `getSchema`, the checker framework complains.
   ```
   if (!Strings.isNullOrEmpty(configuration.getSchema())) {
       String schema = configuration.getSafeSchema();
       ...
   }
   ```



##########
sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.BYTE_SEQUENCE_TYPE_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.BYTE_TYPE_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.SINGLY_NESTED_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.parquetConfigurationBuilder;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.junit.Assume.assumeTrue;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+public abstract class FileReadSchemaTransformFormatProviderTest {
+
+  /** Returns the format of the {@link FileReadSchemaTransformFormatProviderTest} subclass. */
+  protected abstract String getFormat();
+
+  /** Given a Beam Schema, returns the relevant source's String schema representation. */
+  protected abstract String getStringSchemaFromBeamSchema(Schema beamSchema);
+
+  /**
+   * Writes {@link Row}s to files then reads from those files. Performs a {@link
+   * org.apache.beam.sdk.testing.PAssert} check to validate the written and read {@link Row}s are
+   * equal.
+   */
+  protected abstract void runWriteAndReadTest(
+      Schema schema, List<Row> rows, String filePath, String schemaFilePath);
+
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  @Rule public TestName testName = new TestName();
+
+  protected Schema getFilepatternSchema() {
+    return Schema.of(Field.of("filepattern", FieldType.STRING));
+  }
+
+  protected String getFilePath() {
+    return getFolder() + "/test";
+  }
+
+  protected String getFolder() {
+    try {
+      return tmpFolder.newFolder(getFormat(), testName.getMethodName()).getAbsolutePath();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Test
+  public void testAllPrimitiveDataTypes() {
+    Schema schema = ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.allPrimitiveDataTypesRows;
+    String filePath = getFilePath();
+
+    runWriteAndReadTest(schema, rows, filePath, null);
+  }
+
+  @Test
+  public void testNullableAllPrimitiveDataTypes() {
+    Schema schema = NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.nullableAllPrimitiveDataTypesRows;
+    String filePath = getFilePath();
+
+    runWriteAndReadTest(schema, rows, filePath, null);
+  }
+
+  @Test
+  public void testTimeContaining() {
+    // JSON schemas don't support DATETIME or other logical types
+    assumeTrue(!getFormat().equals("json"));
+
+    Schema schema = TIME_CONTAINING_SCHEMA;
+    List<Row> rows = DATA.timeContainingRows;
+    String filePath = getFilePath();
+
+    runWriteAndReadTest(schema, rows, filePath, null);
+  }
+
+  @Test
+  public void testByteType() {
+    List<String> formatsThatSupportSingleByteType = Arrays.asList("csv", "json", "xml");
+    assumeTrue(formatsThatSupportSingleByteType.contains(getFormat()));
+
+    Schema schema = BYTE_TYPE_SCHEMA;
+    List<Row> rows = DATA.byteTypeRows;
+    String filePath = getFilePath();
+
+    runWriteAndReadTest(schema, rows, filePath, null);
+  }
+
+  @Test
+  public void testByteSequenceType() {
+    List<String> formatsThatSupportByteSequenceType = Arrays.asList("avro", "parquet");
+    assumeTrue(formatsThatSupportByteSequenceType.contains(getFormat()));
+
+    Schema schema = BYTE_SEQUENCE_TYPE_SCHEMA;
+    List<Row> rows = DATA.byteSequenceTypeRows;
+    String filePath = getFilePath();
+
+    runWriteAndReadTest(schema, rows, filePath, null);
+  }
+
+  @Test
+  public void testArrayPrimitiveDataTypes() {
+    Schema schema = ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.arrayPrimitiveDataTypesRows;
+    String filePath = getFilePath();
+
+    runWriteAndReadTest(schema, rows, filePath, null);
+  }
+
+  @Test
+  public void testNestedRepeatedDataTypes() {

Review Comment:
   Could we see a test for the doublyNestedDataTypesRepeatRows?
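   For reference, a sketch following the pattern of the tests above; the fixture names (`DOUBLY_NESTED_DATA_TYPES_SCHEMA`, `doublyNestedDataTypesRepeatRows`) are assumed to exist alongside the other SchemaAwareJavaBeans fixtures:
   ```
   @Test
   public void testDoublyNestedRepeatedDataTypes() {
     // Hypothetical fixtures, mirroring the singly-nested case:
     Schema schema = DOUBLY_NESTED_DATA_TYPES_SCHEMA;
     List<Row> rows = DATA.doublyNestedDataTypesRepeatRows;
     String filePath = getFilePath();

     runWriteAndReadTest(schema, rows, filePath, null);
   }
   ```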



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
