laraschmidt commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r840795608



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link 
BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {

Review comment:
       What are your thoughts on combining this into one class 
BigQuerySchemaTransformConfiguration {
   
   public class Read {}
   
   public class Write {}
   }
   Maybe Ankur has more thoughts but this might cut down on the amount of new 
files we require at least.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link 
BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {

Review comment:
       What are your thoughts on combining this into one class 
BigQuerySchemaTransformConfiguration {
   
   public class Read {}
   
   public class Write {}
   }
   Maybe Ankur has more thoughts but this might cut down on the amount of new 
files we require at least.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery 
write jobs configured
+ * using {@link BigQuerySchemaTransformWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends 
TypedSchemaTransformProvider<BigQuerySchemaTransformWriteConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformWriteConfiguration> 
configurationClass() {
+    return BigQuerySchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration 
configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since a
+   * single is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery write jobs 
configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {

Review comment:
       BigQueryWriteSchemaTransform. It's a schema transform that does bigquery 
write.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read 
jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends 
TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> 
configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration 
configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs 
configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> 
buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured 
using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformReadConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      BigQueryIO.TypedRead<TableRow> read = toTypedRead();
+      if (testBigQueryServices != null) {
+        read = read.withTestServices(testBigQueryServices).withoutValidation();

Review comment:
       This still doesn't really test toTypedRead but if it's the best we can 
do then it's probably okay.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read 
jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends 
TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> 
configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration 
configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs 
configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> 
buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured 
using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+    private BigQueryServices testBigQueryServices = null;

Review comment:
       comments on this field

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read 
jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends 
TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> 
configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration 
configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs 
configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> 
buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured 
using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+    private BigQueryServices testBigQueryServices = null;

Review comment:
       comments on this field

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery 
write jobs configured
+ * using {@link BigQuerySchemaTransformWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends 
TypedSchemaTransformProvider<BigQuerySchemaTransformWriteConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformWriteConfiguration> 
configurationClass() {
+    return BigQuerySchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration 
configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since a
+   * single is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery write jobs 
configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    BigQueryWriteSchemaTransform(BigQuerySchemaTransformWriteConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> 
buildTransform() {
+      return new BigQuerySchemaTransformWriteTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery write jobs 
configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQuerySchemaTransformWriteTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    
BigQuerySchemaTransformWriteTransform(BigQuerySchemaTransformWriteConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.has(INPUT_TAG)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s is missing expected tag: %s",
+                getClass().getSimpleName(), input.getClass().getSimpleName(), 
INPUT_TAG));
+      }
+      PCollection<Row> rowPCollection = input.get(INPUT_TAG);
+      Schema schema = rowPCollection.getSchema();
+      PCollection<TableRow> tableRowPCollection =
+          rowPCollection.apply(
+              
MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
+      tableRowPCollection.apply(toWrite(schema));

Review comment:
       So what happens if the input schema doesn't match the table (if it 
already exists). I assume we'd fail somewhere? If so, that's probably fine for 
now. But ideally we would do a pre-emptive check here that the schema of the 
table matches. Let's at least add a TODO for this.

##########
File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadProvider.PCollectionRowTupleTransform;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQuerySchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformReadProviderTest {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new 
AutoValueSchema();
+  private static final 
TypeDescriptor<BigQuerySchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(BigQuerySchemaTransformReadConfiguration.class);
+  private static final 
SerializableFunction<BigQuerySchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = 
AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  private static final String FAKE_PROJECT = "fakeproject";
+  private static final String FAKE_DATASET = "fakedataset";
+  private static final String FAKE_TABLE_ID = "faketable";
+  // private static final String FAKE_QUERY = String.format("select * from 
`%s.%s.%s`", FAKE_PROJECT,
+  //     FAKE_DATASET, FAKE_TABLE_ID);
+
+  private static final TableReference FAKE_TABLE_REFERENCE = new 
TableReference()
+      .setProjectId(FAKE_PROJECT)
+      .setDatasetId(FAKE_DATASET)
+      .setTableId(FAKE_TABLE_ID);
+
+  private static final String FAKE_TABLE_SPEC = 
BigQueryHelpers.toTableSpec(FAKE_TABLE_REFERENCE);
+
+  private static final Schema FAKE_SCHEMA = Schema.of(
+      Field.of("name", FieldType.STRING),
+      Field.of("number", FieldType.INT64)
+  );
+
+  private static final List<TableRow> FAKE_RECORDS = Arrays.asList(
+      new TableRow().set("name", "a").set("number", 1L),
+      new TableRow().set("name", "b").set("number", 2L),
+      new TableRow().set("name", "c").set("number", 3L)
+  );
+
+  private static final List<Row> FAKE_ROWS = Arrays.asList(
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "a")
+          .withFieldValue("number", 1L)
+          .build(),
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "b")
+          .withFieldValue("number", 2L)
+          .build(),
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "c")
+          .withFieldValue("number", 3L)
+          .build()
+  );
+
+  private static final TableSchema FAKE_TABLE_SCHEMA = 
BigQueryUtils.toTableSchema(FAKE_SCHEMA);
+  private static final BigQueryOptions OPTIONS =
+      TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+  private final FakeDatasetService fakeDatasetService = new 
FakeDatasetService();
+  private final FakeJobService fakeJobService = new FakeJobService();
+  private final Table fakeTable = new Table();
+  private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private final FakeBigQueryServices fakeBigQueryServices =
+      new FakeBigQueryServices()
+          .withJobService(fakeJobService)
+          .withDatasetService(fakeDatasetService);
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    FakeDatasetService.setUp();
+    FakeJobService.setUp();
+    BigQueryIO.clearCreatedTables();
+    fakeTable.setSchema(FAKE_TABLE_SCHEMA);
+    fakeTable.setTableReference(FAKE_TABLE_REFERENCE);
+    fakeTable.setNumBytes(1024L * 1024L);
+    fakeDatasetService.createDataset(FAKE_PROJECT, FAKE_DATASET, "", "", null);
+    fakeDatasetService.createTable(fakeTable);
+    fakeDatasetService.insertAll(fakeTable.getTableReference(), FAKE_RECORDS, 
null);
+    temporaryFolder.create();
+    OPTIONS.setProject(FAKE_PROJECT);
+    OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() {
+    temporaryFolder.delete();
+  }
+
+  @Rule
+  public transient TestPipeline p = TestPipeline.fromOptions(OPTIONS);
+
+  @Test
+  public void testFromExtractConfiguration() {

Review comment:
       testExtract? also prob use read here instead of extract.

##########
File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadProvider.PCollectionRowTupleTransform;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQuerySchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformReadProviderTest {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new 
AutoValueSchema();
+  private static final 
TypeDescriptor<BigQuerySchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(BigQuerySchemaTransformReadConfiguration.class);
+  private static final 
SerializableFunction<BigQuerySchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = 
AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  private static final String FAKE_PROJECT = "fakeproject";
+  private static final String FAKE_DATASET = "fakedataset";
+  private static final String FAKE_TABLE_ID = "faketable";
+  // private static final String FAKE_QUERY = String.format("select * from 
`%s.%s.%s`", FAKE_PROJECT,
+  //     FAKE_DATASET, FAKE_TABLE_ID);
+
+  private static final TableReference FAKE_TABLE_REFERENCE = new 
TableReference()
+      .setProjectId(FAKE_PROJECT)
+      .setDatasetId(FAKE_DATASET)
+      .setTableId(FAKE_TABLE_ID);
+
+  private static final String FAKE_TABLE_SPEC = 
BigQueryHelpers.toTableSpec(FAKE_TABLE_REFERENCE);
+
+  private static final Schema FAKE_SCHEMA = Schema.of(
+      Field.of("name", FieldType.STRING),
+      Field.of("number", FieldType.INT64)
+  );
+
+  private static final List<TableRow> FAKE_RECORDS = Arrays.asList(
+      new TableRow().set("name", "a").set("number", 1L),
+      new TableRow().set("name", "b").set("number", 2L),
+      new TableRow().set("name", "c").set("number", 3L)
+  );
+
+  private static final List<Row> FAKE_ROWS = Arrays.asList(
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "a")
+          .withFieldValue("number", 1L)
+          .build(),
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "b")
+          .withFieldValue("number", 2L)
+          .build(),
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "c")
+          .withFieldValue("number", 3L)
+          .build()
+  );
+
+  private static final TableSchema FAKE_TABLE_SCHEMA = 
BigQueryUtils.toTableSchema(FAKE_SCHEMA);
+  private static final BigQueryOptions OPTIONS =
+      TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+  private final FakeDatasetService fakeDatasetService = new 
FakeDatasetService();
+  private final FakeJobService fakeJobService = new FakeJobService();
+  private final Table fakeTable = new Table();
+  private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private final FakeBigQueryServices fakeBigQueryServices =
+      new FakeBigQueryServices()
+          .withJobService(fakeJobService)
+          .withDatasetService(fakeDatasetService);
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    FakeDatasetService.setUp();
+    FakeJobService.setUp();
+    BigQueryIO.clearCreatedTables();
+    fakeTable.setSchema(FAKE_TABLE_SCHEMA);
+    fakeTable.setTableReference(FAKE_TABLE_REFERENCE);
+    fakeTable.setNumBytes(1024L * 1024L);
+    fakeDatasetService.createDataset(FAKE_PROJECT, FAKE_DATASET, "", "", null);
+    fakeDatasetService.createTable(fakeTable);
+    fakeDatasetService.insertAll(fakeTable.getTableReference(), FAKE_RECORDS, 
null);
+    temporaryFolder.create();
+    OPTIONS.setProject(FAKE_PROJECT);
+    OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() {
+    temporaryFolder.delete();
+  }
+
+  @Rule
+  public transient TestPipeline p = TestPipeline.fromOptions(OPTIONS);
+
+  @Test
+  public void testFromExtractConfiguration() {

Review comment:
       testExtract?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read 
jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends 
TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> 
configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration 
configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs 
configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> 
buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured 
using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformReadConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      BigQueryIO.TypedRead<TableRow> read = toTypedRead();
+      if (testBigQueryServices != null) {
+        read = read.withTestServices(testBigQueryServices).withoutValidation();

Review comment:
       We should at least manually test it works in a pipeline though. 

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery 
write jobs configured
+ * using {@link BigQuerySchemaTransformWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends 
TypedSchemaTransformProvider<BigQuerySchemaTransformWriteConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformWriteConfiguration> 
configurationClass() {
+    return BigQuerySchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration 
configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since a
+   * single is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery write jobs 
configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {

Review comment:
       BigQueryWriteSchemaTransform. It's a schema transform that does bigquery 
write.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read 
jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends 
TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> 
configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration 
configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs 
configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> 
buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured 
using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformReadConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      BigQueryIO.TypedRead<TableRow> read = toTypedRead();
+      if (testBigQueryServices != null) {
+        read = read.withTestServices(testBigQueryServices).withoutValidation();

Review comment:
       This still doesn't really test toTypedRead but if it's the best we can 
do then it's probably okay.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read 
jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends 
TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> 
configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration 
configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs 
configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> 
buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured 
using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformReadConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      BigQueryIO.TypedRead<TableRow> read = toTypedRead();
+      if (testBigQueryServices != null) {
+        read = read.withTestServices(testBigQueryServices).withoutValidation();

Review comment:
       We should at least manually test it works in a pipeline though. 

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery 
write jobs configured
+ * using {@link BigQuerySchemaTransformWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends 
TypedSchemaTransformProvider<BigQuerySchemaTransformWriteConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformWriteConfiguration> 
configurationClass() {
+    return BigQuerySchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration 
configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since a
+   * single is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery write jobs 
configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    BigQueryWriteSchemaTransform(BigQuerySchemaTransformWriteConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> 
buildTransform() {
+      return new BigQuerySchemaTransformWriteTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery write jobs 
configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQuerySchemaTransformWriteTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    
BigQuerySchemaTransformWriteTransform(BigQuerySchemaTransformWriteConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.has(INPUT_TAG)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s is missing expected tag: %s",
+                getClass().getSimpleName(), input.getClass().getSimpleName(), 
INPUT_TAG));
+      }
+      PCollection<Row> rowPCollection = input.get(INPUT_TAG);
+      Schema schema = rowPCollection.getSchema();
+      PCollection<TableRow> tableRowPCollection =
+          rowPCollection.apply(
+              
MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
+      tableRowPCollection.apply(toWrite(schema));

Review comment:
       So what happens if the input schema doesn't match the table (if it 
already exists). I assume we'd fail somewhere? If so, that's probably fine for 
now. But ideally we would do a pre-emptive check here that the schema of the 
table matches. Let's at least add a TODO for this. Unless the old one only did 
create? Or did it use an existing table ever?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to