[ 
https://issues.apache.org/jira/browse/BEAM-14035?focusedWorklogId=755633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755633
 ]

ASF GitHub Bot logged work on BEAM-14035:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Apr/22 08:08
            Start Date: 12/Apr/22 08:08
    Worklog Time Spent: 10m 
      Work Description: angoenka commented on code in PR #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r848042337


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {

Review Comment:
   I think we can split the configuration into
   `BigQuerySchemaTransformReadConfiguration` and
   `BigQuerySchemaTransformWriteConfiguration`.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+
+  /**
+   * Instantiates a {@link Read.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Read.Builder createQueryBuilder(String query) {

Review Comment:
   I think we can get away with only one read builder, `createReadBuilder`.
   We also need to validate at build time that `JobType` is consistent with
   the other fields (`useStandardSql` and `TableSpec`).
   The returned builders are modifiable, so there is no real value in
   overloading the create method.
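
A minimal sketch of the single-builder idea, assuming a hand-written builder in place of the AutoValue-generated one. `ReadConfigSketch`, `createReadBuilder`, and the specific rules checked in `build()` are illustrative assumptions, not code from the PR:

```java
/** Simplified stand-in for the read configuration; NOT the AutoValue class from the PR. */
final class ReadConfigSketch {
  enum JobType { QUERY, EXTRACT }

  final JobType jobType;
  final String query; // null for EXTRACT jobs
  final String tableSpec; // null for QUERY jobs
  final Boolean useStandardSql; // only meaningful for QUERY jobs

  private ReadConfigSketch(Builder b) {
    this.jobType = b.jobType;
    this.query = b.query;
    this.tableSpec = b.tableSpec;
    this.useStandardSql = b.useStandardSql;
  }

  /** Single entry point (hypothetical name taken from the review comment). */
  static Builder createReadBuilder() {
    return new Builder();
  }

  static final class Builder {
    private JobType jobType;
    private String query;
    private String tableSpec;
    private Boolean useStandardSql;

    Builder setJobType(JobType value) { this.jobType = value; return this; }
    Builder setQuery(String value) { this.query = value; return this; }
    Builder setTableSpec(String value) { this.tableSpec = value; return this; }
    Builder setUseStandardSql(Boolean value) { this.useStandardSql = value; return this; }

    /** Checks that the job type agrees with the other fields before building. */
    ReadConfigSketch build() {
      require(jobType != null, "jobType is required");
      switch (jobType) {
        case QUERY:
          require(query != null && !query.isEmpty(), "QUERY requires a query");
          require(tableSpec == null, "QUERY must not set tableSpec");
          break;
        case EXTRACT:
          require(tableSpec != null && !tableSpec.isEmpty(), "EXTRACT requires a tableSpec");
          require(query == null, "EXTRACT must not set query");
          require(useStandardSql == null, "EXTRACT must not set useStandardSql");
          break;
      }
      return new ReadConfigSketch(this);
    }

    private static void require(boolean ok, String message) {
      if (!ok) {
        throw new IllegalStateException(message);
      }
    }
  }

  public static void main(String[] args) {
    ReadConfigSketch config =
        createReadBuilder().setJobType(JobType.QUERY).setQuery("SELECT 1").build();
    System.out.println(config.jobType);
  }
}
```

With AutoValue, the same check would probably live in a hand-written `build()` that wraps the generated build method; that detail is left out here to keep the sketch dependency-free.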



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformConfiguration.Read}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Read> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";

Review Comment:
   What does the version represent here?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformConfiguration.Read}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Read> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformConfiguration.Read> configurationClass() {
+    return BigQuerySchemaTransformConfiguration.Read.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Read configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformConfiguration.Read}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformConfiguration.Read configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformConfiguration.Read configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformConfiguration.Read}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformConfiguration.Read configuration;
+
+    /** An instance of {@link BigQueryServices} used for testing. */
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformConfiguration.Read configuration) {
+      this.configuration = configuration;
+    }
+
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {

Review Comment:
   Mark this as `@VisibleForTesting`.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java:
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link BigQuerySchemaTransformConfiguration.Write}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Write> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformConfiguration.Write> configurationClass() {
+    return BigQuerySchemaTransformConfiguration.Write.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Write configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
+   * single is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
+   * BigQuerySchemaTransformConfiguration.Write}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+    BigQueryWriteSchemaTransform(BigQuerySchemaTransformConfiguration.Write configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformConfiguration.Write}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+    /** An instance of {@link BigQueryServices} used for testing. */
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformConfiguration.Write configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      CreateDisposition createDisposition = configuration.getCreateDispositionEnum();
+      Schema destinationSchema = getDestinationRowSchema(options);
+
+      if (destinationSchema == null) {
+        // We only care if the create disposition implies an existing table i.e. create never.
+        if (createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
+          throw new InvalidConfigurationException(
+              String.format(
+                  "configuration create disposition: %s for table: %s for a null destination schema",
+                  createDisposition, configuration.getTableSpec()));
+        }
+      }
+    }

Review Comment:
   ```suggestion
         if (destinationSchema == null
             && createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
           // We only care if the create disposition implies an existing table i.e. create never.
           throw new InvalidConfigurationException(
               String.format(
                   "configuration create disposition: %s for table: %s for a null destination schema",
                   createDisposition, configuration.getTableSpec()));
         }
       }
   ```
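
A small sketch that exercises the flattened condition on its own. The enum, the `Object` schema parameter, and `IllegalArgumentException` are stand-ins for Beam's `CreateDisposition`, `Schema`, and `InvalidConfigurationException`; `ValidateSketch` is a hypothetical name:

```java
/** Stand-alone illustration of the single combined check; not the PR's transform class. */
final class ValidateSketch {
  // Stand-in for BigQueryIO.Write.CreateDisposition.
  enum CreateDisposition { CREATE_IF_NEEDED, CREATE_NEVER }

  /**
   * Rejects the configuration only when no destination schema is available AND the create
   * disposition implies that the table must already exist.
   */
  static void validate(
      Object destinationSchema, CreateDisposition createDisposition, String tableSpec) {
    if (destinationSchema == null && createDisposition == CreateDisposition.CREATE_NEVER) {
      // Stand-in for InvalidConfigurationException.
      throw new IllegalArgumentException(
          String.format(
              "configuration create disposition: %s for table: %s for a null destination schema",
              createDisposition, tableSpec));
    }
  }

  public static void main(String[] args) {
    // A missing schema is acceptable when the table may still be created.
    validate(null, CreateDisposition.CREATE_IF_NEEDED, "project:dataset.table");
    System.out.println("validated");
  }
}
```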



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+
+  /**
+   * Instantiates a {@link Read.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Read.Builder createQueryBuilder(String query) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+        .setJobType(JobType.QUERY)
+        .setUseStandardSql(Read.DEFAULT_USE_STANDARD_SQL)
+        .setQuery(query);
+  }
+
+  /**
+   * Instantiates a {@link Read.Builder} to support BigQuery extract jobs. See {@link
+   * BigQueryIO.TypedRead#from(String)} for the expected format.
+   */
+  public static Read.Builder createExtractBuilder(String tableSpec) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+        .setJobType(JobType.EXTRACT)
+        .setTableSpec(tableSpec);
+  }
+
+  /** Instantiates a {@link Read.Builder} to support BigQuery extract jobs. */
+  public static Read.Builder createExtractBuilder(TableReference tableReference) {
+    return createExtractBuilder(BigQueryHelpers.toTableSpec(tableReference));
+  }
+
+  /**
+   * Instantiates a {@link Write.Builder} to support BigQuery load jobs. See {@link
+   * BigQueryIO.Write#to(String)}} for toTableSpec expected format.
+   */
+  public static Write.Builder createLoadBuilder(
+      String tableSpec, CreateDisposition createDisposition, WriteDisposition writeDisposition) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Write.Builder()
+        .setTableSpec(tableSpec)
+        .setCreateDisposition(createDisposition.name())
+        .setWriteDisposition(writeDisposition.name());
+  }
+
+  /** Instantiates a {@link Write.Builder} to support BigQuery load jobs. */
+  public static Write.Builder createLoadBuilder(
+      TableReference toTable,
+      CreateDisposition createDisposition,
+      WriteDisposition writeDisposition) {
+    return createLoadBuilder(
+        BigQueryHelpers.toTableSpec(toTable), createDisposition, writeDisposition);
+  }
+
+  /**
+   * Configuration for reading from BigQuery.
+   *
+   * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+   *
+   * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+   * provide no backwards compatibility guarantees, and it should not be implemented outside the
+   * Beam repository.
+   */
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+  })
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Read {
+    private static final TypeDescriptor<Read> TYPE_DESCRIPTOR = TypeDescriptor.of(Read.class);
+    private static final SerializableFunction<Read, Row> ROW_SERIALIZABLE_FUNCTION =
+        AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+    private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+    /** Configures the BigQuery job type. */
+    abstract JobType getJobType();
+
+    /** Serializes configuration to a {@link Row}. */
+    Row toBeamRow() {
+      return ROW_SERIALIZABLE_FUNCTION.apply(this);
+    }
+
+    /** Configures the BigQuery read job with the SQL query. */
+    @Nullable
+    public abstract String getQuery();
+
+    /**
+     * Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)} for
+     * more details on the expected format.
+     */
+    @Nullable
+    public abstract String getTableSpec();
+
+    /** BigQuery geographic location where the query job will be executed. */
+    @Nullable
+    public abstract String getQueryLocation();
+
+    /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+    @Nullable
+    public abstract Boolean getUseStandardSql();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      /** Configures the BigQuery job type. */
+      abstract Builder setJobType(JobType value);
+
+      /** Configures the BigQuery read job with the SQL query. */
+      public abstract Builder setQuery(String value);
+
+      /**
+       * Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)}
+       * for more details on the expected format.
+       */
+      public abstract Builder setTableSpec(String value);
+
+      /** BigQuery geographic location where the query job will be executed. */
+      public abstract Builder setQueryLocation(String value);
+
+      /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+      public abstract Builder setUseStandardSql(Boolean value);
+
+      /** Builds the {@link Read} configuration. */
+      public abstract Read build();
+    }
+  }
+
+  /**
+   * Configuration for writing to BigQuery.
+   *
+   * <p>This class is meant to be used with {@link BigQuerySchemaTransformWriteProvider}.
+   *
+   * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+   * provide no backwards compatibility guarantees, and it should not be implemented outside the
+   * Beam repository.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Write {
+    private static final TypeDescriptor<BigQuerySchemaTransformConfiguration.Write>
+        TYPE_DESCRIPTOR = TypeDescriptor.of(BigQuerySchemaTransformConfiguration.Write.class);
+    private static final SerializableFunction<BigQuerySchemaTransformConfiguration.Write, Row>
+        ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+    /**
+     * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
+     * expected format.
+     */
+    public abstract String getTableSpec();
+
+    /** Specifies whether the table should be created if it does not exist. */
+    public abstract String getCreateDisposition();
+
+    /** Specifies what to do with existing data in the table, in case the table already exists. */
+    public abstract String getWriteDisposition();
+
+    /** Returns the {@link #getTableSpec()} as a {@link TableReference}. */
+    TableReference getTableReference() {
+      return BigQueryHelpers.parseTableSpec(getTableSpec());
+    }
+
+    /** Returns the {@link #getCreateDisposition()} as a {@link CreateDisposition}. */
+    CreateDisposition getCreateDispositionEnum() {
+      return CreateDisposition.valueOf(getCreateDisposition());
+    }
+
+    /** Serializes configuration to a {@link Row}. */
+    Row toBeamRow() {
+      return ROW_SERIALIZABLE_FUNCTION.apply(this);
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      /**
+       * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
+       * expected format.
+       */
+      public abstract Builder setTableSpec(String value);
+
+      /** Specifies whether the table should be created if it does not exist. */
+      public abstract Builder setCreateDisposition(String value);
+
+      /** Specifies what to do with existing data in the table, in case the table already exists. */
+      public abstract Builder setWriteDisposition(String value);
+
+      /** Builds the {@link Write} configuration. */
+      public abstract Write build();
+    }
+  }
+
+  enum JobType {

Review Comment:
   Shall we make these public?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+
+  /**
+   * Instantiates a {@link Read.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Read.Builder createQueryBuilder(String query) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+        .setJobType(JobType.QUERY)
+        .setUseStandardSql(Read.DEFAULT_USE_STANDARD_SQL)
+        .setQuery(query);
+  }
+
+  /**
+   * Instantiates a {@link Read.Builder} to support BigQuery extract jobs. See {@link
+   * BigQueryIO.TypedRead#from(String)} for the expected format.
+   */
+  public static Read.Builder createExtractBuilder(String tableSpec) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+        .setJobType(JobType.EXTRACT)
+        .setTableSpec(tableSpec);
+  }
+
+  /** Instantiates a {@link Read.Builder} to support BigQuery extract jobs. */
+  public static Read.Builder createExtractBuilder(TableReference tableReference) {
+    return createExtractBuilder(BigQueryHelpers.toTableSpec(tableReference));
+  }
+
+  /**
+   * Instantiates a {@link Write.Builder} to support BigQuery load jobs. See {@link
+   * BigQueryIO.Write#to(String)}} for toTableSpec expected format.
+   */
+  public static Write.Builder createLoadBuilder(

Review Comment:
   Same here. We can have a single builder creation method and do the sanity
   check at build time.
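
   A minimal sketch of that idea, assuming a plain hand-written builder rather
   than the PR's AutoValue classes. `ReadConfigSketch`, `builder()` and the
   "exactly one of query or tableSpec" rule are illustrative assumptions, not
   code from this PR:

```java
/**
 * Hypothetical, dependency-free stand-in for the Read configuration.
 * None of these names come from the PR itself.
 */
final class ReadConfigSketch {
  enum JobType { QUERY, EXTRACT }

  private final JobType jobType;
  private final String query;     // null for extract jobs
  private final String tableSpec; // null for query jobs

  private ReadConfigSketch(JobType jobType, String query, String tableSpec) {
    this.jobType = jobType;
    this.query = query;
    this.tableSpec = tableSpec;
  }

  /** Single entry point instead of one factory method per job type. */
  static Builder builder() {
    return new Builder();
  }

  JobType getJobType() { return jobType; }
  String getQuery() { return query; }
  String getTableSpec() { return tableSpec; }

  static final class Builder {
    private String query;
    private String tableSpec;

    Builder setQuery(String value) { this.query = value; return this; }
    Builder setTableSpec(String value) { this.tableSpec = value; return this; }

    /** Sanity check at build time: exactly one of query or tableSpec is allowed. */
    ReadConfigSketch build() {
      boolean hasQuery = query != null && !query.isEmpty();
      boolean hasTableSpec = tableSpec != null && !tableSpec.isEmpty();
      if (hasQuery == hasTableSpec) {
        throw new IllegalStateException("Set exactly one of query or tableSpec");
      }
      // The job type is derived from what was set, so callers never pass it in.
      JobType jobType = hasQuery ? JobType.QUERY : JobType.EXTRACT;
      return new ReadConfigSketch(jobType, query, tableSpec);
    }
  }
}
```

   With AutoValue, the same effect is usually obtained by keeping the generated
   method as a non-public abstract `autoBuild()` and adding a concrete `build()`
   that runs the checks before returning.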



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformConfiguration.Read}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Read> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformConfiguration.Read> configurationClass() {
+    return BigQuerySchemaTransformConfiguration.Read.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Read configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformConfiguration.Read}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {

Review Comment:
   Can we make this class private?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformConfiguration.Read}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Read> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformConfiguration.Read> configurationClass() {
+    return BigQuerySchemaTransformConfiguration.Read.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Read configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {

Review Comment:
   I think the identifier needs to have "read" somewhere in its name.
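
   For illustration only, one way the identifier could carry the operation,
   presumably so that it cannot collide with the identifier of the write
   provider. The class name, the `operation` parameter and the position of the
   extra segment are assumptions, not a naming scheme taken from this PR:

```java
/** Hypothetical sketch of an identifier that names the operation. */
final class IdentifierSketch {
  private static final String API = "bigquery";
  private static final String VERSION = "v2";

  /** Builds "api:operation:version"; the segment order is an assumption. */
  static String identifier(String operation) {
    return String.format("%s:%s:%s", API, operation, VERSION);
  }
}
```

   With this shape, `IdentifierSketch.identifier("read")` and
   `IdentifierSketch.identifier("write")` yield distinct strings, whereas the
   current `String.format("%s:%s", API, VERSION)` does not mention the operation.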



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+
+  /**
+   * Instantiates a {@link Read.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Read.Builder createQueryBuilder(String query) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+        .setJobType(JobType.QUERY)
+        .setUseStandardSql(Read.DEFAULT_USE_STANDARD_SQL)
+        .setQuery(query);
+  }
+
+  /**
+   * Instantiates a {@link Read.Builder} to support BigQuery extract jobs. See {@link
+   * BigQueryIO.TypedRead#from(String)} for the expected format.
+   */
+  public static Read.Builder createExtractBuilder(String tableSpec) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+        .setJobType(JobType.EXTRACT)
+        .setTableSpec(tableSpec);
+  }
+
+  /** Instantiates a {@link Read.Builder} to support BigQuery extract jobs. */
+  public static Read.Builder createExtractBuilder(TableReference tableReference) {
+    return createExtractBuilder(BigQueryHelpers.toTableSpec(tableReference));
+  }
+
+  /**
+   * Instantiates a {@link Write.Builder} to support BigQuery load jobs. See {@link
+   * BigQueryIO.Write#to(String)}} for toTableSpec expected format.
+   */
+  public static Write.Builder createLoadBuilder(
+      String tableSpec, CreateDisposition createDisposition, WriteDisposition writeDisposition) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Write.Builder()
+        .setTableSpec(tableSpec)
+        .setCreateDisposition(createDisposition.name())
+        .setWriteDisposition(writeDisposition.name());
+  }
+
+  /** Instantiates a {@link Write.Builder} to support BigQuery load jobs. */
+  public static Write.Builder createLoadBuilder(
+      TableReference toTable,
+      CreateDisposition createDisposition,
+      WriteDisposition writeDisposition) {
+    return createLoadBuilder(
+        BigQueryHelpers.toTableSpec(toTable), createDisposition, writeDisposition);
+  }
+
+  /**
+   * Configuration for reading from BigQuery.
+   *
+   * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+   *
+   * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+   * provide no backwards compatibility guarantees, and it should not be implemented outside the
+   * Beam repository.
+   */
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+  })
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Read {
+    private static final TypeDescriptor<Read> TYPE_DESCRIPTOR = TypeDescriptor.of(Read.class);
+    private static final SerializableFunction<Read, Row> ROW_SERIALIZABLE_FUNCTION =
+        AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+    private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+    /** Configures the BigQuery job type. */
+    abstract JobType getJobType();
+
+    /** Serializes configuration to a {@link Row}. */
+    Row toBeamRow() {
+      return ROW_SERIALIZABLE_FUNCTION.apply(this);
+    }
+
+    /** Configures the BigQuery read job with the SQL query. */
+    @Nullable
+    public abstract String getQuery();
+
+    /**
+     * Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)} for
+     * more details on the expected format.
+     */
+    @Nullable
+    public abstract String getTableSpec();
+
+    /** BigQuery geographic location where the query job will be executed. */
+    @Nullable
+    public abstract String getQueryLocation();
+
+    /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+    @Nullable
+    public abstract Boolean getUseStandardSql();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      /** Configures the BigQuery job type. */
+      abstract Builder setJobType(JobType value);
+
+      /** Configures the BigQuery read job with the SQL query. */
+      public abstract Builder setQuery(String value);
+
+      /**
+       * Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)}
+       * for more details on the expected format.
+       */
+      public abstract Builder setTableSpec(String value);
+
+      /** BigQuery geographic location where the query job will be executed. */
+      public abstract Builder setQueryLocation(String value);
+
+      /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+      public abstract Builder setUseStandardSql(Boolean value);
+
+      /** Builds the {@link Read} configuration. */
+      public abstract Read build();
+    }
+  }
+
+  /**
+   * Configuration for writing to BigQuery.
+   *
+   * <p>This class is meant to be used with {@link BigQuerySchemaTransformWriteProvider}.
+   *
+   * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+   * provide no backwards compatibility guarantees, and it should not be implemented outside the
+   * Beam repository.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Write {
+    private static final TypeDescriptor<BigQuerySchemaTransformConfiguration.Write>
+        TYPE_DESCRIPTOR = TypeDescriptor.of(BigQuerySchemaTransformConfiguration.Write.class);
+    private static final SerializableFunction<BigQuerySchemaTransformConfiguration.Write, Row>
+        ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+    /**
+     * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
+     * expected format.
+     */
+    public abstract String getTableSpec();
+
+    /** Specifies whether the table should be created if it does not exist. */
+    public abstract String getCreateDisposition();
+
+    /** Specifies what to do with existing data in the table, in case the table already exists. */
+    public abstract String getWriteDisposition();
+
+    /** Returns the {@link #getTableSpec()} as a {@link TableReference}. */
+    TableReference getTableReference() {
+      return BigQueryHelpers.parseTableSpec(getTableSpec());
+    }
+
+    /** Returns the {@link #getCreateDisposition()} as a {@link CreateDisposition}. */
+    CreateDisposition getCreateDispositionEnum() {

Review Comment:
   We can remove this.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+
+  /**
+   * Instantiates a {@link Read.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Read.Builder createQueryBuilder(String query) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+        .setJobType(JobType.QUERY)
+        .setUseStandardSql(Read.DEFAULT_USE_STANDARD_SQL)
+        .setQuery(query);
+  }
+
+  /**
+   * Instantiates a {@link Read.Builder} to support BigQuery extract jobs. See {@link
+   * BigQueryIO.TypedRead#from(String)} for the expected format.
+   */
+  public static Read.Builder createExtractBuilder(String tableSpec) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+        .setJobType(JobType.EXTRACT)
+        .setTableSpec(tableSpec);
+  }
+
+  /** Instantiates a {@link Read.Builder} to support BigQuery extract jobs. */
+  public static Read.Builder createExtractBuilder(TableReference tableReference) {
+    return createExtractBuilder(BigQueryHelpers.toTableSpec(tableReference));
+  }
+
+  /**
+   * Instantiates a {@link Write.Builder} to support BigQuery load jobs. See {@link
+   * BigQueryIO.Write#to(String)}} for toTableSpec expected format.
+   */
+  public static Write.Builder createLoadBuilder(
+      String tableSpec, CreateDisposition createDisposition, WriteDisposition writeDisposition) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Write.Builder()
+        .setTableSpec(tableSpec)
+        .setCreateDisposition(createDisposition.name())
+        .setWriteDisposition(writeDisposition.name());
+  }
+
+  /** Instantiates a {@link Write.Builder} to support BigQuery load jobs. */
+  public static Write.Builder createLoadBuilder(
+      TableReference toTable,
+      CreateDisposition createDisposition,
+      WriteDisposition writeDisposition) {
+    return createLoadBuilder(
+        BigQueryHelpers.toTableSpec(toTable), createDisposition, writeDisposition);
+  }
+
+  /**
+   * Configuration for reading from BigQuery.
+   *
+   * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+   *
+   * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+   * provide no backwards compatibility guarantees, and it should not be implemented outside the
+   * Beam repository.
+   */
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+  })
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Read {
+    private static final TypeDescriptor<Read> TYPE_DESCRIPTOR = TypeDescriptor.of(Read.class);
+    private static final SerializableFunction<Read, Row> ROW_SERIALIZABLE_FUNCTION =
+        AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+    private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+    /** Configures the BigQuery job type. */
+    abstract JobType getJobType();
+
+    /** Serializes configuration to a {@link Row}. */
+    Row toBeamRow() {
+      return ROW_SERIALIZABLE_FUNCTION.apply(this);
+    }
+
+    /** Configures the BigQuery read job with the SQL query. */
+    @Nullable
+    public abstract String getQuery();
+
+    /**
+     * Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)} for
+     * more details on the expected format.
+     */
+    @Nullable
+    public abstract String getTableSpec();
+
+    /** BigQuery geographic location where the query job will be executed. */
+    @Nullable
+    public abstract String getQueryLocation();
+
+    /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+    @Nullable
+    public abstract Boolean getUseStandardSql();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      /** Configures the BigQuery job type. */
+      abstract Builder setJobType(JobType value);
+
+      /** Configures the BigQuery read job with the SQL query. */
+      public abstract Builder setQuery(String value);
+
+      /**
+       * Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)}
+       * for more details on the expected format.
+       */
+      public abstract Builder setTableSpec(String value);
+
+      /** BigQuery geographic location where the query job will be executed. */
+      public abstract Builder setQueryLocation(String value);
+
+      /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+      public abstract Builder setUseStandardSql(Boolean value);
+
+      /** Builds the {@link Read} configuration. */
+      public abstract Read build();
+    }
+  }
+
+  /**
+   * Configuration for writing to BigQuery.
+   *
+   * <p>This class is meant to be used with {@link BigQuerySchemaTransformWriteProvider}.
+   *
+   * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+   * provide no backwards compatibility guarantees, and it should not be implemented outside the
+   * Beam repository.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Write {
+    private static final TypeDescriptor<BigQuerySchemaTransformConfiguration.Write>
+        TYPE_DESCRIPTOR = TypeDescriptor.of(BigQuerySchemaTransformConfiguration.Write.class);
+    private static final SerializableFunction<BigQuerySchemaTransformConfiguration.Write, Row>
+        ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+    /**
+     * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
+     * expected format.
+     */
+    public abstract String getTableSpec();
+
+    /** Specifies whether the table should be created if it does not exist. */
+    public abstract String getCreateDisposition();
+
+    /** Specifies what to do with existing data in the table, in case the table already exists. */
+    public abstract String getWriteDisposition();
+
+    /** Returns the {@link #getTableSpec()} as a {@link TableReference}. */
+    TableReference getTableReference() {
+      return BigQueryHelpers.parseTableSpec(getTableSpec());
+    }
+
+    /** Returns the {@link #getCreateDisposition()} as a {@link CreateDisposition}. */
+    CreateDisposition getCreateDispositionEnum() {
+      return CreateDisposition.valueOf(getCreateDisposition());
+    }
+
+    /** Serializes configuration to a {@link Row}. */
+    Row toBeamRow() {

Review Comment:
   We can remove this.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+
+  /**
+   * Instantiates a {@link Read.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Read.Builder createQueryBuilder(String query) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+        .setJobType(JobType.QUERY)
+        .setUseStandardSql(Read.DEFAULT_USE_STANDARD_SQL)
+        .setQuery(query);
+  }
+
+  /**
+   * Instantiates a {@link Read.Builder} to support BigQuery extract jobs. See {@link
+   * BigQueryIO.TypedRead#from(String)} for the expected format.
+   */
+  public static Read.Builder createExtractBuilder(String tableSpec) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+        .setJobType(JobType.EXTRACT)
+        .setTableSpec(tableSpec);
+  }
+
+  /** Instantiates a {@link Read.Builder} to support BigQuery extract jobs. */
+  public static Read.Builder createExtractBuilder(TableReference tableReference) {
+    return createExtractBuilder(BigQueryHelpers.toTableSpec(tableReference));
+  }
+
+  /**
+   * Instantiates a {@link Write.Builder} to support BigQuery load jobs. See {@link
+   * BigQueryIO.Write#to(String)}} for toTableSpec expected format.
+   */
+  public static Write.Builder createLoadBuilder(
+      String tableSpec, CreateDisposition createDisposition, WriteDisposition writeDisposition) {
+    return new AutoValue_BigQuerySchemaTransformConfiguration_Write.Builder()
+        .setTableSpec(tableSpec)
+        .setCreateDisposition(createDisposition.name())
+        .setWriteDisposition(writeDisposition.name());
+  }
+
+  /** Instantiates a {@link Write.Builder} to support BigQuery load jobs. */
+  public static Write.Builder createLoadBuilder(
+      TableReference toTable,
+      CreateDisposition createDisposition,
+      WriteDisposition writeDisposition) {
+    return createLoadBuilder(
+        BigQueryHelpers.toTableSpec(toTable), createDisposition, writeDisposition);
+  }
+
+  /**
+   * Configuration for reading from BigQuery.
+   *
+   * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+   *
+   * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+   * provide no backwards compatibility guarantees, and it should not be implemented outside the
+   * Beam repository.
+   */
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+  })
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Read {
+    private static final TypeDescriptor<Read> TYPE_DESCRIPTOR = TypeDescriptor.of(Read.class);
+    private static final SerializableFunction<Read, Row> ROW_SERIALIZABLE_FUNCTION =
+        AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+    private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+    /** Configures the BigQuery job type. */
+    abstract JobType getJobType();
+
+    /** Serializes configuration to a {@link Row}. */
+    Row toBeamRow() {
+      return ROW_SERIALIZABLE_FUNCTION.apply(this);
+    }
+
+    /** Configures the BigQuery read job with the SQL query. */
+    @Nullable
+    public abstract String getQuery();
+
+    /**
+     * Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)} for
+     * more details on the expected format.
+     */
+    @Nullable
+    public abstract String getTableSpec();
+
+    /** BigQuery geographic location where the query job will be executed. */
+    @Nullable
+    public abstract String getQueryLocation();
+
+    /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+    @Nullable
+    public abstract Boolean getUseStandardSql();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      /** Configures the BigQuery job type. */
+      abstract Builder setJobType(JobType value);
+
+      /** Configures the BigQuery read job with the SQL query. */
+      public abstract Builder setQuery(String value);
+
+      /**
+       * Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)}
+       * for more details on the expected format.
+       */
+      public abstract Builder setTableSpec(String value);
+
+      /** BigQuery geographic location where the query job will be executed. */
+      public abstract Builder setQueryLocation(String value);
+
+      /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+      public abstract Builder setUseStandardSql(Boolean value);
+
+      /** Builds the {@link Read} configuration. */
+      public abstract Read build();
+    }
+  }
+
+  /**
+   * Configuration for writing to BigQuery.
+   *
+   * <p>This class is meant to be used with {@link BigQuerySchemaTransformWriteProvider}.
+   *
+   * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+   * provide no backwards compatibility guarantees, and it should not be implemented outside the
+   * Beam repository.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Write {
+    private static final TypeDescriptor<BigQuerySchemaTransformConfiguration.Write>
+        TYPE_DESCRIPTOR = TypeDescriptor.of(BigQuerySchemaTransformConfiguration.Write.class);
+    private static final SerializableFunction<BigQuerySchemaTransformConfiguration.Write, Row>
+        ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+    /**
+     * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
+     * expected format.
+     */
+    public abstract String getTableSpec();
+
+    /** Specifies whether the table should be created if it does not exist. */
+    public abstract String getCreateDisposition();
+
+    /** Specifies what to do with existing data in the table, in case the table already exists. */
+    public abstract String getWriteDisposition();
+
+    /** Returns the {@link #getTableSpec()} as a {@link TableReference}. */
+    TableReference getTableReference() {

Review Comment:
   We can remove this.
   It's generally good to avoid single-line methods that are not exposed.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java:
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link BigQuerySchemaTransformConfiguration.Write}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Write> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformConfiguration.Write> configurationClass() {
+    return BigQuerySchemaTransformConfiguration.Write.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Write configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);

Review Comment:
   We should have `write` in the name.
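
   A minimal sketch of one way to act on this, not the PR's actual change. The `OPERATION` constant, the three-part `api:version:operation` layout, and the stand-alone class name `IdentifierSketch` are assumptions made for illustration; only `API` and `VERSION` come from the diff above.

```java
// Hypothetical stand-alone sketch; the real method lives in
// BigQuerySchemaTransformWriteProvider and overrides identifier().
final class IdentifierSketch {
  private static final String API = "bigquery";
  private static final String VERSION = "v2";
  // Assumed extra segment so that a read provider and a write provider
  // built from the same API and VERSION do not share one identifier.
  private static final String OPERATION = "write";

  static String identifier() {
    return String.format("%s:%s:%s", API, VERSION, OPERATION);
  }
}
```

   With a segment like this, a read provider built the same way would get a distinct identifier.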



+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
+   * single is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
+   * BigQuerySchemaTransformConfiguration.Write}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+    BigQueryWriteSchemaTransform(BigQuerySchemaTransformConfiguration.Write configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformConfiguration.Write}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+    /** An instance of {@link BigQueryServices} used for testing. */
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformConfiguration.Write configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      CreateDisposition createDisposition = configuration.getCreateDispositionEnum();
+      Schema destinationSchema = getDestinationRowSchema(options);
+
+      if (destinationSchema == null) {
+        // We only care if the create disposition implies an existing table i.e. create never.
+        if (createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
+          throw new InvalidConfigurationException(
+              String.format(
+                  "configuration create disposition: %s for table: %s for a null destination schema",
+                  createDisposition, configuration.getTableSpec()));
+        }
+      }
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      validate(input);
+      PCollection<Row> rowPCollection = input.get(INPUT_TAG);
+      Schema schema = rowPCollection.getSchema();
+      BigQueryIO.Write<TableRow> write = toWrite(schema);
+      if (testBigQueryServices != null) {
+        write = write.withTestServices(testBigQueryServices);
+      }
+
+      PCollection<TableRow> tableRowPCollection =
+          rowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
+      tableRowPCollection.apply(write);
+      return PCollectionRowTuple.empty(input.getPipeline());
+    }
+
+    /** Instantiates a {@link BigQueryIO.Write<TableRow>} from a {@link Schema}. */
+    BigQueryIO.Write<TableRow> toWrite(Schema schema) {
+      TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
+      CreateDisposition createDisposition =
+          CreateDisposition.valueOf(configuration.getCreateDisposition());
+      WriteDisposition writeDisposition =
+          WriteDisposition.valueOf(configuration.getWriteDisposition());
+
+      return BigQueryIO.writeTableRows()
+          .to(configuration.getTableSpec())
+          .withCreateDisposition(createDisposition)
+          .withWriteDisposition(writeDisposition)
+          .withSchema(tableSchema);
+    }
+
+    /** Setter for testing using {@link BigQueryServices}. */
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    /** Validate a {@link PCollectionRowTuple} input. */
+    void validate(PCollectionRowTuple input) {
+      if (!input.has(INPUT_TAG)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s is missing expected tag: %s",
+                getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
+      }
+
+      validate(input.get(INPUT_TAG));

Review Comment:
   I think we can inline this method call.



+    }
+
+    /** Validate a {@link PCollection<Row>} input. */
+    void validate(PCollection<Row> input) {
+      Schema sourceSchema = input.getSchema();
+      if (sourceSchema == null) {
+        throw new IllegalArgumentException(
+            String.format("%s is null for input of tag: %s", Schema.class, INPUT_TAG));
+      }
+
+      Schema destinationSchema = getDestinationRowSchema(input.getPipeline().getOptions());
+
+      // We already evaluate whether we should acquire a destination schema.
+      // See the validate(PipelineOptions) method for details.
+      if (destinationSchema != null) {
+        validateMatching(sourceSchema, destinationSchema);
+      }
+    }
+
+    void validateMatching(Schema sourceSchema, Schema destinationSchema) {
+      Set<String> fieldNames = new HashSet<>();
+      List<String> mismatchedFieldNames = new ArrayList<>();
+      fieldNames.addAll(sourceSchema.getFieldNames());
+      fieldNames.addAll(destinationSchema.getFieldNames());
+      for (String name : fieldNames) {
+        Field gotField = null;
+        Field wantField = null;
+        if (sourceSchema.hasField(name)) {
+          gotField = sourceSchema.getField(name);
+        }
+        if (destinationSchema.hasField(name)) {
+          wantField = destinationSchema.getField(name);
+        }
+        if (!matches(wantField, gotField)) {
+          mismatchedFieldNames.add(name);
+        }
+      }
+
+      if (!mismatchedFieldNames.isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "source and destination schema mismatch for table: %s with fields: %s",
+                configuration.getTableSpec(), String.join(" | ", mismatchedFieldNames)));
+      }
+    }
+
+    private boolean matches(Field want, Field got) {
+      if (want == null && got == null) {
+        return true;
+      }
+      if (want == null) {
+        return false;
+      }
+      return want.equals(got);
+    }
+
+    TableSchema getDestinationTableSchema(BigQueryOptions options) {
+      Table destinationTable = getDestinationTable(options);
+      if (destinationTable == null) {
+        return null;
+      }
+      return destinationTable.getSchema();
+    }
+
+    DatasetService getDatasetService(BigQueryOptions options) {
+      BigQueryServices bigQueryServices = testBigQueryServices;
+      if (bigQueryServices == null) {
+        bigQueryServices = new BigQueryServicesImpl();
+      }
+      return bigQueryServices.getDatasetService(options);
+    }
+
+    Table getDestinationTable(BigQueryOptions options) {
+      Table destinationTable = null;
+      CreateDisposition createDisposition = configuration.getCreateDispositionEnum();
+      DatasetService datasetService = getDatasetService(options);
+      try {
+        destinationTable = datasetService.getTable(configuration.getTableReference());
+      } catch (IOException | InterruptedException e) {
+        // We only care if the create disposition implies an existing table i.e. create never.
+        if (createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
+          throw new InvalidConfigurationException(
+              String.format(
+                  "error querying destination schema for create disposition: %s for table: %s, error: %s",
+                  createDisposition, configuration.getTableSpec(), e.getMessage()));
+        }
+      }
+      return destinationTable;
+    }
+
+    Schema getDestinationRowSchema(BigQueryOptions options) {
+      TableSchema tableSchema = getDestinationTableSchema(options);

Review Comment:
   I think we can inline this.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java:
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link BigQuerySchemaTransformConfiguration.Write}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Write> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformConfiguration.Write> configurationClass() {
+    return BigQuerySchemaTransformConfiguration.Write.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Write configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
+   * single is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
+   * BigQuerySchemaTransformConfiguration.Write}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+    BigQueryWriteSchemaTransform(BigQuerySchemaTransformConfiguration.Write configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformConfiguration.Write}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+    /** An instance of {@link BigQueryServices} used for testing. */
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformConfiguration.Write configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      CreateDisposition createDisposition = configuration.getCreateDispositionEnum();
+      Schema destinationSchema = getDestinationRowSchema(options);
+
+      if (destinationSchema == null) {
+        // We only care if the create disposition implies an existing table i.e. create never.
+        if (createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
+          throw new InvalidConfigurationException(
+              String.format(
+                  "configuration create disposition: %s for table: %s for a null destination schema",
+                  createDisposition, configuration.getTableSpec()));
+        }
+      }
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      validate(input);
+      PCollection<Row> rowPCollection = input.get(INPUT_TAG);
+      Schema schema = rowPCollection.getSchema();
+      BigQueryIO.Write<TableRow> write = toWrite(schema);
+      if (testBigQueryServices != null) {
+        write = write.withTestServices(testBigQueryServices);
+      }
+
+      PCollection<TableRow> tableRowPCollection =
+          rowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
+      tableRowPCollection.apply(write);
+      return PCollectionRowTuple.empty(input.getPipeline());
+    }
+
+    /** Instantiates a {@link BigQueryIO.Write<TableRow>} from a {@link Schema}. */
+    BigQueryIO.Write<TableRow> toWrite(Schema schema) {
+      TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
+      CreateDisposition createDisposition =
+          CreateDisposition.valueOf(configuration.getCreateDisposition());
+      WriteDisposition writeDisposition =
+          WriteDisposition.valueOf(configuration.getWriteDisposition());
+
+      return BigQueryIO.writeTableRows()
+          .to(configuration.getTableSpec())
+          .withCreateDisposition(createDisposition)
+          .withWriteDisposition(writeDisposition)
+          .withSchema(tableSchema);
+    }
+
+    /** Setter for testing using {@link BigQueryServices}. */
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    /** Validate a {@link PCollectionRowTuple} input. */
+    void validate(PCollectionRowTuple input) {
+      if (!input.has(INPUT_TAG)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s is missing expected tag: %s",
+                getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
+      }
+
+      validate(input.get(INPUT_TAG));
+    }
+
+    /** Validate a {@link PCollection<Row>} input. */
+    void validate(PCollection<Row> input) {
+      Schema sourceSchema = input.getSchema();
+      if (sourceSchema == null) {
+        throw new IllegalArgumentException(
+            String.format("%s is null for input of tag: %s", Schema.class, INPUT_TAG));
+      }
+
+      Schema destinationSchema = getDestinationRowSchema(input.getPipeline().getOptions());
+
+      // We already evaluate whether we should acquire a destination schema.
+      // See the validate(PipelineOptions) method for details.
+      if (destinationSchema != null) {
+        validateMatching(sourceSchema, destinationSchema);
+      }
+    }
+
+    void validateMatching(Schema sourceSchema, Schema destinationSchema) {
+      Set<String> fieldNames = new HashSet<>();
+      List<String> mismatchedFieldNames = new ArrayList<>();
+      fieldNames.addAll(sourceSchema.getFieldNames());
+      fieldNames.addAll(destinationSchema.getFieldNames());
+      for (String name : fieldNames) {
+        Field gotField = null;
+        Field wantField = null;
+        if (sourceSchema.hasField(name)) {
+          gotField = sourceSchema.getField(name);
+        }
+        if (destinationSchema.hasField(name)) {
+          wantField = destinationSchema.getField(name);
+        }
+        if (!matches(wantField, gotField)) {

Review Comment:
   I think we can simplify the `if` conditions in the `matches` method and inline it here.
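
To make the comment above concrete, here is a minimal sketch of what an inlined check could look like. The body of `matches` is not shown in this excerpt, so the sketch assumes it only null-checks both fields and then compares them. A plain `Map<String, String>` (field name to type name) stands in for Beam's `Schema`, and `SchemaMatchSketch` and `mismatchedFieldNames` are hypothetical names used for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

// Hypothetical stand-in: a Map of field name -> type name replaces Beam's Schema.
final class SchemaMatchSketch {

  /** Returns the names of fields that are missing on one side or differ in type. */
  static List<String> mismatchedFieldNames(
      Map<String, String> source, Map<String, String> destination) {
    // Union of field names from both sides, sorted for a stable result order.
    TreeSet<String> fieldNames = new TreeSet<>(source.keySet());
    fieldNames.addAll(destination.keySet());

    List<String> mismatched = new ArrayList<>();
    for (String name : fieldNames) {
      // Map.get returns null for an absent field and Objects.equals is null-safe,
      // so the separate has-field checks and the helper call collapse into one test.
      if (!Objects.equals(source.get(name), destination.get(name))) {
        mismatched.add(name);
      }
    }
    return mismatched;
  }
}
```

Whether this applies to the PR depends on what `matches` actually compares. If it checks more than equality (for example nullability or nested types), the comparison inside the loop would need to carry that logic.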



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformConfiguration.Read}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Read> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformConfiguration.Read> configurationClass() {
+    return BigQuerySchemaTransformConfiguration.Read.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Read configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformConfiguration.Read}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformConfiguration.Read configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformConfiguration.Read configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformConfiguration.Read}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformConfiguration.Read configuration;
+
+    /** An instance of {@link BigQueryServices} used for testing. */
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformConfiguration.Read configuration) {
+      this.configuration = configuration;
+    }
+
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      BigQueryIO.TypedRead<TableRow> read = toTypedRead();
+      if (testBigQueryServices != null) {
+        read = read.withTestServices(testBigQueryServices).withoutValidation();
+      }
+
+      PCollection<TableRow> tableRowPCollection = input.getPipeline().apply(read);
+      Schema schema = tableRowPCollection.getSchema();
+      PCollection<Row> rowPCollection =
+          tableRowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(Row.class))
+                  .via((tableRow) -> BigQueryUtils.toBeamRow(schema, tableRow)));
+      return PCollectionRowTuple.of(OUTPUT_TAG, rowPCollection.setRowSchema(schema));
+    }
+
+    BigQueryIO.TypedRead<TableRow> toTypedRead() {
+      JobType jobType = configuration.getJobType();
+      switch (jobType) {
+        case QUERY:
+          return toQueryTypedRead();
+
+        case EXTRACT:
+          return toExtractTypedRead();
+
+        default:
+          throw new InvalidConfigurationException(
+              String.format("invalid job type for BigQueryIO read, got: %s", jobType));
+      }
+    }
+
+    private BigQueryIO.TypedRead<TableRow> toExtractTypedRead() {
+      return BigQueryIO.readTableRowsWithSchema().from(configuration.getTableSpec());
+    }
+
+    private BigQueryIO.TypedRead<TableRow> toQueryTypedRead() {
+      String query = Objects.requireNonNull(configuration.getQuery());
+
+      BigQueryIO.TypedRead<TableRow> read = BigQueryIO.readTableRowsWithSchema().fromQuery(query);
+
+      if (configuration.getQueryLocation() != null) {

Review Comment:
   ```suggestion
         if (configuration.getQueryLocation() != null && !configuration.getQueryLocation().isEmpty()) {
   ```
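
The guard in the suggestion above can be exercised in isolation. The following is only a sketch: `QueryLocationSketch`, `hasQueryLocation`, and `describeRead` are hypothetical names, and a plain string stands in for the `BigQueryIO.TypedRead` builder so that the example runs without Beam on the classpath. It also reads the value once into a parameter instead of calling the getter twice.

```java
// Hypothetical stand-in for the query-location guard; no Beam types involved.
final class QueryLocationSketch {

  /** Returns true only when a query location is present and non-empty. */
  static boolean hasQueryLocation(String queryLocation) {
    return queryLocation != null && !queryLocation.isEmpty();
  }

  /** Mimics the builder step: the location is appended only when one is set. */
  static String describeRead(String query, String queryLocation) {
    String read = "fromQuery(" + query + ")";
    if (hasQueryLocation(queryLocation)) {
      read = read + ".withQueryLocation(" + queryLocation + ")";
    }
    return read;
  }
}
```

With this guard, an empty string coming from a configuration row is treated the same as an unset value, which appears to be the intent of the suggestion.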



+        read = read.withQueryLocation(configuration.getQueryLocation());
+      }
+
+      if (configuration.getUseStandardSql() != null) {
+        if (configuration.getUseStandardSql()) {
+          read = read.usingStandardSql();
+        }
+      }

Review Comment:
   ```suggestion
         if (configuration.getUseStandardSql() != null && configuration.getUseStandardSql()) {
           read = read.usingStandardSql();
         }
   ```
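
As a possible alternative to the suggestion above (a sketch, not something the PR is known to have adopted), `Boolean.TRUE.equals(...)` folds the null check and the unboxing into one call. `StandardSqlSketch` and `shouldUseStandardSql` are hypothetical names; the parameter stands in for the value returned by `configuration.getUseStandardSql()`.

```java
// Hypothetical stand-in for the nullable Boolean check; no Beam types involved.
final class StandardSqlSketch {

  /** Returns true only when the nullable flag is set and true. */
  static boolean shouldUseStandardSql(Boolean useStandardSql) {
    // Boolean.TRUE.equals(null) is false, so no explicit null check or
    // auto-unboxing (which would throw on null) is needed.
    return Boolean.TRUE.equals(useStandardSql);
  }
}
```

Both forms behave the same for `null`, `false`, and `true`; the choice between them is mostly a matter of readability.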





Issue Time Tracking
-------------------

    Worklog Id:     (was: 755633)
    Time Spent: 5h  (was: 4h 50m)

> Convert BigQuery SchemaIO to SchemaTransform
> --------------------------------------------
>
>                 Key: BEAM-14035
>                 URL: https://issues.apache.org/jira/browse/BEAM-14035
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Lara Schmidt
>            Assignee: Damon Douglas
>            Priority: P2
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> The output of this task is to refactor 
> org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider [1] to implement 
> SchemaTransform [2] and SchemaTransformProvider interfaces [3].
> Please see https://issues.apache.org/jira/browse/BEAM-14168 for more 
> information on additional work remaining after this task is complete.
> As a result of this task there will be:
> 1. one class deletion and four classes created for the PR reflecting whether 
> we are reading from or writing to BigQuery, via BigQueryIO.Read and 
> BigQueryIO.Write, respectively.
> * delete: 
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
> * create: 
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
> * create: 
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
> * create: 
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java
> * create: 
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
> 2. The testing strategy will leverage the use of the 
> org.apache.beam.sdk.transforms.display.DisplayData class instead of using the 
> org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices.  
> We will test whether the input of BigQuerySchemaTransformReadConfiguration 
> yields a BigQueryIO.Read with correct query, tableSpec, useLegacySQL 
> (defaulted to false), and queryLocation values.  
> We will test whether the input of BigQuerySchemaTransformWriteConfiguration 
> yields a BigQueryIO.Write with correct tableReference, createDisposition, and 
> writeDisposition values.
> References:
> 1 - 
> [sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java|https://github.com/apache/beam/blob/fc00b9697a0dfb73a03981f4d6c2c8dd1e316d5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java]
> 2 - 
> [SchemaTransform|https://github.com/apache/beam/blob/a47a725863fc8c37a9b8520cebeef83677fe531d/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java]
> 3 - 
> [SchemaTransformProvider|https://github.com/apache/beam/blob/a47a725863fc8c37a9b8520cebeef83677fe531d/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
