damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1166137111


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", 
"json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && 
VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());

Review Comment:
   Do you like the idea of this instead? That way, if a new 
FileReadSchemaTransformFormatProvider AutoService-annotated implementation is 
added, there's no need to synchronize it with the VALID_FORMATS list.
   ```
   Providers.loadProviders(FileReadSchemaTransformFormatProvider.class).containsKey(this.getFormat())
   ```
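
A minimal, self-contained sketch of the lookup-based check suggested above. It assumes the providers are available as a map keyed by identifier, which is what `Providers.loadProviders` returns. `FormatProvider`, `FormatValidation`, and `registry` are hypothetical names used only for illustration, and the map is built by hand here instead of being discovered through AutoService.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for FileReadSchemaTransformFormatProvider. */
interface FormatProvider {
  String identifier();
}

final class FormatValidation {
  private FormatValidation() {}

  /** Hand-built registry; in the PR this map would come from the provider lookup instead. */
  static Map<String, FormatProvider> registry(FormatProvider... providers) {
    Map<String, FormatProvider> byIdentifier = new LinkedHashMap<>();
    for (FormatProvider provider : providers) {
      byIdentifier.put(provider.identifier(), provider);
    }
    return byIdentifier;
  }

  /** Rejects null, empty, or unregistered formats; the message lists whatever is registered. */
  static void validateFormat(String format, Map<String, FormatProvider> providers) {
    if (format == null || format.isEmpty() || !providers.containsKey(format)) {
      throw new IllegalArgumentException(
          "A valid file format must be specified. Please specify one of: " + providers.keySet());
    }
  }
}
```

With this shape, registering a new provider extends both the check and the error message without touching a hard-coded list.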



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", 
"json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && 
VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());
+
+    if (!this.getFormat().equals("line")) {
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getSchema()),
+          String.format(
+              "A schema must be specified when reading files with %s format. 
You may provide a schema string or a path to a file containing the schema.",
+              this.getFormat()));
+    }

Review Comment:
   I'm struggling with the need for someone to look into this code to determine 
that a schema is required for all of the formats except line. Neither of the 
following options is ideal, and I trust your judgement, but I wanted to relay 
my thoughts.
   
   1) Remove LineReadSchemaTransformFormatProvider and the Nullable annotation 
on the getSchema method.
   2) Remove the Nullable annotation on the getSchema method and 
provide/document a Schema for LineReadSchemaTransformFormatProvider.
   
   What's missing from both options is handling of a non-null empty string. 
I'd have to test this on my end, but I wonder if Schema is one of the rare 
non-primitive types that we can get away with.



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})

Review Comment:
   Is there a way to apply the nullness warning suppression only to the methods 
that cause the problem? It's more code, but alternatively, do you like the idea 
of using Optional? I don't imagine a performance hit in the configuration, since 
it is evaluated at pipeline construction time.
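
A rough sketch of what the Optional alternative could look like, written against a plain class, not the AutoValue one. `ReadConfigSketch` is a hypothetical name, and whether AutoValueSchema can infer a field from an Optional getter is an open assumption that would need checking before adopting this in the PR.

```java
import java.util.Optional;

/** Hypothetical, simplified stand-in for FileReadSchemaTransformConfiguration. */
final class ReadConfigSketch {
  private final String format;
  private final Optional<String> schema;

  ReadConfigSketch(String format, Optional<String> schema) {
    this.format = format;
    this.schema = schema;
  }

  String getFormat() {
    return format;
  }

  /** Absence is expressed in the type, so callers never see a null schema. */
  Optional<String> getSchema() {
    return schema;
  }

  /** Mirrors the existing rule: every format except "line" needs a non-empty schema. */
  void validate() {
    boolean schemaMissing = !schema.filter(s -> !s.isEmpty()).isPresent();
    if (!"line".equals(format) && schemaMissing) {
      throw new IllegalArgumentException(
          String.format("A schema must be specified when reading files with %s format.", format));
    }
  }
}
```

The `filter` call treats an empty string the same as an absent value, which covers the non-null empty string case in one place.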



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> 
{
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = 
"beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration 
configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements 
SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration 
configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ 
Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern 
parameter must be set,"
+              + "but not both.");

Review Comment:
   I had a hard time understanding this. Is this because we have the 
LineReadSchemaTransformFormatProvider?
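
As I read it, the XOR in the quoted check passes only when exactly one source of file patterns is supplied. The sketch below restates that condition with named booleans; `InputSourceCheck` and `exactlyOneSource` are hypothetical names, and the `hasInputPCollection` flag stands in for `!input.getAll().isEmpty()`.

```java
final class InputSourceCheck {
  private InputSourceCheck() {}

  /**
   * Returns true when exactly one of the two sources is present:
   * an input PCollection of file patterns, or the filepattern parameter.
   */
  static boolean exactlyOneSource(boolean hasInputPCollection, String filepattern) {
    boolean inputsEmpty = !hasInputPCollection;
    boolean patternEmpty = filepattern == null || filepattern.isEmpty();
    // XOR is true only when the two flags differ, so "both set" and "neither set" fail.
    return inputsEmpty ^ patternEmpty;
  }
}
```

Naming the two flags before combining them may make the intent easier to follow than the inline expression.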



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", 
"json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && 
VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());
+
+    if (!this.getFormat().equals("line")) {
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getSchema()),
+          String.format(
+              "A schema must be specified when reading files with %s format. 
You may provide a schema string or a path to a file containing the schema.",
+              this.getFormat()));
+    }
+
+    Integer terminateAfterSecondsSinceNewOutput = 
this.getTerminateAfterSecondsSinceNewOutput();
+    Integer pollIntervalMillis = this.getPollIntervalMillis();
+    if (terminateAfterSecondsSinceNewOutput != null && 
terminateAfterSecondsSinceNewOutput > 0) {
+      checkArgument(
+          pollIntervalMillis != null && pollIntervalMillis > 0,
+          "Found positive value for terminateAfterSecondsSinceNewOutput but 
non-positive"
+              + "value for pollIntervalMillis. Please set pollIntervalMillis 
as well to enable"
+              + "streaming.");
+    }
+  }
+
+  public static Builder builder() {
+    return new AutoValue_FileReadSchemaTransformConfiguration.Builder();
+  }
+
+  /**
+   * The format of the file(s) to read.
+   *
+   * <p>Possible values are: `"lines"`, `"avro"`, `"parquet"`, `"json"`
+   */
+  public abstract String getFormat();
+
+  /**
+   * The filepattern used to match and read files.
+   *
+   * <p>May instead use an input PCollection<Row> of filepatterns.
+   */
+  @Nullable
+  public abstract String getFilepattern();
+
+  /**
+   * The schema used by sources to deserialize data and create Beam Rows.
+   *
+   * <p>May be provided as a schema String or as a String path to a file that 
contains the schema.
+   */
+  @Nullable
+  public abstract String getSchema();
+
+  /**
+   * The time, in milliseconds, to wait before polling for new files.
+   *
+   * <p>This will set the pipeline to be a streaming pipeline and will 
continuously watch for new
+   * files.
+   *
+   * <p>Note: This only polls for new files. New updates to an existing file 
will not be watched
+   * for.
+   */
+  @Nullable
+  public abstract Integer getPollIntervalMillis();

Review Comment:
   Should this be a Long so that it is easy to pass into 
[Duration](https://www.joda.org/joda-time/apidocs/org/joda/time/Duration.html#Duration-long-)?
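
A small sketch of the conversion this would simplify. To stay self-contained it uses `java.time.Duration.ofMillis(long)` as a stand-in for the Joda `Duration(long)` constructor linked above; `PollIntervalSketch` and `pollInterval` are hypothetical names.

```java
import java.time.Duration;
import java.util.Optional;

final class PollIntervalSketch {
  private PollIntervalSketch() {}

  /**
   * Converts a nullable millisecond setting into a Duration.
   * Null or non-positive values mean polling is disabled.
   */
  static Optional<Duration> pollInterval(Long pollIntervalMillis) {
    if (pollIntervalMillis == null || pollIntervalMillis <= 0) {
      return Optional.empty();
    }
    // A Long unboxes directly to the long parameter; no widening from int is involved.
    return Optional.of(Duration.ofMillis(pollIntervalMillis));
  }
}
```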



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
