laraschmidt commented on a change in pull request #17135:
URL: https://github.com/apache/beam/pull/17135#discussion_r831428036



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
##########
@@ -17,199 +17,63 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auto.service.AutoService;
-import java.io.Serializable;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
-import org.apache.beam.sdk.schemas.transforms.Convert;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.Row;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaIOConfiguration;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 
 /**
- * An implementation of {@link SchemaIOProvider} for reading and writing to 
BigQuery with {@link
- * BigQueryIO}. For a description of configuration options and other defaults, 
see {@link
- * BigQuerySchemaIOProvider#configurationSchema()}.
+ * An implementation of {@link TypedSchemaTransformProvider} for reading and 
writing to BigQuery
+ * with {@link BigQueryIO}.
  *
- * <p>This transform is still experimental, and is still subject to breaking 
changes.
+ * <p><b>Internal only:</b> This is actively being worked on and will likely 
change. We provide no
+ * backwards compatibility guarantees, and it should not be implemented 
outside the Beam repository.
  */
 @Internal
 @Experimental
-@AutoService(SchemaIOProvider.class)

Review comment:
       We probably still need AutoService, just for the SchemaTransformProvider 
instead of SchemaIOProvider.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
##########
@@ -17,199 +17,63 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auto.service.AutoService;
-import java.io.Serializable;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
-import org.apache.beam.sdk.schemas.transforms.Convert;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.Row;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaIOConfiguration;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 
 /**
- * An implementation of {@link SchemaIOProvider} for reading and writing to 
BigQuery with {@link
- * BigQueryIO}. For a description of configuration options and other defaults, 
see {@link
- * BigQuerySchemaIOProvider#configurationSchema()}.
+ * An implementation of {@link TypedSchemaTransformProvider} for reading and 
writing to BigQuery
+ * with {@link BigQueryIO}.
  *
- * <p>This transform is still experimental, and is still subject to breaking 
changes.
+ * <p><b>Internal only:</b> This is actively being worked on and will likely 
change. We provide no
+ * backwards compatibility guarantees, and it should not be implemented 
outside the Beam repository.
  */
 @Internal
 @Experimental
