This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 628348b8baf Managed BigQueryIO (#31486)
628348b8baf is described below

commit 628348b8bafb3856b26aed6d2bd20b97f938aad0
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Tue Nov 12 13:50:47 2024 -0500

    Managed BigQueryIO (#31486)
    
    * managed bigqueryio
    
    * spotless
    
    * move managed dependency to test only
    
    * cleanup after merging snake_case PR
    
    * choose write method based on boundedness and pipeline options
    
    * rename bigquery write config class
    
    * spotless
    
    * change read output tag to 'output'
    
    * spotless
    
    * revert logic that depends on DataflowServiceOptions; switching BQ methods can instead be done on the Dataflow service side
    
    * spotless
    
    * fix typo
    
    * separate BQ write config to a new class
    
    * fix doc
    
    * resolve after syncing to HEAD
    
    * spotless
    
    * fork on batch/streaming
    
    * cleanup
    
    * spotless
    
    * move forking logic to BQ schematransform side
    
    * add file loads translation and tests; add a test that checks the correct transform is chosen
    
    * set top-level wrapper to be the underlying managed BQ transform urn; change tests to verify underlying transform name
    
    * move unit tests to respective schematransform test classes
    
    * expose to Python SDK as well
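    
    For context, a minimal usage sketch of the new managed BigQuery entry
    points (a sketch only: the table name and config keys are illustrative;
    see Managed.java and BigQueryWriteConfiguration in the diff below):
    
        // write Beam Rows to BigQuery through the Managed API
        PCollectionRowTuple.of("input", rows)
            .apply(Managed.write(Managed.BIGQUERY)
                .withConfig(ImmutableMap.of("table", "my-project:my_dataset.my_table")));
    
        // read; the result is tagged "output" (see the commit note above)
        PCollection<Row> readRows =
            PCollectionRowTuple.empty(pipeline)
                .apply(Managed.read(Managed.BIGQUERY)
                    .withConfig(ImmutableMap.of("table_spec", "my-project:my_dataset.my_table")))
                .get("output");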
---
 .../beam_PostCommit_Java_DataflowV2.json           |   3 +-
 .../beam_PostCommit_Python_Xlang_Gcp_Direct.json   |   2 +-
 .../model/pipeline/v1/external_transforms.proto    |   4 +
 sdks/java/io/google-cloud-platform/build.gradle    |   1 +
 .../expansion-service/build.gradle                 |   3 +
 ...FileLoadsWriteSchemaTransformConfiguration.java |  72 ------
 ...QueryFileLoadsWriteSchemaTransformProvider.java | 256 --------------------
 .../BigQueryDirectReadSchemaTransformProvider.java |  33 ++-
 .../BigQueryFileLoadsSchemaTransformProvider.java  | 137 +++++++++++
 .../BigQuerySchemaTransformTranslation.java        |  81 +++++++
 ...ueryStorageWriteApiSchemaTransformProvider.java | 226 ++----------------
 .../providers/BigQueryWriteConfiguration.java      | 218 +++++++++++++++++
 .../BigQueryWriteSchemaTransformProvider.java      |  87 +++++++
 ...yFileLoadsWriteSchemaTransformProviderTest.java | 265 ---------------------
 ...gQueryFileLoadsSchemaTransformProviderTest.java | 146 ++++++++++++
 .../gcp/bigquery/providers/BigQueryManagedIT.java  | 153 ++++++++++++
 .../BigQuerySchemaTransformTranslationTest.java    | 205 ++++++++++++++++
 ...StorageWriteApiSchemaTransformProviderTest.java |  83 ++++---
 .../java/org/apache/beam/sdk/managed/Managed.java  |  11 +-
 .../managed/ManagedSchemaTransformProvider.java    |  37 ++-
 .../sdk/managed/ManagedTransformConstants.java     |  18 ++
 .../ManagedSchemaTransformProviderTest.java        |   3 +-
 sdks/python/apache_beam/transforms/managed.py      |   8 +-
 23 files changed, 1187 insertions(+), 865 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
index a03c067d2c4..1efc8e9e440 100644
--- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
+++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
@@ -1,3 +1,4 @@
 {
-    "comment": "Modify this file in a trivial way to cause this test suite to 
run"
+    "comment": "Modify this file in a trivial way to cause this test suite to 
run",
+    "modification": 1
 }
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index b2683333323..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "modification": 2
+  "modification": 1
 }
diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index b03350966d6..f102e82bafa 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -70,6 +70,10 @@ message ManagedTransforms {
       "beam:schematransform:org.apache.beam:kafka_read:v1"];
     KAFKA_WRITE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
       "beam:schematransform:org.apache.beam:kafka_write:v1"];
+    BIGQUERY_READ = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:bigquery_storage_read:v1"];
+    BIGQUERY_WRITE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:bigquery_write:v1"];
   }
 }
 
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 3e322d976c1..2acce3e94cc 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -159,6 +159,7 @@ dependencies {
  testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration")
  testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration")
  testImplementation project(path: ":runners:direct-java", configuration: "shadow")
+  testImplementation project(":sdks:java:managed")
   testImplementation project(path: ":sdks:java:io:common")
   testImplementation project(path: ":sdks:java:testing:test-utils")
   testImplementation library.java.commons_math3
diff --git a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle
index 1288d91964e..f6c6f07d0cd 100644
--- a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle
+++ b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle
@@ -36,6 +36,9 @@ dependencies {
     permitUnusedDeclared project(":sdks:java:io:google-cloud-platform") // 
BEAM-11761
     implementation project(":sdks:java:extensions:schemaio-expansion-service")
     permitUnusedDeclared 
project(":sdks:java:extensions:schemaio-expansion-service") // BEAM-11761
+    implementation project(":sdks:java:managed")
+    permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761
+
     runtimeOnly library.java.slf4j_jdk14
 }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformConfiguration.java
deleted file mode 100644
index f634b5ec6f6..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformConfiguration.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-
-/**
- * Configuration for writing to BigQuery.
- *
- * <p>This class is meant to be used with {@link BigQueryFileLoadsWriteSchemaTransformProvider}.
- *
- * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
- * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
- * repository.
- */
-@DefaultSchema(AutoValueSchema.class)
-@AutoValue
-public abstract class BigQueryFileLoadsWriteSchemaTransformConfiguration {
-
-  /** Instantiates a {@link BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder}. */
-  public static Builder builder() {
-    return new AutoValue_BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder();
-  }
-
-  /**
-   * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
-   * expected format.
-   */
-  public abstract String getTableSpec();
-
-  /** Specifies whether the table should be created if it does not exist. */
-  public abstract String getCreateDisposition();
-
-  /** Specifies what to do with existing data in the table, in case the table already exists. */
-  public abstract String getWriteDisposition();
-
-  @AutoValue.Builder
-  public abstract static class Builder {
-
-    /**
-     * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
-     * expected format.
-     */
-    public abstract Builder setTableSpec(String value);
-
-    /** Specifies whether the table should be created if it does not exist. */
-    public abstract Builder setCreateDisposition(String value);
-
-    /** Specifies what to do with existing data in the table, in case the table already exists. */
-    public abstract Builder setWriteDisposition(String value);
-
-    /** Builds the {@link BigQueryFileLoadsWriteSchemaTransformConfiguration} configuration. */
-    public abstract BigQueryFileLoadsWriteSchemaTransformConfiguration build();
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java
deleted file mode 100644
index 3212e2a3034..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.auto.service.AutoService;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
-import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-
-/**
- * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
- * using {@link BigQueryFileLoadsWriteSchemaTransformConfiguration}.
- *
- * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
- * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
- * repository.
- */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Internal
-@AutoService(SchemaTransformProvider.class)
-public class BigQueryFileLoadsWriteSchemaTransformProvider
-    extends TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration> {
-
-  private static final String IDENTIFIER =
-      "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
-  static final String INPUT_TAG = "INPUT";
-
-  /** Returns the expected class of the configuration. */
-  @Override
-  protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> configurationClass() {
-    return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
-  }
-
-  /** Returns the expected {@link SchemaTransform} of the configuration. */
-  @Override
-  protected SchemaTransform from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
-    return new BigQueryWriteSchemaTransform(configuration);
-  }
-
-  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
-  @Override
-  public String identifier() {
-    return IDENTIFIER;
-  }
-
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
-   * a single input is expected, this returns a list with a single name.
-   */
-  @Override
-  public List<String> inputCollectionNames() {
-    return Collections.singletonList(INPUT_TAG);
-  }
-
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
-   * no output is expected, this returns an empty list.
-   */
-  @Override
-  public List<String> outputCollectionNames() {
-    return Collections.emptyList();
-  }
-
-  /**
-   * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
-   * BigQueryFileLoadsWriteSchemaTransformConfiguration}.
-   */
-  protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
-    /** An instance of {@link BigQueryServices} used for testing. */
-    private BigQueryServices testBigQueryServices = null;
-
-    private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration;
-
-    BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
-      this.configuration = configuration;
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
-        return;
-      }
-
-      BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);
-
-      BigQueryServices bigQueryServices = new BigQueryServicesImpl();
-      if (testBigQueryServices != null) {
-        bigQueryServices = testBigQueryServices;
-      }
-
-      DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions);
-      TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec());
-
-      try {
-        Table table = datasetService.getTable(tableReference);
-        if (table == null) {
-          throw new NullPointerException();
-        }
-
-        if (table.getSchema() == null) {
-          throw new InvalidConfigurationException(
-              String.format("could not fetch schema for table: %s", 
configuration.getTableSpec()));
-        }
-
-      } catch (NullPointerException | InterruptedException | IOException ex) {
-        throw new InvalidConfigurationException(
-            String.format(
-                "could not fetch table %s, error: %s",
-                configuration.getTableSpec(), ex.getMessage()));
-      }
-    }
-
-    @Override
-    public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      validate(input);
-      PCollection<Row> rowPCollection = input.get(INPUT_TAG);
-      Schema schema = rowPCollection.getSchema();
-      BigQueryIO.Write<TableRow> write = toWrite(schema);
-      if (testBigQueryServices != null) {
-        write = write.withTestServices(testBigQueryServices);
-      }
-
-      PCollection<TableRow> tableRowPCollection =
-          rowPCollection.apply(
-              MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
-      tableRowPCollection.apply(write);
-      return PCollectionRowTuple.empty(input.getPipeline());
-    }
-
-    /** Instantiates a {@link BigQueryIO.Write<TableRow>} from a {@link Schema}. */
-    BigQueryIO.Write<TableRow> toWrite(Schema schema) {
-      TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
-      CreateDisposition createDisposition =
-          CreateDisposition.valueOf(configuration.getCreateDisposition());
-      WriteDisposition writeDisposition =
-          WriteDisposition.valueOf(configuration.getWriteDisposition());
-
-      return BigQueryIO.writeTableRows()
-          .to(configuration.getTableSpec())
-          .withCreateDisposition(createDisposition)
-          .withWriteDisposition(writeDisposition)
-          .withSchema(tableSchema);
-    }
-
-    /** Setter for testing using {@link BigQueryServices}. */
-    @VisibleForTesting
-    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
-      this.testBigQueryServices = testBigQueryServices;
-    }
-
-    /** Validate a {@link PCollectionRowTuple} input. */
-    void validate(PCollectionRowTuple input) {
-      if (!input.has(INPUT_TAG)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "%s %s is missing expected tag: %s",
-                getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
-      }
-
-      PCollection<Row> rowInput = input.get(INPUT_TAG);
-      Schema sourceSchema = rowInput.getSchema();
-
-      if (sourceSchema == null) {
-        throw new IllegalArgumentException(
-            String.format("%s is null for input of tag: %s", Schema.class, 
INPUT_TAG));
-      }
-
-      if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
-        return;
-      }
-
-      BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
-      BigQueryServices bigQueryServices = new BigQueryServicesImpl();
-      if (testBigQueryServices != null) {
-        bigQueryServices = testBigQueryServices;
-      }
-
-      DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions);
-      TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec());
-
-      try {
-        Table table = datasetService.getTable(tableReference);
-        if (table == null) {
-          throw new NullPointerException();
-        }
-
-        TableSchema tableSchema = table.getSchema();
-        if (tableSchema == null) {
-          throw new NullPointerException();
-        }
-
-        Schema destinationSchema = BigQueryUtils.fromTableSchema(tableSchema);
-        if (destinationSchema == null) {
-          throw new NullPointerException();
-        }
-
-        validateMatching(sourceSchema, destinationSchema);
-
-      } catch (NullPointerException | InterruptedException | IOException e) {
-        throw new InvalidConfigurationException(
-            String.format(
-                "could not validate input for create disposition: %s and 
table: %s, error: %s",
-                configuration.getCreateDisposition(),
-                configuration.getTableSpec(),
-                e.getMessage()));
-      }
-    }
-
-    void validateMatching(Schema sourceSchema, Schema destinationSchema) {
-      if (!sourceSchema.equals(destinationSchema)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "source and destination schema mismatch for table: %s",
-                configuration.getTableSpec()));
-      }
-    }
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java
index 8b8e8179ce7..15b1b01d7f6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery.providers;
 
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
@@ -26,6 +27,7 @@ import com.google.auto.value.AutoValue;
 import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
@@ -33,7 +35,9 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -62,7 +66,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 public class BigQueryDirectReadSchemaTransformProvider
     extends TypedSchemaTransformProvider<BigQueryDirectReadSchemaTransformConfiguration> {
 
-  private static final String OUTPUT_TAG = "OUTPUT_ROWS";
+  public static final String OUTPUT_TAG = "output";
 
   @Override
   protected Class<BigQueryDirectReadSchemaTransformConfiguration> configurationClass() {
@@ -76,7 +80,7 @@ public class BigQueryDirectReadSchemaTransformProvider
 
   @Override
   public String identifier() {
-    return "beam:schematransform:org.apache.beam:bigquery_storage_read:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ);
   }
 
   @Override
@@ -139,6 +143,10 @@ public class BigQueryDirectReadSchemaTransformProvider
     @Nullable
     public abstract List<String> getSelectedFields();
 
+    @SchemaFieldDescription("Use this Cloud KMS key to encrypt your data")
+    @Nullable
+    public abstract String getKmsKey();
+
     @Nullable
    /** Builder for the {@link BigQueryDirectReadSchemaTransformConfiguration}. */
     @AutoValue.Builder
@@ -151,6 +159,8 @@ public class BigQueryDirectReadSchemaTransformProvider
 
       public abstract Builder setSelectedFields(List<String> selectedFields);
 
+      public abstract Builder setKmsKey(String kmsKey);
+
      /** Builds a {@link BigQueryDirectReadSchemaTransformConfiguration} instance. */
       public abstract BigQueryDirectReadSchemaTransformConfiguration build();
     }
@@ -161,7 +171,7 @@ public class BigQueryDirectReadSchemaTransformProvider
    * BigQueryDirectReadSchemaTransformConfiguration} and instantiated by {@link
    * BigQueryDirectReadSchemaTransformProvider}.
    */
-  protected static class BigQueryDirectReadSchemaTransform extends SchemaTransform {
+  public static class BigQueryDirectReadSchemaTransform extends SchemaTransform {
     private BigQueryServices testBigQueryServices = null;
     private final BigQueryDirectReadSchemaTransformConfiguration configuration;
 
@@ -172,6 +182,20 @@ public class BigQueryDirectReadSchemaTransformProvider
       this.configuration = configuration;
     }
 
+    public Row getConfigurationRow() {
+      try {
+        // To stay consistent with our SchemaTransform configuration naming conventions,
+        // we sort lexicographically
+        return SchemaRegistry.createDefault()
+            .getToRowFunction(BigQueryDirectReadSchemaTransformConfiguration.class)
+            .apply(configuration)
+            .sorted()
+            .toSnakeCase();
+      } catch (NoSuchSchemaException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     @VisibleForTesting
     public void setBigQueryServices(BigQueryServices testBigQueryServices) {
       this.testBigQueryServices = testBigQueryServices;
@@ -211,6 +235,9 @@ public class BigQueryDirectReadSchemaTransformProvider
       } else {
         read = read.fromQuery(configuration.getQuery());
       }
+      if (!Strings.isNullOrEmpty(configuration.getKmsKey())) {
+        read = read.withKmsKey(configuration.getKmsKey());
+      }
 
       if (this.testBigQueryServices != null) {
         read = read.withTestServices(testBigQueryServices);
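
A sketch of configuring the read with the new KMS key field (setKmsKey is
added in the hunk above; setTableSpec and the key resource name are
assumptions here, shown for illustration only):

    BigQueryDirectReadSchemaTransformConfiguration readConfig =
        BigQueryDirectReadSchemaTransformConfiguration.builder()
            .setTableSpec("my-project:my_dataset.my_table") // illustrative
            .setKmsKey("projects/p/locations/l/keyRings/kr/cryptoKeys/k")
            .build();
    // expand() then applies read.withKmsKey(...) whenever the key is non-empty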
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java
new file mode 100644
index 00000000000..092cf42a29a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@Internal
+@AutoService(SchemaTransformProvider.class)
+public class BigQueryFileLoadsSchemaTransformProvider
+    extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
+
+  static final String INPUT_TAG = "input";
+
+  @Override
+  protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
+    return new BigQueryFileLoadsSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  public static class BigQueryFileLoadsSchemaTransform extends SchemaTransform {
+    /** An instance of {@link BigQueryServices} used for testing. */
+    private BigQueryServices testBigQueryServices = null;
+
+    private final BigQueryWriteConfiguration configuration;
+
+    BigQueryFileLoadsSchemaTransform(BigQueryWriteConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      PCollection<Row> rowPCollection = input.getSinglePCollection();
+      BigQueryIO.Write<Row> write = toWrite(input.getPipeline().getOptions());
+      rowPCollection.apply(write);
+
+      return PCollectionRowTuple.empty(input.getPipeline());
+    }
+
+    BigQueryIO.Write<Row> toWrite(PipelineOptions options) {
+      BigQueryIO.Write<Row> write =
+          BigQueryIO.<Row>write()
+              .to(configuration.getTable())
+              .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+              .withFormatFunction(BigQueryUtils.toTableRow())
+              // TODO(https://github.com/apache/beam/issues/33074) BatchLoad's
+              // createTempFilePrefixView() doesn't pick up the pipeline option
+              .withCustomGcsTempLocation(
+                  ValueProvider.StaticValueProvider.of(options.getTempLocation()))
+              .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+              .useBeamSchema();
+
+      if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
+        CreateDisposition createDisposition =
+            CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());
+        write = write.withCreateDisposition(createDisposition);
+      }
+      if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
+        WriteDisposition writeDisposition =
+            WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase());
+        write = write.withWriteDisposition(writeDisposition);
+      }
+      if (!Strings.isNullOrEmpty(configuration.getKmsKey())) {
+        write = write.withKmsKey(configuration.getKmsKey());
+      }
+      if (testBigQueryServices != null) {
+        write = write.withTestServices(testBigQueryServices);
+      }
+
+      return write;
+    }
+
+    /** Setter for testing using {@link BigQueryServices}. */
+    @VisibleForTesting
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+  }
+}
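
The batch/streaming fork described in the commit message lives in the new
BigQueryWriteSchemaTransformProvider (listed in the diffstat; its body is not
shown in this section). A sketch of the selection logic, assuming it keys off
the input PCollection's boundedness and a shared configuration Row
(configRow):

    PCollection<Row> rows = input.getSinglePCollection();
    SchemaTransform underlying =
        rows.isBounded() == PCollection.IsBounded.BOUNDED
            ? new BigQueryFileLoadsSchemaTransformProvider().from(configRow)
            : new BigQueryStorageWriteApiSchemaTransformProvider().from(configRow);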
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQuerySchemaTransformTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQuerySchemaTransformTranslation.java
new file mode 100644
index 00000000000..555df0d0a2b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQuerySchemaTransformTranslation.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransform;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteSchemaTransformProvider.BigQueryWriteSchemaTransform;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class BigQuerySchemaTransformTranslation {
+  public static class BigQueryStorageReadSchemaTransformTranslator
+      extends SchemaTransformTranslation.SchemaTransformPayloadTranslator<
+          BigQueryDirectReadSchemaTransform> {
+    @Override
+    public SchemaTransformProvider provider() {
+      return new BigQueryDirectReadSchemaTransformProvider();
+    }
+
+    @Override
+    public Row toConfigRow(BigQueryDirectReadSchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+  }
+
+  public static class BigQueryWriteSchemaTransformTranslator
+      extends SchemaTransformTranslation.SchemaTransformPayloadTranslator<
+          BigQueryWriteSchemaTransform> {
+    @Override
+    public SchemaTransformProvider provider() {
+      return new BigQueryWriteSchemaTransformProvider();
+    }
+
+    @Override
+    public Row toConfigRow(BigQueryWriteSchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class ReadWriteRegistrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<
+            ? extends Class<? extends PTransform>,
+            ? extends PTransformTranslation.TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap
+          .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
+          .put(
+              BigQueryDirectReadSchemaTransform.class,
+              new BigQueryStorageReadSchemaTransformTranslator())
+          .put(BigQueryWriteSchemaTransform.class, new BigQueryWriteSchemaTransformTranslator())
+          .build();
+    }
+  }
+}
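
These translators let a portable runner round-trip a BigQuery SchemaTransform
through its configuration Row. A sketch of the read-side round trip (the
transform instance is assumed to come from the provider):

    // given a transform built by BigQueryDirectReadSchemaTransformProvider:
    Row configRow =
        new BigQuerySchemaTransformTranslation.BigQueryStorageReadSchemaTransformTranslator()
            .toConfigRow(transform); // snake_case Row, per getConfigurationRow() above
    SchemaTransform rebuilt = new BigQueryDirectReadSchemaTransformProvider().from(configRow);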
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index c1c06fc592f..c45433aaf0e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -17,20 +17,16 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery.providers;
 
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.services.bigquery.model.TableConstraints;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
@@ -42,15 +38,11 @@ import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
 import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
-import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -65,12 +57,11 @@ import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.Duration;
 
 /**
  * An implementation of {@link TypedSchemaTransformProvider} for BigQuery Storage Write API jobs
- * configured via {@link BigQueryStorageWriteApiSchemaTransformConfiguration}.
+ * configured via {@link BigQueryWriteConfiguration}.
  *
  * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
  * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
@@ -81,7 +72,7 @@ import org.joda.time.Duration;
 })
 @AutoService(SchemaTransformProvider.class)
 public class BigQueryStorageWriteApiSchemaTransformProvider
-    extends TypedSchemaTransformProvider<BigQueryStorageWriteApiSchemaTransformConfiguration> {
+    extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
   private static final Integer DEFAULT_TRIGGER_FREQUENCY_SECS = 5;
   private static final Duration DEFAULT_TRIGGERING_FREQUENCY =
       Duration.standardSeconds(DEFAULT_TRIGGER_FREQUENCY_SECS);
@@ -89,7 +80,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
   private static final String FAILED_ROWS_TAG = "FailedRows";
   private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors";
   // magic string that tells us to write to dynamic destinations
-  protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";
   protected static final String ROW_PROPERTY_MUTATION_INFO = "row_mutation_info";
   protected static final String ROW_PROPERTY_MUTATION_TYPE = "mutation_type";
   protected static final String ROW_PROPERTY_MUTATION_SQN = "change_sequence_number";
@@ -100,14 +90,13 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
           .build();
 
   @Override
-  protected SchemaTransform from(
-      BigQueryStorageWriteApiSchemaTransformConfiguration configuration) {
+  protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
     return new BigQueryStorageWriteApiSchemaTransform(configuration);
   }
 
   @Override
   public String identifier() {
-    return String.format("beam:schematransform:org.apache.beam:bigquery_storage_write:v2");
+    return "beam:schematransform:org.apache.beam:bigquery_storage_write:v2";
   }
 
   @Override
@@ -130,201 +119,17 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
     return Arrays.asList(FAILED_ROWS_TAG, FAILED_ROWS_WITH_ERRORS_TAG, "errors");
   }
 
-  /** Configuration for writing to BigQuery with Storage Write API. */
-  @DefaultSchema(AutoValueSchema.class)
-  @AutoValue
-  public abstract static class BigQueryStorageWriteApiSchemaTransformConfiguration {
-
-    static final Map<String, CreateDisposition> CREATE_DISPOSITIONS =
-        ImmutableMap.<String, CreateDisposition>builder()
-            .put(CreateDisposition.CREATE_IF_NEEDED.name(), CreateDisposition.CREATE_IF_NEEDED)
-            .put(CreateDisposition.CREATE_NEVER.name(), CreateDisposition.CREATE_NEVER)
-            .build();
-
-    static final Map<String, WriteDisposition> WRITE_DISPOSITIONS =
-        ImmutableMap.<String, WriteDisposition>builder()
-            .put(WriteDisposition.WRITE_TRUNCATE.name(), WriteDisposition.WRITE_TRUNCATE)
-            .put(WriteDisposition.WRITE_EMPTY.name(), WriteDisposition.WRITE_EMPTY)
-            .put(WriteDisposition.WRITE_APPEND.name(), WriteDisposition.WRITE_APPEND)
-            .build();
-
-    @AutoValue
-    public abstract static class ErrorHandling {
-      @SchemaFieldDescription("The name of the output PCollection containing 
failed writes.")
-      public abstract String getOutput();
-
-      public static Builder builder() {
-        return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration_ErrorHandling
-            .Builder();
-      }
-
-      @AutoValue.Builder
-      public abstract static class Builder {
-        public abstract Builder setOutput(String output);
-
-        public abstract ErrorHandling build();
-      }
-    }
-
-    public void validate() {
-      String invalidConfigMessage = "Invalid BigQuery Storage Write 
configuration: ";
-
-      // validate output table spec
-      checkArgument(
-          !Strings.isNullOrEmpty(this.getTable()),
-          invalidConfigMessage + "Table spec for a BigQuery Write must be 
specified.");
-
-      // if we have an input table spec, validate it
-      if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) {
-        checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable()));
-      }
-
-      // validate create and write dispositions
-      if (!Strings.isNullOrEmpty(this.getCreateDisposition())) {
-        checkNotNull(
-            CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()),
-            invalidConfigMessage
-                + "Invalid create disposition (%s) was specified. Available 
dispositions are: %s",
-            this.getCreateDisposition(),
-            CREATE_DISPOSITIONS.keySet());
-      }
-      if (!Strings.isNullOrEmpty(this.getWriteDisposition())) {
-        checkNotNull(
-            WRITE_DISPOSITIONS.get(this.getWriteDisposition().toUpperCase()),
-            invalidConfigMessage
-                + "Invalid write disposition (%s) was specified. Available 
dispositions are: %s",
-            this.getWriteDisposition(),
-            WRITE_DISPOSITIONS.keySet());
-      }
-
-      if (this.getErrorHandling() != null) {
-        checkArgument(
-            !Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
-            invalidConfigMessage + "Output must not be empty if error handling 
specified.");
-      }
-
-      if (this.getAutoSharding() != null
-          && this.getAutoSharding()
-          && this.getNumStreams() != null) {
-        checkArgument(
-            this.getNumStreams() == 0,
-            invalidConfigMessage
-                + "Cannot set a fixed number of streams when auto-sharding is 
enabled. Please pick only one of the two options.");
-      }
-    }
-
-    /**
-     * Instantiates a {@link BigQueryStorageWriteApiSchemaTransformConfiguration.Builder} instance.
-     */
-    public static Builder builder() {
-      return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration
-          .Builder();
-    }
-
-    @SchemaFieldDescription(
-        "The bigquery table to write to. Format: 
[${PROJECT}:]${DATASET}.${TABLE}")
-    public abstract String getTable();
-
-    @SchemaFieldDescription(
-        "Optional field that specifies whether the job is allowed to create 
new tables. "
-            + "The following values are supported: CREATE_IF_NEEDED (the job 
may create the table), CREATE_NEVER ("
-            + "the job must fail if the table does not exist already).")
-    @Nullable
-    public abstract String getCreateDisposition();
-
-    @SchemaFieldDescription(
-        "Specifies the action that occurs if the destination table already 
exists. "
-            + "The following values are supported: "
-            + "WRITE_TRUNCATE (overwrites the table data), "
-            + "WRITE_APPEND (append the data to the table), "
-            + "WRITE_EMPTY (job must fail if the table is not empty).")
-    @Nullable
-    public abstract String getWriteDisposition();
-
-    @SchemaFieldDescription(
-        "Determines how often to 'commit' progress into BigQuery. Default is 
every 5 seconds.")
-    @Nullable
-    public abstract Long getTriggeringFrequencySeconds();
-
-    @SchemaFieldDescription(
-        "This option enables lower latency for insertions to BigQuery but may 
ocassionally "
-            + "duplicate data elements.")
-    @Nullable
-    public abstract Boolean getUseAtLeastOnceSemantics();
-
-    @SchemaFieldDescription(
-        "This option enables using a dynamically determined number of Storage 
Write API streams to write to "
-            + "BigQuery. Only applicable to unbounded data.")
-    @Nullable
-    public abstract Boolean getAutoSharding();
-
-    @SchemaFieldDescription(
-        "Specifies the number of write streams that the Storage API sink will 
use. "
-            + "This parameter is only applicable when writing unbounded data.")
-    @Nullable
-    public abstract Integer getNumStreams();
-
-    @SchemaFieldDescription("This option specifies whether and where to output 
unwritable rows.")
-    @Nullable
-    public abstract ErrorHandling getErrorHandling();
-
-    @SchemaFieldDescription(
-        "This option enables the use of BigQuery CDC functionality. The 
expected PCollection"
-            + " should contain Beam Rows with a schema wrapping the record to 
be inserted and"
-            + " adding the CDC info similar to: {row_mutation_info: 
{mutation_type:\"...\", "
-            + "change_sequence_number:\"...\"}, record: {...}}")
-    @Nullable
-    public abstract Boolean getUseCdcWrites();
-
-    @SchemaFieldDescription(
-        "If CREATE_IF_NEEDED disposition is set, BigQuery table(s) will be 
created with this"
-            + " columns as primary key. Required when CDC writes are enabled 
with CREATE_IF_NEEDED.")
-    @Nullable
-    public abstract List<String> getPrimaryKey();
-
-    /** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */
-    @AutoValue.Builder
-    public abstract static class Builder {
-
-      public abstract Builder setTable(String table);
-
-      public abstract Builder setCreateDisposition(String createDisposition);
-
-      public abstract Builder setWriteDisposition(String writeDisposition);
-
-      public abstract Builder setTriggeringFrequencySeconds(Long seconds);
-
-      public abstract Builder setUseAtLeastOnceSemantics(Boolean use);
-
-      public abstract Builder setAutoSharding(Boolean autoSharding);
-
-      public abstract Builder setNumStreams(Integer numStreams);
-
-      public abstract Builder setErrorHandling(ErrorHandling errorHandling);
-
-      public abstract Builder setUseCdcWrites(Boolean cdcWrites);
-
-      public abstract Builder setPrimaryKey(List<String> pkColumns);
-
-      /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
-      public abstract BigQueryStorageWriteApiSchemaTransformProvider
-              .BigQueryStorageWriteApiSchemaTransformConfiguration
-          build();
-    }
-  }
-
   /**
   * A {@link SchemaTransform} for BigQuery Storage Write API, configured with {@link
-   * BigQueryStorageWriteApiSchemaTransformConfiguration} and instantiated by {@link
+   * BigQueryWriteConfiguration} and instantiated by {@link
    * BigQueryStorageWriteApiSchemaTransformProvider}.
    */
-  protected static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform {
+  public static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform {
 
     private BigQueryServices testBigQueryServices = null;
-    private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration;
+    private final BigQueryWriteConfiguration configuration;
 
-    BigQueryStorageWriteApiSchemaTransform(
-        BigQueryStorageWriteApiSchemaTransformConfiguration configuration) {
+    BigQueryStorageWriteApiSchemaTransform(BigQueryWriteConfiguration configuration) {
       configuration.validate();
       this.configuration = configuration;
     }
@@ -420,8 +225,7 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
       // Check that the input exists
-      checkArgument(input.has(INPUT_ROWS_TAG), "Missing expected input tag: 
%s", INPUT_ROWS_TAG);
-      PCollection<Row> inputRows = input.get(INPUT_ROWS_TAG);
+      PCollection<Row> inputRows = input.getSinglePCollection();
 
       BigQueryIO.Write<Row> write = createStorageWriteApiTransform(inputRows.getSchema());
 
@@ -540,18 +344,18 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
 
       if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
         CreateDisposition createDisposition =
-            BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get(
-                configuration.getCreateDisposition().toUpperCase());
+            CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());
         write = write.withCreateDisposition(createDisposition);
       }
 
       if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
         WriteDisposition writeDisposition =
-            BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get(
-                configuration.getWriteDisposition().toUpperCase());
+            WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase());
         write = write.withWriteDisposition(writeDisposition);
       }
-
+      if (!Strings.isNullOrEmpty(configuration.getKmsKey())) {
+        write = write.withKmsKey(configuration.getKmsKey());
+      }
       if (this.testBigQueryServices != null) {
         write = write.withTestServices(testBigQueryServices);
       }
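
Both write providers now share the BigQueryWriteConfiguration defined in the
next file. A minimal construction sketch, assuming the AutoValue builder
exposes setters matching the getters shown below (the table name is
illustrative):

    BigQueryWriteConfiguration config =
        BigQueryWriteConfiguration.builder()
            .setTable("my-project:my_dataset.my_table")
            .setCreateDisposition("CREATE_IF_NEEDED")
            .setWriteDisposition("WRITE_APPEND")
            .build();
    config.validate(); // rejects a bad table spec or an unknown disposition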
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
new file mode 100644
index 00000000000..4296da7e0cd
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+/**
+ * Configuration for writing to BigQuery with SchemaTransforms. Used by {@link
+ * BigQueryStorageWriteApiSchemaTransformProvider} and {@link
+ * BigQueryFileLoadsSchemaTransformProvider}.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQueryWriteConfiguration {
+  protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";
+
+  @AutoValue
+  public abstract static class ErrorHandling {
+    @SchemaFieldDescription("The name of the output PCollection containing 
failed writes.")
+    public abstract String getOutput();
+
+    public static Builder builder() {
+      return new AutoValue_BigQueryWriteConfiguration_ErrorHandling.Builder();
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setOutput(String output);
+
+      public abstract ErrorHandling build();
+    }
+  }
+
+  public void validate() {
+    String invalidConfigMessage = "Invalid BigQuery Storage Write 
configuration: ";
+
+    // validate output table spec
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getTable()),
+        invalidConfigMessage + "Table spec for a BigQuery Write must be 
specified.");
+
+    // if we have an input table spec, validate it
+    if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) {
+      checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable()));
+    }
+
+    // validate create and write dispositions
+    String createDisposition = getCreateDisposition();
+    if (createDisposition != null && !createDisposition.isEmpty()) {
+      List<String> createDispositions =
+          Arrays.stream(BigQueryIO.Write.CreateDisposition.values())
+              .map(c -> c.name())
+              .collect(Collectors.toList());
+      Preconditions.checkArgument(
+          createDispositions.contains(createDisposition.toUpperCase()),
+          "Invalid create disposition (%s) was specified. Available 
dispositions are: %s",
+          createDisposition,
+          createDispositions);
+    }
+    String writeDisposition = getWriteDisposition();
+    if (writeDisposition != null && !writeDisposition.isEmpty()) {
+      List<String> writeDispositions =
+          Arrays.stream(BigQueryIO.Write.WriteDisposition.values())
+              .map(w -> w.name())
+              .collect(Collectors.toList());
+      Preconditions.checkArgument(
+          writeDispositions.contains(writeDisposition.toUpperCase()),
+          "Invalid write disposition (%s) was specified. Available dispositions are: %s",
+          writeDisposition,
+          writeDispositions);
+    }
+
+    ErrorHandling errorHandling = getErrorHandling();
+    if (errorHandling != null) {
+      checkArgument(
+          !Strings.isNullOrEmpty(errorHandling.getOutput()),
+          invalidConfigMessage + "Output must not be empty if error handling 
specified.");
+    }
+
+    Boolean autoSharding = getAutoSharding();
+    Integer numStreams = getNumStreams();
+    if (autoSharding != null && autoSharding && numStreams != null) {
+      checkArgument(
+          numStreams == 0,
+          invalidConfigMessage
+              + "Cannot set a fixed number of streams when auto-sharding is 
enabled. Please pick only one of the two options.");
+    }
+  }
+
+  /** Instantiates a {@link BigQueryWriteConfiguration.Builder} instance. */
+  public static Builder builder() {
+    return new AutoValue_BigQueryWriteConfiguration.Builder();
+  }
+
+  @SchemaFieldDescription(
+      "The bigquery table to write to. Format: 
[${PROJECT}:]${DATASET}.${TABLE}")
+  public abstract String getTable();
+
+  @SchemaFieldDescription(
+      "Optional field that specifies whether the job is allowed to create new 
tables. "
+          + "The following values are supported: CREATE_IF_NEEDED (the job may 
create the table), CREATE_NEVER ("
+          + "the job must fail if the table does not exist already).")
+  @Nullable
+  public abstract String getCreateDisposition();
+
+  @SchemaFieldDescription(
+      "Specifies the action that occurs if the destination table already 
exists. "
+          + "The following values are supported: "
+          + "WRITE_TRUNCATE (overwrites the table data), "
+          + "WRITE_APPEND (append the data to the table), "
+          + "WRITE_EMPTY (job must fail if the table is not empty).")
+  @Nullable
+  public abstract String getWriteDisposition();
+
+  @SchemaFieldDescription(
+      "Determines how often to 'commit' progress into BigQuery. Default is 
every 5 seconds.")
+  @Nullable
+  public abstract Long getTriggeringFrequencySeconds();
+
+  @SchemaFieldDescription(
+      "This option enables lower latency for insertions to BigQuery but may 
ocassionally "
+          + "duplicate data elements.")
+  @Nullable
+  public abstract Boolean getUseAtLeastOnceSemantics();
+
+  @SchemaFieldDescription(
+      "This option enables using a dynamically determined number of Storage 
Write API streams to write to "
+          + "BigQuery. Only applicable to unbounded data.")
+  @Nullable
+  public abstract Boolean getAutoSharding();
+
+  @SchemaFieldDescription(
+      "Specifies the number of write streams that the Storage API sink will 
use. "
+          + "This parameter is only applicable when writing unbounded data.")
+  @Nullable
+  public abstract Integer getNumStreams();
+
+  @SchemaFieldDescription("Use this Cloud KMS key to encrypt your data")
+  @Nullable
+  public abstract String getKmsKey();
+
+  @SchemaFieldDescription("This option specifies whether and where to output 
unwritable rows.")
+  @Nullable
+  public abstract ErrorHandling getErrorHandling();
+
+  @SchemaFieldDescription(
+      "This option enables the use of BigQuery CDC functionality. The expected 
PCollection"
+          + " should contain Beam Rows with a schema wrapping the record to be 
inserted and"
+          + " adding the CDC info similar to: {row_mutation_info: 
{mutation_type:\"...\", "
+          + "change_sequence_number:\"...\"}, record: {...}}")
+  @Nullable
+  public abstract Boolean getUseCdcWrites();
+
+  @SchemaFieldDescription(
+      "If CREATE_IF_NEEDED disposition is set, BigQuery table(s) will be 
created with this"
+          + " columns as primary key. Required when CDC writes are enabled 
with CREATE_IF_NEEDED.")
+  @Nullable
+  public abstract List<String> getPrimaryKey();
+
+  /** Builder for {@link BigQueryWriteConfiguration}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setTable(String table);
+
+    public abstract Builder setCreateDisposition(String createDisposition);
+
+    public abstract Builder setWriteDisposition(String writeDisposition);
+
+    public abstract Builder setTriggeringFrequencySeconds(Long seconds);
+
+    public abstract Builder setUseAtLeastOnceSemantics(Boolean use);
+
+    public abstract Builder setAutoSharding(Boolean autoSharding);
+
+    public abstract Builder setNumStreams(Integer numStreams);
+
+    public abstract Builder setKmsKey(String kmsKey);
+
+    public abstract Builder setErrorHandling(ErrorHandling errorHandling);
+
+    public abstract Builder setUseCdcWrites(Boolean cdcWrites);
+
+    public abstract Builder setPrimaryKey(List<String> pkColumns);
+
+    /** Builds a {@link BigQueryWriteConfiguration} instance. */
+    public abstract BigQueryWriteConfiguration build();
+  }
+}
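
For reference, the builder plus validate() pair above is the whole configuration
surface shared by the two write providers. A minimal sketch of assembling and
checking a configuration (the table spec and error-handling output tag here are
illustrative, not part of the change):

    BigQueryWriteConfiguration config =
        BigQueryWriteConfiguration.builder()
            .setTable("my-project:my_dataset.my_table") // or "DYNAMIC_DESTINATIONS"
            .setCreateDisposition("CREATE_IF_NEEDED")
            .setWriteDisposition("WRITE_APPEND")
            .setErrorHandling(
                BigQueryWriteConfiguration.ErrorHandling.builder()
                    .setOutput("FailedRows")
                    .build())
            .build();
    // Throws IllegalArgumentException on a bad table spec, an unknown
    // disposition, or an empty error-handling output.
    config.validate();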
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
new file mode 100644
index 00000000000..abab169d693
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A BigQuery Write SchemaTransformProvider that routes to either {@link
+ * BigQueryFileLoadsSchemaTransformProvider} or {@link
+ * BigQueryStorageWriteApiSchemaTransformProvider}.
+ *
+ * <p>Internal only. Used by the Managed Transform layer.
+ */
+@Internal
+@AutoService(SchemaTransformProvider.class)
+public class BigQueryWriteSchemaTransformProvider
+    extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
+  @Override
+  public String identifier() {
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE);
+  }
+
+  @Override
+  protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  public static class BigQueryWriteSchemaTransform extends SchemaTransform {
+    private final BigQueryWriteConfiguration configuration;
+
+    BigQueryWriteSchemaTransform(BigQueryWriteConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if 
(input.getSinglePCollection().isBounded().equals(PCollection.IsBounded.BOUNDED))
 {
+        return input.apply(new 
BigQueryFileLoadsSchemaTransformProvider().from(configuration));
+      } else { // UNBOUNDED
+        return input.apply(
+            new 
BigQueryStorageWriteApiSchemaTransformProvider().from(configuration));
+      }
+    }
+
+    public Row getConfigurationRow() {
+      try {
+        // To stay consistent with our SchemaTransform configuration naming 
conventions,
+        // we sort lexicographically
+        return SchemaRegistry.createDefault()
+            .getToRowFunction(BigQueryWriteConfiguration.class)
+            .apply(configuration)
+            .sorted()
+            .toSnakeCase();
+      } catch (NoSuchSchemaException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
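
The fork in expand() is driven entirely by the input PCollection's boundedness,
so a single configuration serves both write methods. A hedged sketch of what
this looks like to a caller (the input tuple and table spec are illustrative;
from(BigQueryWriteConfiguration) is protected, so code outside this package
would go through the Row-based from() or the Managed API instead):

    SchemaTransform write =
        new BigQueryWriteSchemaTransformProvider()
            .from(
                BigQueryWriteConfiguration.builder()
                    .setTable("my-project:my_dataset.my_table")
                    .build());
    // A bounded input expands to BigQueryFileLoadsSchemaTransform; an
    // unbounded input (e.g. from TestStream or PeriodicImpulse) expands to
    // BigQueryStorageWriteApiSchemaTransform.
    PCollectionRowTuple.of("input", rows).apply(write);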
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java
deleted file mode 100644
index dd8bb9fc866..00000000000
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import static 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryFileLoadsWriteSchemaTransformProvider.INPUT_TAG;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
-
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryFileLoadsWriteSchemaTransformProvider.BigQueryWriteSchemaTransform;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
-import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
-import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.Field;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayData.Identifier;
-import org.apache.beam.sdk.transforms.display.DisplayData.Item;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.commons.lang3.tuple.Pair;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Test for {@link BigQueryFileLoadsWriteSchemaTransformProvider}. */
-@RunWith(JUnit4.class)
-public class BigQueryFileLoadsWriteSchemaTransformProviderTest {
-
-  private static final String PROJECT = "fakeproject";
-  private static final String DATASET = "fakedataset";
-  private static final String TABLE_ID = "faketable";
-
-  private static final TableReference TABLE_REFERENCE =
-      new 
TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID);
-
-  private static final Schema SCHEMA =
-      Schema.of(Field.of("name", FieldType.STRING), Field.of("number", 
FieldType.INT64));
-
-  private static final TableSchema TABLE_SCHEMA = 
BigQueryUtils.toTableSchema(SCHEMA);
-
-  private static final List<Row> ROWS =
-      Arrays.asList(
-          Row.withSchema(SCHEMA).withFieldValue("name", 
"a").withFieldValue("number", 1L).build(),
-          Row.withSchema(SCHEMA).withFieldValue("name", 
"b").withFieldValue("number", 2L).build(),
-          Row.withSchema(SCHEMA).withFieldValue("name", 
"c").withFieldValue("number", 3L).build());
-
-  private static final BigQueryOptions OPTIONS =
-      TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-  private final FakeDatasetService fakeDatasetService = new 
FakeDatasetService();
-  private final FakeJobService fakeJobService = new FakeJobService();
-  private final TemporaryFolder temporaryFolder = new TemporaryFolder();
-  private final FakeBigQueryServices fakeBigQueryServices =
-      new FakeBigQueryServices()
-          .withJobService(fakeJobService)
-          .withDatasetService(fakeDatasetService);
-
-  @Before
-  public void setUp() throws IOException, InterruptedException {
-    FakeDatasetService.setUp();
-    fakeDatasetService.createDataset(PROJECT, DATASET, "", "", null);
-    temporaryFolder.create();
-    OPTIONS.setProject(PROJECT);
-    OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath());
-  }
-
-  @After
-  public void tearDown() {
-    temporaryFolder.delete();
-  }
-
-  @Rule public transient TestPipeline p = TestPipeline.fromOptions(OPTIONS);
-
-  @Test
-  public void testLoad() throws IOException, InterruptedException {
-    BigQueryFileLoadsWriteSchemaTransformProvider provider =
-        new BigQueryFileLoadsWriteSchemaTransformProvider();
-    BigQueryFileLoadsWriteSchemaTransformConfiguration configuration =
-        BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
-            .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
-            .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name())
-            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
-            .build();
-    BigQueryWriteSchemaTransform schemaTransform =
-        (BigQueryWriteSchemaTransform) provider.from(configuration);
-    schemaTransform.setTestBigQueryServices(fakeBigQueryServices);
-    String tag = provider.inputCollectionNames().get(0);
-    PCollectionRowTuple input =
-        PCollectionRowTuple.of(tag, 
p.apply(Create.of(ROWS).withRowSchema(SCHEMA)));
-    input.apply(schemaTransform);
-
-    p.run();
-
-    assertNotNull(fakeDatasetService.getTable(TABLE_REFERENCE));
-    assertEquals(ROWS.size(), fakeDatasetService.getAllRows(PROJECT, DATASET, 
TABLE_ID).size());
-  }
-
-  @Test
-  public void testValidatePipelineOptions() {
-    List<
-            Pair<
-                BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder,
-                Class<? extends Exception>>>
-        cases =
-            Arrays.asList(
-                Pair.of(
-                    
BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
-                        .setTableSpec("project.doesnot.exist")
-                        
.setCreateDisposition(CreateDisposition.CREATE_NEVER.name())
-                        
.setWriteDisposition(WriteDisposition.WRITE_APPEND.name()),
-                    InvalidConfigurationException.class),
-                Pair.of(
-                    
BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
-                        .setTableSpec(String.format("%s.%s.%s", PROJECT, 
DATASET, "doesnotexist"))
-                        
.setCreateDisposition(CreateDisposition.CREATE_NEVER.name())
-                        
.setWriteDisposition(WriteDisposition.WRITE_EMPTY.name()),
-                    InvalidConfigurationException.class),
-                Pair.of(
-                    
BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
-                        .setTableSpec("project.doesnot.exist")
-                        
.setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
-                        
.setWriteDisposition(WriteDisposition.WRITE_APPEND.name()),
-                    null));
-    for (Pair<
-            BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder, 
Class<? extends Exception>>
-        caze : cases) {
-      BigQueryWriteSchemaTransform transform = 
transformFrom(caze.getLeft().build());
-      if (caze.getRight() != null) {
-        assertThrows(caze.getRight(), () -> 
transform.validate(p.getOptions()));
-      } else {
-        transform.validate(p.getOptions());
-      }
-    }
-  }
-
-  @Test
-  public void testToWrite() {
-    List<
-            Pair<
-                BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder,
-                BigQueryIO.Write<TableRow>>>
-        cases =
-            Arrays.asList(
-                Pair.of(
-                    
BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
-                        
.setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
-                        
.setCreateDisposition(CreateDisposition.CREATE_NEVER.name())
-                        
.setWriteDisposition(WriteDisposition.WRITE_EMPTY.name()),
-                    BigQueryIO.writeTableRows()
-                        .to(TABLE_REFERENCE)
-                        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
-                        .withWriteDisposition(WriteDisposition.WRITE_EMPTY)
-                        .withSchema(TABLE_SCHEMA)),
-                Pair.of(
-                    
BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
-                        
.setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
-                        
.setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
-                        
.setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name()),
-                    BigQueryIO.writeTableRows()
-                        .to(TABLE_REFERENCE)
-                        
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-                        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
-                        .withSchema(TABLE_SCHEMA)));
-    for (Pair<
-            BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder, 
BigQueryIO.Write<TableRow>>
-        caze : cases) {
-      BigQueryWriteSchemaTransform transform = 
transformFrom(caze.getLeft().build());
-      Map<Identifier, Item> gotDisplayData = 
DisplayData.from(transform.toWrite(SCHEMA)).asMap();
-      Map<Identifier, Item> wantDisplayData = 
DisplayData.from(caze.getRight()).asMap();
-      Set<Identifier> keys = new HashSet<>();
-      keys.addAll(gotDisplayData.keySet());
-      keys.addAll(wantDisplayData.keySet());
-      for (Identifier key : keys) {
-        Item got = null;
-        Item want = null;
-        if (gotDisplayData.containsKey(key)) {
-          got = gotDisplayData.get(key);
-        }
-        if (wantDisplayData.containsKey(key)) {
-          want = wantDisplayData.get(key);
-        }
-        assertEquals(want, got);
-      }
-    }
-  }
-
-  @Test
-  public void validatePCollectionRowTupleInput() {
-    PCollectionRowTuple empty = PCollectionRowTuple.empty(p);
-    PCollectionRowTuple valid =
-        PCollectionRowTuple.of(
-            INPUT_TAG, p.apply("CreateRowsWithValidSchema", 
Create.of(ROWS)).setRowSchema(SCHEMA));
-
-    PCollectionRowTuple invalid =
-        PCollectionRowTuple.of(
-            INPUT_TAG,
-            p.apply(
-                "CreateRowsWithInvalidSchema",
-                Create.of(
-                    Row.nullRow(
-                        Schema.builder().addNullableField("name", 
FieldType.STRING).build()))));
-
-    BigQueryWriteSchemaTransform transform =
-        transformFrom(
-            BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
-                .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
-                
.setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
-                .setWriteDisposition(WriteDisposition.WRITE_APPEND.name())
-                .build());
-
-    assertThrows(IllegalArgumentException.class, () -> 
transform.validate(empty));
-
-    assertThrows(IllegalStateException.class, () -> 
transform.validate(invalid));
-
-    transform.validate(valid);
-
-    p.run();
-  }
-
-  private BigQueryWriteSchemaTransform transformFrom(
-      BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
-    BigQueryFileLoadsWriteSchemaTransformProvider provider =
-        new BigQueryFileLoadsWriteSchemaTransformProvider();
-    BigQueryWriteSchemaTransform transform =
-        (BigQueryWriteSchemaTransform) provider.from(configuration);
-
-    transform.setTestBigQueryServices(fakeBigQueryServices);
-
-    return transform;
-  }
-}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
new file mode 100644
index 00000000000..897d95da3b1
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.google.api.services.bigquery.model.TableReference;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider.BigQueryFileLoadsSchemaTransform;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQueryFileLoadsSchemaTransformProvider}. */
+@RunWith(JUnit4.class)
+public class BigQueryFileLoadsSchemaTransformProviderTest {
+
+  private static final String PROJECT = "fakeproject";
+  private static final String DATASET = "fakedataset";
+  private static final String TABLE_ID = "faketable";
+
+  private static final TableReference TABLE_REFERENCE =
+      new 
TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID);
+
+  private static final Schema SCHEMA =
+      Schema.of(Field.of("name", FieldType.STRING), Field.of("number", 
FieldType.INT64));
+
+  private static final List<Row> ROWS =
+      Arrays.asList(
+          Row.withSchema(SCHEMA).withFieldValue("name", 
"a").withFieldValue("number", 1L).build(),
+          Row.withSchema(SCHEMA).withFieldValue("name", 
"b").withFieldValue("number", 2L).build(),
+          Row.withSchema(SCHEMA).withFieldValue("name", 
"c").withFieldValue("number", 3L).build());
+
+  private static final BigQueryOptions OPTIONS =
+      TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+  private final FakeDatasetService fakeDatasetService = new 
FakeDatasetService();
+  private final FakeJobService fakeJobService = new FakeJobService();
+  private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private final FakeBigQueryServices fakeBigQueryServices =
+      new FakeBigQueryServices()
+          .withJobService(fakeJobService)
+          .withDatasetService(fakeDatasetService);
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    FakeDatasetService.setUp();
+    fakeDatasetService.createDataset(PROJECT, DATASET, "", "", null);
+    temporaryFolder.create();
+    OPTIONS.setProject(PROJECT);
+    OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() {
+    temporaryFolder.delete();
+  }
+
+  @Rule public transient TestPipeline p = TestPipeline.fromOptions(OPTIONS);
+
+  @Test
+  public void testLoad() throws IOException, InterruptedException {
+    BigQueryFileLoadsSchemaTransformProvider provider =
+        new BigQueryFileLoadsSchemaTransformProvider();
+    BigQueryWriteConfiguration configuration =
+        BigQueryWriteConfiguration.builder()
+            .setTable(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
+            .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
+            .build();
+    BigQueryFileLoadsSchemaTransform schemaTransform =
+        (BigQueryFileLoadsSchemaTransform) provider.from(configuration);
+    schemaTransform.setTestBigQueryServices(fakeBigQueryServices);
+    String tag = provider.inputCollectionNames().get(0);
+    PCollectionRowTuple input =
+        PCollectionRowTuple.of(tag, 
p.apply(Create.of(ROWS).withRowSchema(SCHEMA)));
+    input.apply(schemaTransform);
+
+    p.run();
+
+    assertNotNull(fakeDatasetService.getTable(TABLE_REFERENCE));
+    assertEquals(ROWS.size(), fakeDatasetService.getAllRows(PROJECT, DATASET, 
TABLE_ID).size());
+  }
+
+  @Test
+  public void testManagedChoosesFileLoadsForBoundedWrites() {
+    PCollection<Row> batchInput = 
p.apply(Create.of(ROWS)).setRowSchema(SCHEMA);
+    batchInput.apply(
+        Managed.write(Managed.BIGQUERY)
+            .withConfig(ImmutableMap.of("table", "project.dataset.table")));
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    List<RunnerApi.PTransform> writeTransformProto =
+        pipelineProto.getComponents().getTransformsMap().values().stream()
+            .filter(
+                tr ->
+                    tr.getUniqueName()
+                        
.contains(BigQueryFileLoadsSchemaTransform.class.getSimpleName()))
+            .collect(Collectors.toList());
+    assertThat(writeTransformProto.size(), greaterThan(0));
+    p.enableAbandonedNodeEnforcement(false);
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java
new file mode 100644
index 00000000000..63727107a65
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** This class tests the execution of {@link Managed} BigQueryIO. */
+@RunWith(JUnit4.class)
+public class BigQueryManagedIT {
+  @Rule public TestName testName = new TestName();
+  @Rule public transient TestPipeline writePipeline = TestPipeline.create();
+  @Rule public transient TestPipeline readPipeline = TestPipeline.create();
+
+  private static final Schema SCHEMA =
+      Schema.of(
+          Schema.Field.of("str", Schema.FieldType.STRING),
+          Schema.Field.of("number", Schema.FieldType.INT64));
+
+  private static final List<Row> ROWS =
+      LongStream.range(0, 20)
+          .mapToObj(
+              i ->
+                  Row.withSchema(SCHEMA)
+                      .withFieldValue("str", Long.toString(i))
+                      .withFieldValue("number", i)
+                      .build())
+          .collect(Collectors.toList());
+
+  private static final BigqueryClient BQ_CLIENT = new 
BigqueryClient("BigQueryManagedIT");
+
+  private static final String PROJECT =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  private static final String BIG_QUERY_DATASET_ID = "bigquery_managed_" + 
System.nanoTime();
+
+  @BeforeClass
+  public static void setUpTestEnvironment() throws IOException, 
InterruptedException {
+    // Create one BQ dataset for all test cases.
+    BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null);
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID);
+  }
+
+  @Test
+  public void testBatchFileLoadsWriteRead() {
+    String table =
+        String.format("%s:%s.%s", PROJECT, BIG_QUERY_DATASET_ID, 
testName.getMethodName());
+    Map<String, Object> config = ImmutableMap.of("table", table);
+
+    // The file loads write method requires a GCS temp location.
+    String tempLocation = 
writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot();
+    writePipeline.getOptions().setTempLocation(tempLocation);
+
+    // batch write
+    PCollectionRowTuple.of("input", getInput(writePipeline, false))
+        .apply(Managed.write(Managed.BIGQUERY).withConfig(config));
+    writePipeline.run().waitUntilFinish();
+
+    // read and validate
+    PCollection<Row> outputRows =
+        readPipeline
+            .apply(Managed.read(Managed.BIGQUERY).withConfig(config))
+            .getSinglePCollection();
+    PAssert.that(outputRows).containsInAnyOrder(ROWS);
+    readPipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testStreamingStorageWriteRead() {
+    String table =
+        String.format("%s:%s.%s", PROJECT, BIG_QUERY_DATASET_ID, 
testName.getMethodName());
+    Map<String, Object> config = ImmutableMap.of("table", table);
+
+    // streaming write
+    PCollectionRowTuple.of("input", getInput(writePipeline, true))
+        .apply(Managed.write(Managed.BIGQUERY).withConfig(config));
+    writePipeline.run().waitUntilFinish();
+
+    // read and validate
+    PCollection<Row> outputRows =
+        readPipeline
+            .apply(Managed.read(Managed.BIGQUERY).withConfig(config))
+            .getSinglePCollection();
+    PAssert.that(outputRows).containsInAnyOrder(ROWS);
+    readPipeline.run().waitUntilFinish();
+  }
+
+  public PCollection<Row> getInput(Pipeline p, boolean isStreaming) {
+    if (isStreaming) {
+      return p.apply(
+              PeriodicImpulse.create()
+                  .startAt(new Instant(0))
+                  .stopAt(new Instant(19))
+                  .withInterval(Duration.millis(1)))
+          .apply(
+              MapElements.into(TypeDescriptors.rows())
+                  .via(
+                      i ->
+                          Row.withSchema(SCHEMA)
+                              .withFieldValue("str", 
Long.toString(i.getMillis()))
+                              .withFieldValue("number", i.getMillis())
+                              .build()))
+          .setRowSchema(SCHEMA);
+    }
+    return p.apply(Create.of(ROWS)).setRowSchema(SCHEMA);
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQuerySchemaTransformTranslationTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQuerySchemaTransformTranslationTest.java
new file mode 100644
index 00000000000..822c607aa3c
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQuerySchemaTransformTranslationTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static 
org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransform;
+import static 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQuerySchemaTransformTranslation.BigQueryStorageReadSchemaTransformTranslator;
+import static 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQuerySchemaTransformTranslation.BigQueryWriteSchemaTransformTranslator;
+import static 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteSchemaTransformProvider.BigQueryWriteSchemaTransform;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import 
org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformTranslationTest {
+  static final BigQueryWriteSchemaTransformProvider WRITE_PROVIDER =
+      new BigQueryWriteSchemaTransformProvider();
+  static final BigQueryDirectReadSchemaTransformProvider READ_PROVIDER =
+      new BigQueryDirectReadSchemaTransformProvider();
+  static final Row WRITE_CONFIG_ROW =
+      Row.withSchema(WRITE_PROVIDER.configurationSchema())
+          .withFieldValue("table", "project:dataset.table")
+          .withFieldValue("create_disposition", "create_never")
+          .withFieldValue("write_disposition", "write_append")
+          .withFieldValue("triggering_frequency_seconds", 5L)
+          .withFieldValue("use_at_least_once_semantics", false)
+          .withFieldValue("auto_sharding", false)
+          .withFieldValue("num_streams", 5)
+          .withFieldValue("error_handling", null)
+          .build();
+  static final Row READ_CONFIG_ROW =
+      Row.withSchema(READ_PROVIDER.configurationSchema())
+          .withFieldValue("query", null)
+          .withFieldValue("table_spec", 
"apache-beam-testing.samples.weather_stations")
+          .withFieldValue("row_restriction", "col < 5")
+          .withFieldValue("selected_fields", Arrays.asList("col1", "col2", 
"col3"))
+          .build();
+
+  @Test
+  public void testRecreateWriteTransformFromRow() {
+    BigQueryWriteSchemaTransform writeTransform =
+        (BigQueryWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG_ROW);
+
+    BigQueryWriteSchemaTransformTranslator translator =
+        new BigQueryWriteSchemaTransformTranslator();
+    Row translatedRow = translator.toConfigRow(writeTransform);
+
+    BigQueryWriteSchemaTransform writeTransformFromRow =
+        translator.fromConfigRow(translatedRow, 
PipelineOptionsFactory.create());
+
+    assertEquals(WRITE_CONFIG_ROW, 
writeTransformFromRow.getConfigurationRow());
+  }
+
+  @Test
+  public void testWriteTransformProtoTranslation()
+      throws InvalidProtocolBufferException, IOException {
+    // First build a pipeline
+    Pipeline p = Pipeline.create();
+    Schema inputSchema = Schema.builder().addByteArrayField("b").build();
+    PCollection<Row> input =
+        p.apply(
+                Create.of(
+                    Collections.singletonList(
+                        Row.withSchema(inputSchema).addValue(new byte[] {1, 2, 
3}).build())))
+            .setRowSchema(inputSchema);
+
+    BigQueryWriteSchemaTransform writeTransform =
+        (BigQueryWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG_ROW);
+    PCollectionRowTuple.of("input", input).apply(writeTransform);
+
+    // Then translate the pipeline to a proto and extract the BigQueryWriteSchemaTransform proto
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    List<RunnerApi.PTransform> writeTransformProto =
+        pipelineProto.getComponents().getTransformsMap().values().stream()
+            .filter(
+                tr -> {
+                  RunnerApi.FunctionSpec spec = tr.getSpec();
+                  try {
+                    return 
spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+                        && SchemaTransformPayload.parseFrom(spec.getPayload())
+                            .getIdentifier()
+                            .equals(WRITE_PROVIDER.identifier());
+                  } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toList());
+    assertEquals(1, writeTransformProto.size());
+    RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec();
+
+    // Check that the proto contains correct values
+    SchemaTransformPayload payload = 
SchemaTransformPayload.parseFrom(spec.getPayload());
+    Schema schemaFromSpec = 
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+    assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec);
+    Row rowFromSpec = 
RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+
+    assertEquals(WRITE_CONFIG_ROW, rowFromSpec);
+
+    // Use the information in the proto to recreate the BigQueryWriteSchemaTransform
+    BigQueryWriteSchemaTransformTranslator translator =
+        new BigQueryWriteSchemaTransformTranslator();
+    BigQueryWriteSchemaTransform writeTransformFromSpec =
+        translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+    assertEquals(WRITE_CONFIG_ROW, 
writeTransformFromSpec.getConfigurationRow());
+  }
+
+  @Test
+  public void testRecreateReadTransformFromRow() {
+    BigQueryDirectReadSchemaTransform readTransform =
+        (BigQueryDirectReadSchemaTransform) 
READ_PROVIDER.from(READ_CONFIG_ROW);
+
+    BigQueryStorageReadSchemaTransformTranslator translator =
+        new BigQueryStorageReadSchemaTransformTranslator();
+    Row row = translator.toConfigRow(readTransform);
+
+    BigQueryDirectReadSchemaTransform readTransformFromRow =
+        translator.fromConfigRow(row, PipelineOptionsFactory.create());
+
+    assertEquals(READ_CONFIG_ROW, readTransformFromRow.getConfigurationRow());
+  }
+
+  @Test
+  public void testReadTransformProtoTranslation()
+      throws InvalidProtocolBufferException, IOException {
+    // First build a pipeline
+    Pipeline p = Pipeline.create();
+
+    BigQueryDirectReadSchemaTransform readTransform =
+        (BigQueryDirectReadSchemaTransform) 
READ_PROVIDER.from(READ_CONFIG_ROW);
+
+    PCollectionRowTuple.empty(p).apply(readTransform);
+
+    // Then translate the pipeline to a proto and extract the BigQueryDirectReadSchemaTransform proto
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    List<RunnerApi.PTransform> readTransformProto =
+        pipelineProto.getComponents().getTransformsMap().values().stream()
+            .filter(
+                tr -> {
+                  RunnerApi.FunctionSpec spec = tr.getSpec();
+                  try {
+                    return 
spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+                        && SchemaTransformPayload.parseFrom(spec.getPayload())
+                            .getIdentifier()
+                            .equals(READ_PROVIDER.identifier());
+                  } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toList());
+    assertEquals(1, readTransformProto.size());
+    RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec();
+
+    // Check that the proto contains correct values
+    SchemaTransformPayload payload = 
SchemaTransformPayload.parseFrom(spec.getPayload());
+    Schema schemaFromSpec = 
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+    assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec);
+    Row rowFromSpec = 
RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+    assertEquals(READ_CONFIG_ROW, rowFromSpec);
+
+    // Use the information in the proto to recreate the BigQueryDirectReadSchemaTransform
+    BigQueryStorageReadSchemaTransformTranslator translator =
+        new BigQueryStorageReadSchemaTransformTranslator();
+    BigQueryDirectReadSchemaTransform readTransformFromSpec =
+        translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+    assertEquals(READ_CONFIG_ROW, readTransformFromSpec.getConfigurationRow());
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index 87ba2961461..7b59552bbbe 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery.providers;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
@@ -32,13 +34,14 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
 import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform;
-import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
 import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
 import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
 import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.managed.Managed;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
@@ -50,13 +53,16 @@ import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -108,15 +114,14 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
 
   @Test
   public void testInvalidConfig() {
-    List<BigQueryStorageWriteApiSchemaTransformConfiguration.Builder> 
invalidConfigs =
+    List<BigQueryWriteConfiguration.Builder> invalidConfigs =
         Arrays.asList(
-            BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
-                .setTable("not_a_valid_table_spec"),
-            BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+            
BigQueryWriteConfiguration.builder().setTable("not_a_valid_table_spec"),
+            BigQueryWriteConfiguration.builder()
                 .setTable("project:dataset.table")
                 .setCreateDisposition("INVALID_DISPOSITION"));
 
-    for (BigQueryStorageWriteApiSchemaTransformConfiguration.Builder config : 
invalidConfigs) {
+    for (BigQueryWriteConfiguration.Builder config : invalidConfigs) {
       assertThrows(
           Exception.class,
           () -> {
@@ -125,13 +130,11 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
     }
   }
 
-  public PCollectionRowTuple runWithConfig(
-      BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+  public PCollectionRowTuple runWithConfig(BigQueryWriteConfiguration config) {
     return runWithConfig(config, ROWS);
   }
 
-  public PCollectionRowTuple runWithConfig(
-      BigQueryStorageWriteApiSchemaTransformConfiguration config, List<Row> 
inputRows) {
+  public PCollectionRowTuple runWithConfig(BigQueryWriteConfiguration config, 
List<Row> inputRows) {
     BigQueryStorageWriteApiSchemaTransformProvider provider =
         new BigQueryStorageWriteApiSchemaTransformProvider();
 
@@ -176,8 +179,8 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
   @Test
   public void testSimpleWrite() throws Exception {
     String tableSpec = "project:dataset.simple_write";
-    BigQueryStorageWriteApiSchemaTransformConfiguration config =
-        
BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
+    BigQueryWriteConfiguration config =
+        BigQueryWriteConfiguration.builder().setTable(tableSpec).build();
 
     runWithConfig(config, ROWS);
     p.run().waitUntilFinish();
@@ -189,9 +192,9 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
 
   @Test
   public void testWriteToDynamicDestinations() throws Exception {
-    String dynamic = 
BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS;
-    BigQueryStorageWriteApiSchemaTransformConfiguration config =
-        
BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(dynamic).build();
+    String dynamic = BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS;
+    BigQueryWriteConfiguration config =
+        BigQueryWriteConfiguration.builder().setTable(dynamic).build();
 
     String baseTableSpec = "project:dataset.dynamic_write_";
 
@@ -273,8 +276,8 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
     String tableSpec = "project:dataset.cdc_write";
     List<String> primaryKeyColumns = ImmutableList.of("name");
 
-    BigQueryStorageWriteApiSchemaTransformConfiguration config =
-        BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+    BigQueryWriteConfiguration config =
+        BigQueryWriteConfiguration.builder()
             .setUseAtLeastOnceSemantics(true)
             .setTable(tableSpec)
             .setUseCdcWrites(true)
@@ -304,9 +307,9 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
   @Test
   public void testCDCWriteToDynamicDestinations() throws Exception {
     List<String> primaryKeyColumns = ImmutableList.of("name");
-    String dynamic = 
BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS;
-    BigQueryStorageWriteApiSchemaTransformConfiguration config =
-        BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+    String dynamic = BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS;
+    BigQueryWriteConfiguration config =
+        BigQueryWriteConfiguration.builder()
             .setUseAtLeastOnceSemantics(true)
             .setTable(dynamic)
             .setUseCdcWrites(true)
@@ -338,8 +341,8 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
   @Test
   public void testInputElementCount() throws Exception {
     String tableSpec = "project:dataset.input_count";
-    BigQueryStorageWriteApiSchemaTransformConfiguration config =
-        
BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
+    BigQueryWriteConfiguration config =
+        BigQueryWriteConfiguration.builder().setTable(tableSpec).build();
 
     runWithConfig(config);
     PipelineResult result = p.run();
@@ -368,13 +371,11 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
   @Test
   public void testFailedRows() throws Exception {
     String tableSpec = "project:dataset.write_with_fail";
-    BigQueryStorageWriteApiSchemaTransformConfiguration config =
-        BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+    BigQueryWriteConfiguration config =
+        BigQueryWriteConfiguration.builder()
             .setTable(tableSpec)
             .setErrorHandling(
-                
BigQueryStorageWriteApiSchemaTransformConfiguration.ErrorHandling.builder()
-                    .setOutput("FailedRows")
-                    .build())
+                
BigQueryWriteConfiguration.ErrorHandling.builder().setOutput("FailedRows").build())
             .build();
 
     String failValue = "fail_me";
@@ -420,13 +421,11 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
   @Test
   public void testErrorCount() throws Exception {
     String tableSpec = "project:dataset.error_count";
-    BigQueryStorageWriteApiSchemaTransformConfiguration config =
-        BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+    BigQueryWriteConfiguration config =
+        BigQueryWriteConfiguration.builder()
             .setTable(tableSpec)
             .setErrorHandling(
-                
BigQueryStorageWriteApiSchemaTransformConfiguration.ErrorHandling.builder()
-                    .setOutput("FailedRows")
-                    .build())
+                
BigQueryWriteConfiguration.ErrorHandling.builder().setOutput("FailedRows").build())
             .build();
 
     Function<TableRow, Boolean> shouldFailRow =
@@ -456,4 +455,24 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
       assertEquals(expectedCount, count.getAttempted());
     }
   }
+
+  @Test
+  public void testManagedChoosesStorageApiForUnboundedWrites() {
+    PCollection<Row> unboundedInput =
+        p.apply(TestStream.create(SCHEMA).addElements(ROWS.get(0)).advanceWatermarkToInfinity());
+    unboundedInput.apply(
+        Managed.write(Managed.BIGQUERY)
+            .withConfig(ImmutableMap.of("table", "project.dataset.table")));
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    List<RunnerApi.PTransform> writeTransformProto =
+        pipelineProto.getComponents().getTransformsMap().values().stream()
+            .filter(
+                tr ->
+                    tr.getUniqueName()
+                        .contains(BigQueryStorageWriteApiSchemaTransform.class.getSimpleName()))
+            .collect(Collectors.toList());
+    assertThat(writeTransformProto.size(), greaterThan(0));
+    p.enableAbandonedNodeEnforcement(false);
+  }
 }
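
For contrast with the unbounded-write check above, a bounded-input counterpart might look like the following sketch. It assumes the batch path surfaces a transform whose unique name contains "BigQueryFileLoadsSchemaTransform" (an assumption based on this commit's file-loads provider, not something shown in this hunk) and reuses the test's ROWS/SCHEMA fixtures.

    // Sketch only: a bounded input should make Managed pick the file-loads
    // write path instead of the Storage Write API. The transform name checked
    // here is an assumption, not taken from this diff.
    @Test
    public void testManagedChoosesFileLoadsForBoundedWrites() {
      PCollection<Row> boundedInput = p.apply(Create.of(ROWS).withRowSchema(SCHEMA));
      boundedInput.apply(
          Managed.write(Managed.BIGQUERY)
              .withConfig(ImmutableMap.of("table", "project.dataset.table")));

      RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
      long fileLoadsTransforms =
          pipelineProto.getComponents().getTransformsMap().values().stream()
              .filter(tr -> tr.getUniqueName().contains("BigQueryFileLoadsSchemaTransform"))
              .count();
      assertThat(fileLoadsTransforms, greaterThan(0L));
      p.enableAbandonedNodeEnforcement(false);
    }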
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index 8477726686e..8e7e0862eff 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -86,17 +86,20 @@ public class Managed {
   // TODO: Dynamically generate a list of supported transforms
   public static final String ICEBERG = "iceberg";
   public static final String KAFKA = "kafka";
+  public static final String BIGQUERY = "bigquery";
 
   // Supported SchemaTransforms
   public static final Map<String, String> READ_TRANSFORMS =
       ImmutableMap.<String, String>builder()
          .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ))
          .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
+          .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
           .build();
   public static final Map<String, String> WRITE_TRANSFORMS =
       ImmutableMap.<String, String>builder()
          .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE))
          .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE))
+          .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE))
           .build();
 
   /**
@@ -104,7 +107,9 @@ public class Managed {
    * supported managed sources are:
    *
    * <ul>
-   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg tables
+   *   <li>{@link Managed#KAFKA} : Read from Apache Kafka topics
+   *   <li>{@link Managed#BIGQUERY} : Read from GCP BigQuery tables
    * </ul>
    */
   public static ManagedTransform read(String source) {
@@ -124,7 +129,9 @@ public class Managed {
    * managed sinks are:
    *
    * <ul>
-   *   <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+   *   <li>{@link Managed#ICEBERG} : Write to Apache Iceberg tables
+   *   <li>{@link Managed#KAFKA} : Write to Apache Kafka topics
+   *   <li>{@link Managed#BIGQUERY} : Write to GCP BigQuery tables
    * </ul>
    */
   public static ManagedTransform write(String sink) {
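
Taken together, the BIGQUERY entries above make BigQuery reachable through the Managed API. A minimal usage sketch, assuming a pipeline `p`, with the table spec as a placeholder and the "output" read tag taken as this commit's convention:

    // Sketch: user-facing Managed BigQuery read/write. "project.dataset.table"
    // is a placeholder; the "output" tag is assumed from this commit's read changes.
    Map<String, Object> config = ImmutableMap.of("table", "project.dataset.table");

    PCollection<Row> rows =
        PCollectionRowTuple.empty(p)
            .apply(Managed.read(Managed.BIGQUERY).withConfig(config))
            .get("output");

    rows.apply(Managed.write(Managed.BIGQUERY).withConfig(config));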
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
index 6f97983d326..b705306b947 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -117,7 +117,7 @@ public class ManagedSchemaTransformProvider
           "Please specify a config or a config URL, but not both.");
     }
 
-    public @Nullable String resolveUnderlyingConfig() {
+    private Map<String, Object> resolveUnderlyingConfig() {
       String yamlTransformConfig = getConfig();
       // If YAML string is empty, then attempt to read from YAML file
       if (Strings.isNullOrEmpty(yamlTransformConfig)) {
@@ -131,7 +131,8 @@ public class ManagedSchemaTransformProvider
           throw new RuntimeException(e);
         }
       }
-      return yamlTransformConfig;
+
+      return YamlUtils.yamlStringToMap(yamlTransformConfig);
     }
   }
 
@@ -152,34 +153,34 @@ public class ManagedSchemaTransformProvider
 
   static class ManagedSchemaTransform extends SchemaTransform {
     private final ManagedConfig managedConfig;
-    private final Row underlyingTransformConfig;
+    private final Row underlyingRowConfig;
     private final SchemaTransformProvider underlyingTransformProvider;
 
     ManagedSchemaTransform(
        ManagedConfig managedConfig, SchemaTransformProvider underlyingTransformProvider) {
      // parse config before expansion to check if it matches underlying transform's config schema
      Schema transformConfigSchema = underlyingTransformProvider.configurationSchema();
-      Row underlyingTransformConfig;
+      Row underlyingRowConfig;
       try {
-        underlyingTransformConfig = getRowConfig(managedConfig, transformConfigSchema);
+        underlyingRowConfig = getRowConfig(managedConfig, transformConfigSchema);
       } catch (Exception e) {
         throw new IllegalArgumentException(
             "Encountered an error when retrieving a Row configuration", e);
       }
 
-      this.managedConfig = managedConfig;
-      this.underlyingTransformConfig = underlyingTransformConfig;
+      this.underlyingRowConfig = underlyingRowConfig;
       this.underlyingTransformProvider = underlyingTransformProvider;
+      this.managedConfig = managedConfig;
     }
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
       LOG.debug(
-          "Building transform \"{}\" with Row configuration: {}",
+          "Building transform \"{}\" with configuration: {}",
           underlyingTransformProvider.identifier(),
-          underlyingTransformConfig);
+          underlyingRowConfig);
 
-      return input.apply(underlyingTransformProvider.from(underlyingTransformConfig));
+      return input.apply(underlyingTransformProvider.from(underlyingRowConfig));
     }
 
     public ManagedConfig getManagedConfig() {
@@ -201,16 +202,14 @@ public class ManagedSchemaTransformProvider
     }
   }
 
+  // May return an empty row (perhaps the underlying transform doesn't have any required
+  // parameters)
   @VisibleForTesting
   static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
-    // May return an empty row (perhaps the underlying transform doesn't have any required
-    // parameters)
-    String yamlConfig = config.resolveUnderlyingConfig();
-    Map<String, Object> configMap = YamlUtils.yamlStringToMap(yamlConfig);
-
-    // The config Row object will be used to build the underlying SchemaTransform.
-    // If a mapping for the SchemaTransform exists, we use it to update parameter names and align
-    // with the underlying config schema
+    Map<String, Object> configMap = config.resolveUnderlyingConfig();
+    // Build a config Row that will be used to build the underlying SchemaTransform.
+    // If a mapping for the SchemaTransform exists, we use it to update parameter names to align
+    // with the underlying SchemaTransform config schema
    Map<String, String> mapping = MAPPINGS.get(config.getTransformIdentifier());
     if (mapping != null && configMap != null) {
       Map<String, Object> remappedConfig = new HashMap<>();
@@ -227,7 +226,7 @@ public class ManagedSchemaTransformProvider
     return YamlUtils.toBeamRow(configMap, transformSchema, false);
   }
 
-  // We load providers seperately, after construction, to prevent the
+  // We load providers separately, after construction, to prevent the
  // 'ManagedSchemaTransformProvider' from being initialized in a recursive loop
   // when being loaded using 'AutoValue'.
   synchronized Map<String, SchemaTransformProvider> getAllProviders() {
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
index 4cf752747be..30476a30d37 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
@@ -50,9 +50,27 @@ public class ManagedTransformConstants {
   private static final Map<String, String> KAFKA_WRITE_MAPPINGS =
      ImmutableMap.<String, String>builder().put("data_format", "format").build();
 
+  private static final Map<String, String> BIGQUERY_READ_MAPPINGS =
+      ImmutableMap.<String, String>builder()
+          .put("table", "table_spec")
+          .put("fields", "selected_fields")
+          .build();
+
+  private static final Map<String, String> BIGQUERY_WRITE_MAPPINGS =
+      ImmutableMap.<String, String>builder()
+          .put("at_least_once", "use_at_least_once_semantics")
+          .put("triggering_frequency", "triggering_frequency_seconds")
+          .build();
+
   public static final Map<String, Map<String, String>> MAPPINGS =
       ImmutableMap.<String, Map<String, String>>builder()
          .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ), KAFKA_READ_MAPPINGS)
          .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS)
+          .put(
+              getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ),
+              BIGQUERY_READ_MAPPINGS)
+          .put(
+              getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE),
+              BIGQUERY_WRITE_MAPPINGS)
           .build();
 }
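
To make these mappings concrete: a hypothetical user-facing write config below is remapped key-by-key before being converted to the underlying transform's Row configuration (keys absent from the mapping, like "table", pass through unchanged):

    // Sketch: user-facing keys on the left are renamed via BIGQUERY_WRITE_MAPPINGS.
    Map<String, Object> userConfig =
        ImmutableMap.of(
            "table", "project.dataset.table",
            "at_least_once", true,
            "triggering_frequency", 5);
    // After remapping, the underlying config is equivalent to:
    //   {table=project.dataset.table,
    //    use_at_least_once_semantics=true,
    //    triggering_frequency_seconds=5}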
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
index e9edf8751e3..a287ec6260c 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
@@ -88,8 +88,7 @@ public class ManagedSchemaTransformProviderTest {
             .withFieldValue("extra_integer", 123)
             .build();
     Row configRow =
-        ManagedSchemaTransformProvider.getRowConfig(
-            config, new TestSchemaTransformProvider().configurationSchema());
+        ManagedSchemaTransformProvider.getRowConfig(config, TestSchemaTransformProvider.SCHEMA);
 
     assertEquals(expectedRow, configRow);
   }
diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py
index 22ee15b1de1..cbcb6de56ed 100644
--- a/sdks/python/apache_beam/transforms/managed.py
+++ b/sdks/python/apache_beam/transforms/managed.py
@@ -77,12 +77,16 @@ from apache_beam.transforms.ptransform import PTransform
 
 ICEBERG = "iceberg"
 KAFKA = "kafka"
+BIGQUERY = "bigquery"
 _MANAGED_IDENTIFIER = "beam:transform:managed:v1"
 _EXPANSION_SERVICE_JAR_TARGETS = {
     "sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG],
+    "sdks:java:io:google-cloud-platform:expansion-service:shadowJar": [
+        BIGQUERY
+    ]
 }
 
-__all__ = ["ICEBERG", "KAFKA", "Read", "Write"]
+__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"]
 
 
 class Read(PTransform):
@@ -90,6 +94,7 @@ class Read(PTransform):
   _READ_TRANSFORMS = {
       ICEBERG: ManagedTransforms.Urns.ICEBERG_READ.urn,
       KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn,
+      BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn
   }
 
   def __init__(
@@ -130,6 +135,7 @@ class Write(PTransform):
   _WRITE_TRANSFORMS = {
       ICEBERG: ManagedTransforms.Urns.ICEBERG_WRITE.urn,
       KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn,
+      BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn
   }
 
   def __init__(
