damondouglas commented on code in PR #25927: URL: https://github.com/apache/beam/pull/25927#discussion_r1174144957
########## sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProvider.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fileschematransform; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.extensions.avro.io.AvroIO; +import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; +import org.apache.beam.sdk.io.FileIO.ReadableFile; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; + +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(FileReadSchemaTransformFormatProvider.class) +public class AvroReadSchemaTransformFormatProvider + implements FileReadSchemaTransformFormatProvider { + @Override + public String identifier() { + return "avro"; + } + + @Override + public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform( + FileReadSchemaTransformConfiguration configuration) { + + return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() { + @Override + public PCollection<Row> expand(PCollection<ReadableFile> input) { + Schema beamSchema = + AvroUtils.toBeamSchema( + new org.apache.avro.Schema.Parser().parse(configuration.getSchema())); Review Comment: Consider `configuration.getSafeSchema()` from [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095). ########## sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProvider.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.fileschematransform; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; +import org.apache.beam.sdk.io.FileIO.ReadableFile; +import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; + +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(FileReadSchemaTransformFormatProvider.class) +public class ParquetReadSchemaTransformFormatProvider + implements FileReadSchemaTransformFormatProvider { + @Override + public String identifier() { + return "parquet"; + } + + @Override + public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform( + FileReadSchemaTransformConfiguration configuration) { + + return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() { + @Override + public PCollection<Row> expand(PCollection<ReadableFile> input) { + org.apache.avro.Schema avroSchema = + new org.apache.avro.Schema.Parser().parse(configuration.getSchema()); Review Comment: In the context of this [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095) may we consider `configuration.getSafeSchema()`? ########## sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.fileschematransform; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import java.io.IOException; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.io.Providers; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Watch.Growth; +import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(SchemaTransformProvider.class) +public class FileReadSchemaTransformProvider + extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> { + private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class); + private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1"; + static final String INPUT_TAG = "input"; + static final String OUTPUT_TAG = "output"; + + @Override + protected Class<FileReadSchemaTransformConfiguration> configurationClass() { + return FileReadSchemaTransformConfiguration.class; + } + + @Override + protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) { + return new FileReadSchemaTransform(configuration); + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + private static class FileReadSchemaTransform + extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform { + private FileReadSchemaTransformConfiguration configuration; + + FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + checkArgument( + input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()), + "Either an input PCollection of file patterns or the filepattern parameter must be set," + + "but not both."); + + // Input schema can be a schema String or a path to a file containing the 
schema + // Resolve to get a schema String + String schema = configuration.getSchema(); + if (!Strings.isNullOrEmpty(schema)) { + schema = resolveSchemaStringOrFilePath(schema); + configuration = configuration.toBuilder().setSchema(schema).build(); + } + + PCollection<MatchResult.Metadata> files; + if (!Strings.isNullOrEmpty(configuration.getFilepattern())) { + Pipeline p = input.getPipeline(); + FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern()); + // Handle streaming case + matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles); + + files = p.apply(matchFiles); + } else { + FileIO.MatchAll matchAllFiles = FileIO.matchAll(); + // Handle streaming case + matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles); + + files = + input + .get(INPUT_TAG) + .apply( + "Get filepatterns", + MapElements.into(TypeDescriptors.strings()) + .via((row) -> row.getString("filepattern"))) + .apply("Match files", matchAllFiles); + } + + // Pass readable files to the appropriate source and output rows. + PCollection<Row> output = + files + .apply(FileIO.readMatches()) + .apply("Read files", getProvider().buildTransform(configuration)); + + return PCollectionRowTuple.of(OUTPUT_TAG, output); + } + + public PTransform<?, PCollection<MatchResult.Metadata>> maybeApplyStreaming( + PTransform<?, PCollection<MatchResult.Metadata>> matchTransform) { + // Two parameters are provided to configure watching for new files. + Long terminateAfterSeconds = configuration.getTerminateAfterSecondsSinceNewOutput(); + Long pollIntervalMillis = configuration.getPollIntervalMillis(); + + // Streaming is enabled when a poll interval is provided + if (pollIntervalMillis != null && pollIntervalMillis > 0L) { + Duration pollDuration = Duration.millis(pollIntervalMillis); + + // By default, the file match transform never terminates + TerminationCondition<String, ?> terminationCondition = Growth.never(); + + // If provided, will terminate after this many seconds since seeing a new file + if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) { + terminationCondition = + Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds)); + } + + // Apply watch for new files + if (matchTransform instanceof FileIO.Match) { + matchTransform = + ((FileIO.Match) matchTransform).continuously(pollDuration, terminationCondition); + } else if (matchTransform instanceof FileIO.MatchAll) { + matchTransform = + ((FileIO.MatchAll) matchTransform).continuously(pollDuration, terminationCondition); + } + } + return matchTransform; + } + + public String resolveSchemaStringOrFilePath(String schema) { Review Comment: Instead of this method, may we consider an additional configuration field, and throw an error when the user provides both the Schema property and the Schema file source property? It took me some time to realize that the configuration's "Schema" property could be either a Schema or a file path. Having the user declare explicitly where the Schema comes from would simplify the code. ########## sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProvider.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fileschematransform; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.extensions.avro.io.AvroIO; +import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; +import org.apache.beam.sdk.io.FileIO.ReadableFile; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; + +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(FileReadSchemaTransformFormatProvider.class) +public class AvroReadSchemaTransformFormatProvider + implements FileReadSchemaTransformFormatProvider { + @Override + public String identifier() { + return "avro"; + } + + @Override + public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform( + FileReadSchemaTransformConfiguration configuration) { + + return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() { + @Override + public PCollection<Row> expand(PCollection<ReadableFile> input) { + Schema beamSchema = + AvroUtils.toBeamSchema( + new org.apache.avro.Schema.Parser().parse(configuration.getSchema())); + + return input + .apply(AvroIO.readFilesGenericRecords(configuration.getSchema()).withBeamSchemas(true)) Review Comment: Consider `configuration.getSafeSchema()` from [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095). ########## sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProvider.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.fileschematransform; + +import com.google.auto.service.AutoService; +import java.nio.charset.StandardCharsets; +import org.apache.beam.sdk.io.FileIO.ReadableFile; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider; +import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer; +import org.apache.beam.sdk.schemas.utils.JsonUtils; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** A {@link FileReadSchemaTransformFormatProvider} that reads newline-delimited JSONs. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(FileReadSchemaTransformFormatProvider.class) +public class JsonReadSchemaTransformFormatProvider + implements FileReadSchemaTransformFormatProvider { + @Override + public String identifier() { + return "json"; + } + + @Override + public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform( + FileReadSchemaTransformConfiguration configuration) { + + return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() { + @Override + public PCollection<Row> expand(PCollection<ReadableFile> input) { + Schema beamSchema = JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema()); Review Comment: In the context of this [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095) may we consider `configuration.getSafeSchema()`? ########## sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.fileschematransform; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import java.io.IOException; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.io.Providers; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Watch.Growth; +import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(SchemaTransformProvider.class) +public class FileReadSchemaTransformProvider + extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> { + private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class); + private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1"; + static final String INPUT_TAG = "input"; + static final String OUTPUT_TAG = "output"; + + @Override + protected Class<FileReadSchemaTransformConfiguration> configurationClass() { + return FileReadSchemaTransformConfiguration.class; + } + + @Override + protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) { + return new FileReadSchemaTransform(configuration); + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + private static class FileReadSchemaTransform + extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform { + private FileReadSchemaTransformConfiguration configuration; + + FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + checkArgument( + input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()), + "Either an input PCollection of file patterns or the filepattern parameter must be set," + + "but not both."); + + // Input schema can be a schema String or a path to a file containing the 
schema + // Resolve to get a schema String + String schema = configuration.getSchema(); + if (!Strings.isNullOrEmpty(schema)) { + schema = resolveSchemaStringOrFilePath(schema); + configuration = configuration.toBuilder().setSchema(schema).build(); + } + + PCollection<MatchResult.Metadata> files; + if (!Strings.isNullOrEmpty(configuration.getFilepattern())) { + Pipeline p = input.getPipeline(); + FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern()); + // Handle streaming case + matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles); + + files = p.apply(matchFiles); + } else { + FileIO.MatchAll matchAllFiles = FileIO.matchAll(); + // Handle streaming case + matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles); + + files = + input + .get(INPUT_TAG) + .apply( + "Get filepatterns", + MapElements.into(TypeDescriptors.strings()) + .via((row) -> row.getString("filepattern"))) Review Comment:
1) In the context of this [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095), this could change to `.via((row) -> Objects.requireNonNull(row.getString("filepattern"))))`
2) Instead of hardcoding things like "filepattern", could we instead have these constants just below the class declaration as `static final String ...`?
3) Should we check that the incoming row's Schema adheres to having a `FieldType.STRING` field named "filepattern", and add tests showing that meaningful errors are displayed to the user about this expectation?
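For illustration, the three suggestions might combine as below (a rough sketch, not from the PR; `FILEPATTERN_FIELD` is a hypothetical name, and imports for `java.util.Objects` and `org.apache.beam.sdk.schemas.Schema` are assumed):

```java
// (2) Hypothetical constant, declared once just below the class declaration.
static final String FILEPATTERN_FIELD = "filepattern";

// (3) Fail fast with a meaningful error when the input rows lack the expected field.
Schema inputSchema = input.get(INPUT_TAG).getSchema();
checkArgument(
    inputSchema.hasField(FILEPATTERN_FIELD)
        && inputSchema.getField(FILEPATTERN_FIELD).getType().equals(Schema.FieldType.STRING),
    "Expected input rows to have a STRING field named '%s', but the input schema was: %s",
    FILEPATTERN_FIELD,
    inputSchema);

// (1) The explicit null check keeps the checker framework satisfied.
MapElements.into(TypeDescriptors.strings())
    .via((row) -> Objects.requireNonNull(row.getString(FILEPATTERN_FIELD)));
```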
########## sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fileschematransform; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import java.io.IOException; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.io.Providers; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Watch.Growth; +import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(SchemaTransformProvider.class) +public class FileReadSchemaTransformProvider + extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> { + private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class); + private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1"; + static final String INPUT_TAG = "input"; + static final String OUTPUT_TAG = "output"; + + @Override + protected Class<FileReadSchemaTransformConfiguration> configurationClass() { + return FileReadSchemaTransformConfiguration.class; + } + + @Override + protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) { + return new FileReadSchemaTransform(configuration); + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + private static class FileReadSchemaTransform + extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform { + private FileReadSchemaTransformConfiguration configuration; + + FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + checkArgument( + input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()), + "Either an input PCollection of file patterns or the filepattern parameter must be set," + + "but not both."); + + // Input schema can be a schema String or a path to a file containing the 
schema + // Resolve to get a schema String + String schema = configuration.getSchema(); + if (!Strings.isNullOrEmpty(schema)) { + schema = resolveSchemaStringOrFilePath(schema); + configuration = configuration.toBuilder().setSchema(schema).build(); + } + + PCollection<MatchResult.Metadata> files; + if (!Strings.isNullOrEmpty(configuration.getFilepattern())) { + Pipeline p = input.getPipeline(); + FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern()); + // Handle streaming case + matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles); + + files = p.apply(matchFiles); + } else { + FileIO.MatchAll matchAllFiles = FileIO.matchAll(); + // Handle streaming case + matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles); + + files = + input + .get(INPUT_TAG) + .apply( + "Get filepatterns", + MapElements.into(TypeDescriptors.strings()) + .via((row) -> row.getString("filepattern"))) + .apply("Match files", matchAllFiles); + } + + // Pass readable files to the appropriate source and output rows. + PCollection<Row> output = + files + .apply(FileIO.readMatches()) + .apply("Read files", getProvider().buildTransform(configuration)); + + return PCollectionRowTuple.of(OUTPUT_TAG, output); + } + + public PTransform<?, PCollection<MatchResult.Metadata>> maybeApplyStreaming( + PTransform<?, PCollection<MatchResult.Metadata>> matchTransform) { + // Two parameters are provided to configure watching for new files. + Long terminateAfterSeconds = configuration.getTerminateAfterSecondsSinceNewOutput(); + Long pollIntervalMillis = configuration.getPollIntervalMillis(); + + // Streaming is enabled when a poll interval is provided + if (pollIntervalMillis != null && pollIntervalMillis > 0L) { + Duration pollDuration = Duration.millis(pollIntervalMillis); + + // By default, the file match transform never terminates + TerminationCondition<String, ?> terminationCondition = Growth.never(); + + // If provided, will terminate after this many seconds since seeing a new file + if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) { + terminationCondition = + Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds)); + } + + // Apply watch for new files + if (matchTransform instanceof FileIO.Match) { + matchTransform = + ((FileIO.Match) matchTransform).continuously(pollDuration, terminationCondition); + } else if (matchTransform instanceof FileIO.MatchAll) { + matchTransform = + ((FileIO.MatchAll) matchTransform).continuously(pollDuration, terminationCondition); + } Review Comment: Instead of these two conditionals, in addition to the conditionals above as well as the conditional from which this method is called, may we consider a `buildMatchTransform` method that performs these checks and returns a `PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>`? The conditionals could then be flattened, making them easier to troubleshoot and test. And, on the subject of testing, whether of this method or of a `buildMatchTransform` pattern, may we see more test coverage? Finally, does the method need to be public rather than package-private?
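For illustration, a rough sketch of that flattening (the signature is the one proposed above; the body is illustrative and not from the PR):

```java
// Package-private; always matches a PCollection<String> of filepatterns and
// watches for new files only when a poll interval is configured.
PTransform<PCollection<String>, PCollection<MatchResult.Metadata>> buildMatchTransform() {
  FileIO.MatchAll match = FileIO.matchAll();
  Long pollIntervalMillis = configuration.getPollIntervalMillis();
  if (pollIntervalMillis == null || pollIntervalMillis <= 0L) {
    return match; // Batch case: no polling for new files.
  }
  TerminationCondition<String, ?> termination = Growth.never();
  Long terminateAfterSeconds = configuration.getTerminateAfterSecondsSinceNewOutput();
  if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) {
    termination =
        Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds));
  }
  return match.continuously(Duration.millis(pollIntervalMillis), termination);
}
```

Early returns keep each configuration case on a single straight-line path, which is what would make the method easy to unit test; the `filepattern` parameter case could feed this same transform via a single-element `PCollection<String>`.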
########## sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fileschematransform; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import java.io.IOException; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.io.Providers; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Watch.Growth; +import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(SchemaTransformProvider.class) +public class FileReadSchemaTransformProvider + extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> { + private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class); + private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1"; + static final String INPUT_TAG = "input"; + static final String OUTPUT_TAG = "output"; + + @Override + protected Class<FileReadSchemaTransformConfiguration> configurationClass() { + return FileReadSchemaTransformConfiguration.class; + } + + @Override + protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) { + return new FileReadSchemaTransform(configuration); + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + private static class FileReadSchemaTransform + extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform { + 
private FileReadSchemaTransformConfiguration configuration; + + FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + checkArgument( + input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()), + "Either an input PCollection of file patterns or the filepattern parameter must be set," + + "but not both."); + + // Input schema can be a schema String or a path to a file containing the schema + // Resolve to get a schema String + String schema = configuration.getSchema(); + if (!Strings.isNullOrEmpty(schema)) { + schema = resolveSchemaStringOrFilePath(schema); + configuration = configuration.toBuilder().setSchema(schema).build(); + } + + PCollection<MatchResult.Metadata> files; + if (!Strings.isNullOrEmpty(configuration.getFilepattern())) { + Pipeline p = input.getPipeline(); + FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern()); + // Handle streaming case + matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles); + + files = p.apply(matchFiles); + } else { + FileIO.MatchAll matchAllFiles = FileIO.matchAll(); + // Handle streaming case + matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles); + + files = + input + .get(INPUT_TAG) + .apply( + "Get filepatterns", + MapElements.into(TypeDescriptors.strings()) + .via((row) -> row.getString("filepattern"))) + .apply("Match files", matchAllFiles); + } + + // Pass readable files to the appropriate source and output rows. + PCollection<Row> output = + files + .apply(FileIO.readMatches()) + .apply("Read files", getProvider().buildTransform(configuration)); + + return PCollectionRowTuple.of(OUTPUT_TAG, output); + } + + public PTransform<?, PCollection<MatchResult.Metadata>> maybeApplyStreaming( + PTransform<?, PCollection<MatchResult.Metadata>> matchTransform) { + // Two parameters are provided to configure watching for new files. + Long terminateAfterSeconds = configuration.getTerminateAfterSecondsSinceNewOutput(); + Long pollIntervalMillis = configuration.getPollIntervalMillis(); + + // Streaming is enabled when a poll interval is provided + if (pollIntervalMillis != null && pollIntervalMillis > 0L) { + Duration pollDuration = Duration.millis(pollIntervalMillis); + + // By default, the file match transform never terminates + TerminationCondition<String, ?> terminationCondition = Growth.never(); + + // If provided, will terminate after this many seconds since seeing a new file + if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) { + terminationCondition = + Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds)); + } + + // Apply watch for new files + if (matchTransform instanceof FileIO.Match) { + matchTransform = + ((FileIO.Match) matchTransform).continuously(pollDuration, terminationCondition); + } else if (matchTransform instanceof FileIO.MatchAll) { + matchTransform = + ((FileIO.MatchAll) matchTransform).continuously(pollDuration, terminationCondition); + } + } + return matchTransform; + } + + public String resolveSchemaStringOrFilePath(String schema) { + try { + MatchResult result; + try { + LOG.info("Attempting to locate input schema as a file path."); + result = FileSystems.match(schema); + // While some FileSystem implementations throw an IllegalArgumentException when matching + // invalid file paths, others will return a status ERROR or NOT_FOUND. 
The following check + // will throw an IllegalArgumentException and take us to the catch block. + checkArgument(result.status() == MatchResult.Status.OK); + } catch (IllegalArgumentException e) { + LOG.info( + "Input schema is not a valid file path. Will attempt to use it as a schema string."); + return schema; + } + checkArgument( + !result.metadata().isEmpty(), + "Failed to match any files for the input schema file path."); + List<ResourceId> resource = + result.metadata().stream() + .map(MatchResult.Metadata::resourceId) + .collect(Collectors.toList()); + + checkArgument( + resource.size() == 1, + "Expected exactly 1 schema file, but got " + resource.size() + " files."); + + ReadableByteChannel byteChannel = FileSystems.open(resource.get(0)); + Reader reader = Channels.newReader(byteChannel, UTF_8.name()); + return CharStreams.toString(reader); + } catch (IOException e) { + throw new RuntimeException("Error when parsing input schema file: ", e); + } + } + + private FileReadSchemaTransformFormatProvider getProvider() { + String format = configuration.getFormat(); + Map<String, FileReadSchemaTransformFormatProvider> providers = + Providers.loadProviders(FileReadSchemaTransformFormatProvider.class); + checkArgument( + providers.containsKey(format), + String.format( + "Received unsupported file format: %s. Supported formats are %s", + format, providers.keySet())); + + return providers.get(format); Review Comment: After removing the SuppressWarnings annotation (See [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095)), I was able to get check to pass with this: ``` Optional<FileReadSchemaTransformFormatProvider> provider = Optional.ofNullable(providers.get(format)); checkState( provider.isPresent(), String.format( "Received unsupported file format: %s. Supported formats are %s", format, providers.keySet())); return provider.get(); ``` ########## sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProvider.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fileschematransform; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.io.FileIO.ReadableFile; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; + +/** A {@link FileReadSchemaTransformFormatProvider} that reads lines as Strings. 
*/ +@AutoService(FileReadSchemaTransformFormatProvider.class) +public class LineReadSchemaTransformFormatProvider + implements FileReadSchemaTransformFormatProvider { + @Override + public String identifier() { + return "line"; + } + + @Override + public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform( + FileReadSchemaTransformConfiguration configuration) { + + return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() { + @Override + public PCollection<Row> expand(PCollection<ReadableFile> input) { + Schema lineSchema = Schema.builder().addStringField("line").build(); Review Comment: Could we move this to a `static final Schema LINE_SCHEMA = ...` between lines 33 and 34? ########## sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fileschematransform; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import java.io.IOException; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.io.Providers; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Watch.Growth; +import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(SchemaTransformProvider.class) +public class FileReadSchemaTransformProvider + 
extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> { + private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class); + private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1"; + static final String INPUT_TAG = "input"; + static final String OUTPUT_TAG = "output"; + + @Override + protected Class<FileReadSchemaTransformConfiguration> configurationClass() { + return FileReadSchemaTransformConfiguration.class; + } + + @Override + protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) { + return new FileReadSchemaTransform(configuration); + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + private static class FileReadSchemaTransform + extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform { + private FileReadSchemaTransformConfiguration configuration; + + FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + checkArgument( + input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()), + "Either an input PCollection of file patterns or the filepattern parameter must be set," + + "but not both."); + + // Input schema can be a schema String or a path to a file containing the schema + // Resolve to get a schema String + String schema = configuration.getSchema(); Review Comment: In this [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095)'s context, could we do this? I agree it looks like a game with the checker framework, but I'd argue the benefit of keeping the checker framework and removing the SuppressWarnings outweighs the cost. Even if one checks `isNullOrEmpty` in a prior step and still calls `getSchema`, the checker framework complains. ``` if (!Strings.isNullOrEmpty(configuration.getSchema())) { String schema = configuration.getSafeSchema(); ... } ``` ########## sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.fileschematransform; + +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.BYTE_SEQUENCE_TYPE_SCHEMA; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.BYTE_TYPE_SCHEMA; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.SINGLY_NESTED_DATA_TYPES_SCHEMA; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_SCHEMA; +import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.parquetConfigurationBuilder; +import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA; +import static org.junit.Assume.assumeTrue; + +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; + +public abstract class FileReadSchemaTransformFormatProviderTest { + + /** Returns the format of the {@linke FileReadSchemaTransformFormatProviderTest} subclass. */ + protected abstract String getFormat(); + + /** Given a Beam Schema, returns the relevant source's String schema representation. */ + protected abstract String getStringSchemaFromBeamSchema(Schema beamSchema); + + /** + * Writes {@link Row}s to files then reads from those files. Performs a {@link + * org.apache.beam.sdk.testing.PAssert} check to validate the written and read {@link Row}s are + * equal. 
+ */ + protected abstract void runWriteAndReadTest( + Schema schema, List<Row> rows, String filePath, String schemaFilePath); + + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public TestPipeline writePipeline = TestPipeline.create(); + @Rule public TestPipeline readPipeline = TestPipeline.create(); + + @Rule public TestName testName = new TestName(); + + protected Schema getFilepatternSchema() { + return Schema.of(Field.of("filepattern", FieldType.STRING)); + } + + protected String getFilePath() { + return getFolder() + "/test"; + } + + protected String getFolder() { + try { + return tmpFolder.newFolder(getFormat(), testName.getMethodName()).getAbsolutePath(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + @Test + public void testAllPrimitiveDataTypes() { + Schema schema = ALL_PRIMITIVE_DATA_TYPES_SCHEMA; + List<Row> rows = DATA.allPrimitiveDataTypesRows; + String filePath = getFilePath(); + + runWriteAndReadTest(schema, rows, filePath, null); + } + + @Test + public void testNullableAllPrimitiveDataTypes() { + Schema schema = NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA; + List<Row> rows = DATA.nullableAllPrimitiveDataTypesRows; + String filePath = getFilePath(); + + runWriteAndReadTest(schema, rows, filePath, null); + } + + @Test + public void testTimeContaining() { + // JSON schemas don't support DATETIME or other logical types + assumeTrue(!getFormat().equals("json")); + + Schema schema = TIME_CONTAINING_SCHEMA; + List<Row> rows = DATA.timeContainingRows; + String filePath = getFilePath(); + + runWriteAndReadTest(schema, rows, filePath, null); + } + + @Test + public void testByteType() { + List<String> formatsThatSupportSingleByteType = Arrays.asList("csv", "json", "xml"); + assumeTrue(formatsThatSupportSingleByteType.contains(getFormat())); + + Schema schema = BYTE_TYPE_SCHEMA; + List<Row> rows = DATA.byteTypeRows; + String filePath = getFilePath(); + + runWriteAndReadTest(schema, rows, filePath, null); + } + + @Test + public void testByteSequenceType() { + List<String> formatsThatSupportByteSequenceType = Arrays.asList("avro", "parquet"); + assumeTrue(formatsThatSupportByteSequenceType.contains(getFormat())); + + Schema schema = BYTE_SEQUENCE_TYPE_SCHEMA; + List<Row> rows = DATA.byteSequenceTypeRows; + String filePath = getFilePath(); + + runWriteAndReadTest(schema, rows, filePath, null); + } + + @Test + public void testArrayPrimitiveDataTypes() { + Schema schema = ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA; + List<Row> rows = DATA.arrayPrimitiveDataTypesRows; + String filePath = getFilePath(); + + runWriteAndReadTest(schema, rows, filePath, null); + } + + @Test + public void testNestedRepeatedDataTypes() { Review Comment: Could we see a test for `doublyNestedDataTypesRepeatRows`?
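For example, a sketch of such a test, following the pattern of the tests above (assuming the corresponding `DOUBLY_NESTED_DATA_TYPES_SCHEMA` and `DATA.doublyNestedDataTypesRepeatRows` fixtures exist alongside the ones already imported from `SchemaAwareJavaBeans` and `FileWriteSchemaTransformFormatProviderTestData`):

```java
@Test
public void testDoublyNestedRepeatedDataTypes() {
  // Hypothetical fixture names, mirroring the convention of the tests above.
  Schema schema = DOUBLY_NESTED_DATA_TYPES_SCHEMA;
  List<Row> rows = DATA.doublyNestedDataTypesRepeatRows;
  String filePath = getFilePath();

  runWriteAndReadTest(schema, rows, filePath, null);
}
```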