-@AutoService(SchemaIOProvider.class)
-public class BigQuerySchemaIOProvider implements SchemaIOProvider {
+public class BigQuerySchemaIOProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaIOConfiguration> {
 
-  /** Returns an id that uniquely represents this IO. */
-  @Override
-  public String identifier() {
-    return "bigquery";
+  public BigQuerySchemaIOProvider() {
+    super();

Review comment:
       The base class doesn't have a constructor. Is it normal to call super in 
this case?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
##########
@@ -17,199 +17,63 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auto.service.AutoService;
-import java.io.Serializable;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
-import org.apache.beam.sdk.schemas.transforms.Convert;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.Row;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaIOConfiguration;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 
 /**
- * An implementation of {@link SchemaIOProvider} for reading and writing to 
BigQuery with {@link
- * BigQueryIO}. For a description of configuration options and other defaults, 
see {@link
- * BigQuerySchemaIOProvider#configurationSchema()}.
+ * An implementation of {@link TypedSchemaTransformProvider} for reading and 
writing to BigQuery
+ * with {@link BigQueryIO}.
  *
- * <p>This transform is still experimental, and is still subject to breaking 
changes.
+ * <p><b>Internal only:</b> This is actively being worked on and will likely 
change. We provide no
+ * backwards compatibility guarantees, and it should not be implemented 
outside the Beam repository.
  */
 @Internal
 @Experimental
-@AutoService(SchemaIOProvider.class)
-public class BigQuerySchemaIOProvider implements SchemaIOProvider {
+public class BigQuerySchemaIOProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaIOConfiguration> {
 
-  /** Returns an id that uniquely represents this IO. */
-  @Override
-  public String identifier() {
-    return "bigquery";
+  public BigQuerySchemaIOProvider() {
+    super();
   }
 
-  /**
-   * Returns the expected schema of the configuration object. Note this is 
distinct from the schema
-   * of the data source itself. The fields are as follows:
-   *
-   * <ul>
-   *   <li>table: Nullable String - Used for reads and writes. Specifies a 
table to read or write
-   *       to, in the format described in {@link 
BigQueryHelpers#parseTableSpec}. Used as an input
-   *       to {@link BigQueryIO.TypedRead#from(String)} or {@link 
BigQueryIO.Write#to(String)}.
-   *   <li>query: Nullable String - Used for reads. Specifies a query to read 
results from using the
-   *       BigQuery Standard SQL dialect. Used as an input to {@link
-   *       BigQueryIO.TypedRead#fromQuery(String)}.
-   *   <li>queryLocation: Nullable String - Used for reads. Specifies a 
BigQuery geographic location
-   *       where the query job will be executed. Used as an input to {@link
-   *       BigQueryIO.TypedRead#withQueryLocation(String)}.
-   *   <li>createDisposition: Nullable String - Used for writes. Specifies 
whether a table should be
-   *       created if it does not exist. Valid inputs are "Never" and 
"IfNeeded", corresponding to
-   *       values of {@link BigQueryIO.Write.CreateDisposition}. Used as an 
input to {@link
-   *       
BigQueryIO.Write#withCreateDisposition(BigQueryIO.Write.CreateDisposition)}.
-   * </ul>
-   *
-   * Relevant default values for these transforms that are not configurable 
fields are as follows:
-   *
-   * <ul>
-   *   <li>ReadMethod - The input to {@link
-   *       BigQueryIO.TypedRead#withMethod(BigQueryIO.TypedRead.Method)}. 
Defaults to EXPORT, since
-   *       that is the only method that currently offers Beam Schema support.
-   *   <li>WriteMethod - The input to {@link 
BigQueryIO.Write#withMethod(BigQueryIO.Write.Method)}.
-   *       Currently defaults to STORAGE_WRITE_API.
-   * </ul>
-   */
+  /** Returns the expected class of the configuration object. */
   @Override
-  public Schema configurationSchema() {
-    return Schema.builder()
-        .addNullableField("table", FieldType.STRING)
-        .addNullableField("query", FieldType.STRING)
-        .addNullableField("queryLocation", FieldType.STRING)
-        .addNullableField("createDisposition", FieldType.STRING)
-        .build();
-  }
-
-  private static final HashMap<String, BigQueryIO.Write.CreateDisposition> 
createDispositionsMap =
-      new HashMap<>();
-
-  static {
-    createDispositionsMap.put("Never", 
BigQueryIO.Write.CreateDisposition.CREATE_NEVER);
-    createDispositionsMap.put("IfNeeded", 
BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED);
+  public Class<BigQuerySchemaIOConfiguration> configurationClass() {
+    return BigQuerySchemaIOConfiguration.class;
   }
 
   /**
-   * Produces a SchemaIO given a String representing the data's location, the 
schema of the data
-   * that resides there, and some IO-specific configuration object.
-   *
-   * <p>For BigQuery IO, only the configuration object is used. Location and 
data schema have no
-   * effect. Specifying a table and dataset is done through appropriate fields 
in the configuration
-   * object, and the data schema is automatically generated from either the 
input PCollection or
-   * schema of the BigQuery table.
+   * Produces a {@link SchemaTransform} implementation based on the {@link
+   * BigQuerySchemaIOConfiguration} details.
    */
   @Override
-  public BigQuerySchemaIO from(String location, Row configuration, @Nullable 
Schema dataSchema) {
-    return new BigQuerySchemaIO(location, configuration);
+  public SchemaTransform from(BigQuerySchemaIOConfiguration configuration) {
+    return BigQuerySchemaTransform.of(configuration);
   }
 
-  /**
-   * Indicates whether this transform requires a specified data schema.
-   *
-   * @return false
-   */
+  /** Returns an id that uniquely identifies this transform. */
   @Override
-  public boolean requiresDataSchema() {
-    return false;
+  public String identifier() {
+    return BigQuerySchemaIOConfiguration.IDENTIFIER;

Review comment:
       Conceptually this doesn't really belong in the configuration class. I'd 
either use a string literal or a constant defined in this file.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/schematransform/BigQuerySchemaTransform.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.schematransform;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+@AutoValue
+public abstract class BigQuerySchemaTransform implements SchemaTransform {

Review comment:
       Need one for write and one for read

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/schematransform/BigQuerySchemaTransform.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.schematransform;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+@AutoValue
+public abstract class BigQuerySchemaTransform implements SchemaTransform {
+
+  public static BigQuerySchemaTransform of(BigQuerySchemaIOConfiguration 
configuration) {
+    return builderOf(configuration).build();
+  }
+
+  public static Builder builderOf(BigQuerySchemaIOConfiguration configuration) 
{
+    return new 
AutoValue_BigQuerySchemaTransform.Builder().setConfiguration(configuration);
+  }
+
+  public abstract BigQuerySchemaIOConfiguration getConfiguration();
+
+  @Nullable
+  public abstract BigQueryServices getBigQueryServices();
+
+  @Override
+  public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() 
{
+    return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        if (input.getAll().isEmpty()) {
+          BigQueryRowReader.Builder builder = 
BigQueryRowReader.builderOf(getConfiguration());

Review comment:
       Why not combine BigQueryRowReader and the SchemaTransform?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
##########
@@ -17,199 +17,63 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auto.service.AutoService;
-import java.io.Serializable;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
-import org.apache.beam.sdk.schemas.transforms.Convert;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.Row;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaIOConfiguration;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 
 /**
- * An implementation of {@link SchemaIOProvider} for reading and writing to 
BigQuery with {@link
- * BigQueryIO}. For a description of configuration options and other defaults, 
see {@link
- * BigQuerySchemaIOProvider#configurationSchema()}.
+ * An implementation of {@link TypedSchemaTransformProvider} for reading and 
writing to BigQuery
+ * with {@link BigQueryIO}.
  *
- * <p>This transform is still experimental, and is still subject to breaking 
changes.
+ * <p><b>Internal only:</b> This is actively being worked on and will likely 
change. We provide no
+ * backwards compatibility guarantees, and it should not be implemented 
outside the Beam repository.
  */
 @Internal
 @Experimental
-@AutoService(SchemaIOProvider.class)
-public class BigQuerySchemaIOProvider implements SchemaIOProvider {
+public class BigQuerySchemaIOProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaIOConfiguration> {
 
-  /** Returns an id that uniquely represents this IO. */
-  @Override
-  public String identifier() {
-    return "bigquery";
+  public BigQuerySchemaIOProvider() {
+    super();
   }
 
-  /**
-   * Returns the expected schema of the configuration object. Note this is 
distinct from the schema
-   * of the data source itself. The fields are as follows:
-   *
-   * <ul>
-   *   <li>table: Nullable String - Used for reads and writes. Specifies a 
table to read or write
-   *       to, in the format described in {@link 
BigQueryHelpers#parseTableSpec}. Used as an input
-   *       to {@link BigQueryIO.TypedRead#from(String)} or {@link 
BigQueryIO.Write#to(String)}.
-   *   <li>query: Nullable String - Used for reads. Specifies a query to read 
results from using the
-   *       BigQuery Standard SQL dialect. Used as an input to {@link
-   *       BigQueryIO.TypedRead#fromQuery(String)}.
-   *   <li>queryLocation: Nullable String - Used for reads. Specifies a 
BigQuery geographic location
-   *       where the query job will be executed. Used as an input to {@link
-   *       BigQueryIO.TypedRead#withQueryLocation(String)}.
-   *   <li>createDisposition: Nullable String - Used for writes. Specifies 
whether a table should be
-   *       created if it does not exist. Valid inputs are "Never" and 
"IfNeeded", corresponding to
-   *       values of {@link BigQueryIO.Write.CreateDisposition}. Used as an 
input to {@link
-   *       
BigQueryIO.Write#withCreateDisposition(BigQueryIO.Write.CreateDisposition)}.
-   * </ul>
-   *
-   * Relevant default values for these transforms that are not configurable 
fields are as follows:
-   *
-   * <ul>
-   *   <li>ReadMethod - The input to {@link
-   *       BigQueryIO.TypedRead#withMethod(BigQueryIO.TypedRead.Method)}. 
Defaults to EXPORT, since
-   *       that is the only method that currently offers Beam Schema support.
-   *   <li>WriteMethod - The input to {@link 
BigQueryIO.Write#withMethod(BigQueryIO.Write.Method)}.
-   *       Currently defaults to STORAGE_WRITE_API.
-   * </ul>
-   */
+  /** Returns the expected class of the configuration object. */
   @Override
-  public Schema configurationSchema() {
-    return Schema.builder()
-        .addNullableField("table", FieldType.STRING)
-        .addNullableField("query", FieldType.STRING)
-        .addNullableField("queryLocation", FieldType.STRING)
-        .addNullableField("createDisposition", FieldType.STRING)
-        .build();
-  }
-
-  private static final HashMap<String, BigQueryIO.Write.CreateDisposition> 
createDispositionsMap =
-      new HashMap<>();
-
-  static {
-    createDispositionsMap.put("Never", 
BigQueryIO.Write.CreateDisposition.CREATE_NEVER);
-    createDispositionsMap.put("IfNeeded", 
BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED);
+  public Class<BigQuerySchemaIOConfiguration> configurationClass() {
+    return BigQuerySchemaIOConfiguration.class;
   }
 
   /**
-   * Produces a SchemaIO given a String representing the data's location, the 
schema of the data
-   * that resides there, and some IO-specific configuration object.
-   *
-   * <p>For BigQuery IO, only the configuration object is used. Location and 
data schema have no
-   * effect. Specifying a table and dataset is done through appropriate fields 
in the configuration
-   * object, and the data schema is automatically generated from either the 
input PCollection or
-   * schema of the BigQuery table.
+   * Produces a {@link SchemaTransform} implementation based on the {@link
+   * BigQuerySchemaIOConfiguration} details.
    */
   @Override
-  public BigQuerySchemaIO from(String location, Row configuration, @Nullable 
Schema dataSchema) {
-    return new BigQuerySchemaIO(location, configuration);
+  public SchemaTransform from(BigQuerySchemaIOConfiguration configuration) {

Review comment:
       SchemaTransform, but maybe BigQueryConfig is good enough?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/schematransform/BigQuerySchemaIOConfiguration.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.schematransform;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigquery.storage.v1.DataFormat;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+
+/**
+ * Configuration for the {@link 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider}.
+ *
+ * <p><b>Internal only:</b> This is actively being worked on and will likely 
change. We provide no
+ * backwards compatibility guarantees, and it should not be implemented 
outside the Beam repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaIOConfiguration {

Review comment:
       Where did this list of configs come from? For the first version we can 
probably just stick with the set of configurations the SchemaIO provided.
   
   return Schema.builder()
           .addNullableField("table", FieldType.STRING)
           .addNullableField("query", FieldType.STRING)
           .addNullableField("queryLocation", FieldType.STRING)
           .addNullableField("createDisposition", FieldType.STRING)
           .build();

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
##########
@@ -17,199 +17,63 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auto.service.AutoService;
-import java.io.Serializable;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
-import org.apache.beam.sdk.schemas.transforms.Convert;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.Row;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaIOConfiguration;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 
 /**
- * An implementation of {@link SchemaIOProvider} for reading and writing to 
BigQuery with {@link
- * BigQueryIO}. For a description of configuration options and other defaults, 
see {@link
- * BigQuerySchemaIOProvider#configurationSchema()}.
+ * An implementation of {@link TypedSchemaTransformProvider} for reading and 
writing to BigQuery
+ * with {@link BigQueryIO}.
  *
- * <p>This transform is still experimental, and is still subject to breaking 
changes.
+ * <p><b>Internal only:</b> This is actively being worked on and will likely 
change. We provide no
+ * backwards compatibility guarantees, and it should not be implemented 
outside the Beam repository.
  */
 @Internal
 @Experimental
-@AutoService(SchemaIOProvider.class)
-public class BigQuerySchemaIOProvider implements SchemaIOProvider {
+public class BigQuerySchemaIOProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaIOConfiguration> {
 
-  /** Returns an id that uniquely represents this IO. */
-  @Override
-  public String identifier() {
-    return "bigquery";
+  public BigQuerySchemaIOProvider() {
+    super();
   }
 
-  /**
-   * Returns the expected schema of the configuration object. Note this is 
distinct from the schema
-   * of the data source itself. The fields are as follows:
-   *
-   * <ul>
-   *   <li>table: Nullable String - Used for reads and writes. Specifies a 
table to read or write
-   *       to, in the format described in {@link 
BigQueryHelpers#parseTableSpec}. Used as an input
-   *       to {@link BigQueryIO.TypedRead#from(String)} or {@link 
BigQueryIO.Write#to(String)}.
-   *   <li>query: Nullable String - Used for reads. Specifies a query to read 
results from using the
-   *       BigQuery Standard SQL dialect. Used as an input to {@link
-   *       BigQueryIO.TypedRead#fromQuery(String)}.
-   *   <li>queryLocation: Nullable String - Used for reads. Specifies a 
BigQuery geographic location
-   *       where the query job will be executed. Used as an input to {@link
-   *       BigQueryIO.TypedRead#withQueryLocation(String)}.
-   *   <li>createDisposition: Nullable String - Used for writes. Specifies 
whether a table should be
-   *       created if it does not exist. Valid inputs are "Never" and 
"IfNeeded", corresponding to
-   *       values of {@link BigQueryIO.Write.CreateDisposition}. Used as an 
input to {@link
-   *       
BigQueryIO.Write#withCreateDisposition(BigQueryIO.Write.CreateDisposition)}.
-   * </ul>
-   *
-   * Relevant default values for these transforms that are not configurable 
fields are as follows:
-   *
-   * <ul>
-   *   <li>ReadMethod - The input to {@link
-   *       BigQueryIO.TypedRead#withMethod(BigQueryIO.TypedRead.Method)}. 
Defaults to EXPORT, since
-   *       that is the only method that currently offers Beam Schema support.
-   *   <li>WriteMethod - The input to {@link 
BigQueryIO.Write#withMethod(BigQueryIO.Write.Method)}.
-   *       Currently defaults to STORAGE_WRITE_API.
-   * </ul>
-   */
+  /** Returns the expected class of the configuration object. */
   @Override
-  public Schema configurationSchema() {
-    return Schema.builder()
-        .addNullableField("table", FieldType.STRING)
-        .addNullableField("query", FieldType.STRING)
-        .addNullableField("queryLocation", FieldType.STRING)
-        .addNullableField("createDisposition", FieldType.STRING)
-        .build();
-  }
-
-  private static final HashMap<String, BigQueryIO.Write.CreateDisposition> 
createDispositionsMap =
-      new HashMap<>();
-
-  static {
-    createDispositionsMap.put("Never", 
BigQueryIO.Write.CreateDisposition.CREATE_NEVER);
-    createDispositionsMap.put("IfNeeded", 
BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED);
+  public Class<BigQuerySchemaIOConfiguration> configurationClass() {
+    return BigQuerySchemaIOConfiguration.class;
   }
 
   /**
-   * Produces a SchemaIO given a String representing the data's location, the 
schema of the data
-   * that resides there, and some IO-specific configuration object.
-   *
-   * <p>For BigQuery IO, only the configuration object is used. Location and 
data schema have no
-   * effect. Specifying a table and dataset is done through appropriate fields 
in the configuration
-   * object, and the data schema is automatically generated from either the 
input PCollection or
-   * schema of the BigQuery table.
+   * Produces a {@link SchemaTransform} implementation based on the {@link
+   * BigQuerySchemaIOConfiguration} details.
    */
   @Override
-  public BigQuerySchemaIO from(String location, Row configuration, @Nullable 
Schema dataSchema) {
-    return new BigQuerySchemaIO(location, configuration);
+  public SchemaTransform from(BigQuerySchemaIOConfiguration configuration) {
+    return BigQuerySchemaTransform.of(configuration);
   }
 
-  /**
-   * Indicates whether this transform requires a specified data schema.
-   *
-   * @return false
-   */
+  /** Returns an id that uniquely identifies this transform. */
   @Override
-  public boolean requiresDataSchema() {
-    return false;
+  public String identifier() {
+    return BigQuerySchemaIOConfiguration.IDENTIFIER;
   }
 
-  /**
-   * Indicates whether the PCollections produced by this transform will 
contain a bounded or
-   * unbounded number of elements.
-   *
-   * @return Bounded
-   */
+  /** Returns the input collection names of this transform. */
   @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
+  public List<String> inputCollectionNames() {
+    // TODO: determine valid input collection names for JobType

Review comment:
       You can also just put input or output if it's just one input and output. 
I think this is more useful when there's several inputs, e.g. a join.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
##########
@@ -17,199 +17,63 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auto.service.AutoService;
-import java.io.Serializable;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
-import org.apache.beam.sdk.schemas.transforms.Convert;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.Row;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaIOConfiguration;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 
 /**
- * An implementation of {@link SchemaIOProvider} for reading and writing to 
BigQuery with {@link
- * BigQueryIO}. For a description of configuration options and other defaults, 
see {@link
- * BigQuerySchemaIOProvider#configurationSchema()}.
+ * An implementation of {@link TypedSchemaTransformProvider} for reading and 
writing to BigQuery
+ * with {@link BigQueryIO}.
  *
- * <p>This transform is still experimental, and is still subject to breaking 
changes.
+ * <p><b>Internal only:</b> This is actively being worked on and will likely 
change. We provide no
+ * backwards compatibility guarantees, and it should not be implemented 
outside the Beam repository.
  */
 @Internal
 @Experimental
-@AutoService(SchemaIOProvider.class)

Review comment:
       Typically I've seen the SchemaProvider + the SchemaTransform/IO in one 
file. So we'd have two sets of those. I don't know if that fits with the 
classes here or not though.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/schematransform/TableRowToBeamRowFn.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.schematransform;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Objects;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.Row;
+
+class TableRowToBeamRowFn extends DoFn<TableRow, Row> {

Review comment:
       Why do we need to add this? Just moving it to a shared location?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
##########
@@ -17,199 +17,63 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auto.service.AutoService;
-import java.io.Serializable;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
-import org.apache.beam.sdk.schemas.transforms.Convert;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.Row;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaIOConfiguration;
+import 
org.apache.beam.sdk.io.gcp.bigquery.schematransform.BigQuerySchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 
 /**
- * An implementation of {@link SchemaIOProvider} for reading and writing to 
BigQuery with {@link
- * BigQueryIO}. For a description of configuration options and other defaults, 
see {@link
- * BigQuerySchemaIOProvider#configurationSchema()}.
+ * An implementation of {@link TypedSchemaTransformProvider} for reading and 
writing to BigQuery
+ * with {@link BigQueryIO}.
  *
- * <p>This transform is still experimental, and is still subject to breaking 
changes.
+ * <p><b>Internal only:</b> This is actively being worked on and will likely 
change. We provide no
+ * backwards compatibility guarantees, and it should not be implemented 
outside the Beam repository.
  */
 @Internal
 @Experimental
-@AutoService(SchemaIOProvider.class)

Review comment:
       We need one of these for read and write, right? Shouldn't that be in the 
name?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/schematransform/BigQuerySchemaIOConfiguration.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.schematransform;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigquery.storage.v1.DataFormat;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+
+/**
+ * Configuration for the {@link 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider}.
+ *
+ * <p><b>Internal only:</b> This is actively being worked on and will likely 
change. We provide no
+ * backwards compatibility guarantees, and it should not be implemented 
outside the Beam repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaIOConfiguration {

Review comment:
       The SchemaTransform doesn't have to expose everything the base class 
does - though eventually it may. When it comes to converting it to the new form 
we can just mimic what was in the original form though.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to