This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 11f9bce485c Generate external transform wrappers using a script (#29834)
11f9bce485c is described below

commit 11f9bce485c4f6fe466ff4bf5073d2414e43678c
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Feb 22 16:13:55 2024 -0500

    Generate external transform wrappers using a script (#29834)
---
 ...=> beam_PostCommit_Python_Examples_Direct.json} |   0
 .../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json |   1 +
 ...> beam_PostCommit_Python_Xlang_Gcp_Direct.json} |   0
 ... beam_PostCommit_Python_Xlang_IO_Dataflow.json} |   0
 .github/workflows/README.md                        |   1 +
 .../beam_PreCommit_Xlang_Generated_Transforms.yml  | 114 ++++++
 .gitignore                                         |   1 +
 CHANGES.md                                         |   1 +
 build.gradle.kts                                   |   5 +
 .../org/apache/beam/gradle/BeamModulePlugin.groovy | 100 ++---
 scripts/ci/release/test/resources/mass_comment.txt |   2 +
 .../GenerateSequenceSchemaTransformProvider.java   |   2 +-
 sdks/python/MANIFEST.in                            |   1 +
 sdks/python/apache_beam/io/__init__.py             |   1 +
 .../io/external/xlang_bigqueryio_it_test.py        |  23 +-
 .../apache_beam/io/gcp/bigtableio_it_test.py       |  22 +-
 .../transforms/external_transform_provider.py      |   8 +-
 .../external_transform_provider_it_test.py         | 413 +++++++++++++++++++
 .../transforms/external_transform_provider_test.py | 140 -------
 .../transforms/xlang/__init__.py}                  |  15 +-
 sdks/python/build.gradle                           |  19 +
 sdks/python/gen_xlang_wrappers.py                  | 447 +++++++++++++++++++++
 sdks/python/pyproject.toml                         |   6 +
 sdks/python/pytest.ini                             |   1 +
 sdks/python/python_xlang_wrapper.template          |  36 ++
 sdks/python/setup.py                               |  49 ++-
 sdks/python/test-suites/dataflow/common.gradle     |   5 +-
 sdks/python/test-suites/direct/build.gradle        |   7 +
 sdks/python/test-suites/direct/common.gradle       |   8 +-
 sdks/python/test-suites/xlang/build.gradle         |  56 +--
 sdks/python/tox.ini                                |   4 +-
 sdks/standard_expansion_services.yaml              |  77 ++++
 sdks/standard_external_transforms.yaml             |  52 +++
 33 files changed, 1330 insertions(+), 287 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Examples_Direct.json
similarity index 100%
copy from .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
copy to .github/trigger_files/beam_PostCommit_Python_Examples_Direct.json
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
index e69de29bb2d..8b137891791 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
@@ -0,0 +1 @@
+
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
similarity index 100%
copy from .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
copy to .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
similarity index 100%
copy from .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
copy to .github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
diff --git a/.github/workflows/README.md b/.github/workflows/README.md
index f882553ef9b..42eaaaafdac 100644
--- a/.github/workflows/README.md
+++ b/.github/workflows/README.md
@@ -271,6 +271,7 @@ PreCommit Jobs run in a schedule and also get triggered in a PR if relevant sour
 | [ PreCommit Website ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml) | N/A |`Run Website PreCommit`| [![.github/workflows/beam_PreCommit_Website.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml?query=event%3Aschedule) |
 | [ PreCommit Website Stage GCS ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml) | N/A |`Run Website_Stage_GCS PreCommit`| [![.github/workflows/beam_PreCommit_Website_Stage_GCS.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml?query=event%3Aschedule) |
 | [ PreCommit Whitespace ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml) | N/A |`Run Whitespace PreCommit`| [![.github/workflows/beam_PreCommit_Whitespace.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml?query=event%3Aschedule) |
+| [ PreCommit Xlang Generated Transforms ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml) | N/A |`Run Xlang_Generated_Transforms PreCommit`| [![.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml?query=event%3Asch [...]
 
 ### PostCommit Jobs
 
diff --git a/.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml b/.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml
new file mode 100644
index 00000000000..f8d64eb5d4a
--- /dev/null
+++ b/.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PreCommit Xlang Generated Transforms
+
+on:
+  push:
+    tags: ['v*']
+    branches: ['master', 'release-*']
+    paths:
+      - 'model/**'
+      - 'sdks/python/**'
+      - 'sdks/java/expansion-service/**'
+      - 'sdks/java/core/**'
+      - 'sdks/java/io/**'
+      - 'sdks/java/extensions/sql/**'
+      - 'release/**'
+      - '.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml'
+  pull_request_target:
+    branches: ['master', 'release-*']
+    paths:
+      - 'model/**'
+      - 'sdks/python/**'
+      - 'sdks/java/expansion-service/**'
+      - 'sdks/java/core/**'
+      - 'sdks/java/io/**'
+      - 'sdks/java/extensions/sql/**'
+      - 'release/**'
+      - 'release/trigger_all_tests.json'
+      - '.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml'
+  issue_comment:
+    types: [created]
+  schedule:
+    - cron: '30 2/6 * * *'
+  workflow_dispatch:
+
+#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
+permissions:
+  actions: write
+  pull-requests: read
+  checks: read
+  contents: read
+  deployments: read
+  id-token: none
+  issues: read
+  discussions: read
+  packages: read
+  pages: read
+  repository-projects: read
+  security-events: read
+  statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
+  cancel-in-progress: true
+
+env:
+  GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+  beam_PreCommit_Xlang_Generated_Transforms:
+    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ matrix.python_version }})
+    timeout-minutes: 120
+    runs-on: ['self-hosted', ubuntu-20.04, main]
+    strategy:
+      fail-fast: false
+      matrix:
+        job_name: ['beam_PreCommit_Xlang_Generated_Transforms']
+        job_phrase: ['Run Xlang_Generated_Transforms PreCommit']
+        python_version: ['3.8']
+    if: |
+      github.event_name == 'push' ||
+      github.event_name == 'workflow_dispatch' ||
+      github.event_name == 'pull_request_target' ||
+      (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
+      startsWith(github.event.comment.body, 'Run Xlang_Generated_Transforms PreCommit')
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup repository
+        uses: ./.github/actions/setup-action
+        with:
+          comment_phrase: ${{ matrix.job_phrase }}
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ matrix.python_version }})
+      - name: Setup environment
+        uses: ./.github/actions/setup-environment-action
+        with:
+          java-version: 8
+          python-version: ${{ matrix.python_version }}
+      - name: Set PY_VER_CLEAN
+        id: set_py_ver_clean
+        run: |
+          PY_VER=${{ matrix.python_version }}
+          PY_VER_CLEAN=${PY_VER//.}
+          echo "py_ver_clean=$PY_VER_CLEAN" >> $GITHUB_OUTPUT
+      - name: run Cross-Language Wrapper Validation script
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        with:
+          gradle-command: :sdks:python:test-suites:direct:crossLanguageWrapperValidationPreCommit
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 3f7d7af44a8..c76441078f3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -52,6 +52,7 @@ sdks/python/**/*.egg
 sdks/python/LICENSE
 sdks/python/NOTICE
 sdks/python/README.md
+sdks/python/apache_beam/transforms/xlang/*
 sdks/python/apache_beam/portability/api/*
 sdks/python/apache_beam/yaml/docs/*
 sdks/python/nosetests*.xml
diff --git a/CHANGES.md b/CHANGES.md
index 249897b1697..2e23d72e664 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -58,6 +58,7 @@
 
 * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
 * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
+* The Python SDK will now include automatically generated wrappers for external Java transforms! ([#29834](https://github.com/apache/beam/pull/29834))
 
 ## I/Os
 
diff --git a/build.gradle.kts b/build.gradle.kts
index 67653e59c0d..788d9a48dc0 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -647,6 +647,11 @@ tasks.register("checkSetup") {
   dependsOn(":examples:java:wordCount")
 }
 
+// Generates external transform config
+project.tasks.register("generateExternalTransformsConfig") {
+  dependsOn(":sdks:python:generateExternalTransformsConfig")
+}
+
 // Configure the release plugin to do only local work; the release manager determines what, if
 // anything, to push. On failure, the release manager can reset the branch without pushing.
 release {
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 5b73940b99d..6434746fd3a 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -321,19 +321,17 @@ class BeamModulePlugin implements Plugin<Project> {
 
   // A class defining the common properties in a given suite of cross-language tests
   // Properties are shared across runners and are used when creating a CrossLanguageUsingJavaExpansionConfiguration object
-  static class CrossLanguageTaskCommon {
+  static class CrossLanguageTask {
     // Used as the task name for cross-language
     String name
-    // The expansion service's project path (required)
-    String expansionProjectPath
+    // List of project paths for required expansion services
+    List<String> expansionProjectPaths
     // Collect Python pipeline tests with this marker
     String collectMarker
-    // Job server startup task.
-    TaskProvider startJobServer
-    // Job server cleanup task.
-    TaskProvider cleanupJobServer
-    // any additional environment variables specific to the suite of tests
+    // Additional environment variables to set before running tests
     Map<String,String> additionalEnvs
+    // Additional Python dependencies to install before running tests
+    List<String> additionalDeps
   }
 
   // A class defining the configuration for CrossLanguageUsingJavaExpansion.
@@ -349,18 +347,16 @@ class BeamModulePlugin implements Plugin<Project> {
     ]
     // Additional pytest options
     List<String> pytestOptions = []
-    // Job server startup task.
-    TaskProvider startJobServer
-    // Job server cleanup task.
-    TaskProvider cleanupJobServer
     // Number of parallel test runs.
     Integer numParallelTests = 1
-    // Project path for the expansion service to start up
-    String expansionProjectPath
+    // List of project paths for required expansion services
+    List<String> expansionProjectPaths
     // Collect Python pipeline tests with this marker
     String collectMarker
     // any additional environment variables to be exported
     Map<String,String> additionalEnvs
+    // Additional Python dependencies to install before running tests
+    List<String> additionalDeps
   }
 
   // A class defining the configuration for CrossLanguageValidatesRunner.
@@ -2576,7 +2572,7 @@ class BeamModulePlugin implements Plugin<Project> {
     /** ***********************************************************************************************/
     // Method to create the createCrossLanguageUsingJavaExpansionTask.
     // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
-    // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+    // This method creates a task that runs Python SDK test-suites that use external Java transforms
     project.ext.createCrossLanguageUsingJavaExpansionTask = {
       // This task won't work if the python build file doesn't exist.
       if (!project.project(":sdks:python").buildFile.exists()) {
@@ -2586,49 +2582,29 @@ class BeamModulePlugin implements Plugin<Project> {
       def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
 
       project.evaluationDependsOn(":sdks:python")
-      project.evaluationDependsOn(config.expansionProjectPath)
+      for (path in config.expansionProjectPaths) {
+        project.evaluationDependsOn(path)
+      }
       project.evaluationDependsOn(":sdks:java:extensions:python")
 
-      // Setting up args to launch the expansion service
       def pythonDir = project.project(":sdks:python").projectDir
-      def javaExpansionPort = -1 // will be populated in setupTask
-      def expansionJar = project.project(config.expansionProjectPath).shadowJar.archivePath
-      def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
-      def expansionServiceOpts = [
-        "group_id": project.name,
-        "java_expansion_service_jar": expansionJar,
-        "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
-      ]
       def usesDataflowRunner = config.pythonPipelineOptions.contains("--runner=TestDataflowRunner") || config.pythonPipelineOptions.contains("--runner=DataflowRunner")
       def javaContainerSuffix = getSupportedJavaVersion()
 
-      // 1. Builds the chosen expansion service jar and launches it
-      def setupTask = project.tasks.register(config.name+"Setup") {
-        dependsOn ':sdks:java:container:' + javaContainerSuffix + ':docker'
-        dependsOn project.project(config.expansionProjectPath).shadowJar.getPath()
-        dependsOn 'installGcpTest'
+      // Sets up, collects, and runs Python pipeline tests
+      project.tasks.register(config.name+"PythonUsingJava") {
+        group = "Verification"
+        description = "Runs Python SDK pipeline tests that use a Java expansion service"
+        // Each expansion service we use needs to be built before running these tests
+        // The built jars will be started up automatically using the BeamJarExpansionService utility
+        for (path in config.expansionProjectPaths) {
+          dependsOn project.project(path).shadowJar.getPath()
+        }
+        dependsOn ":sdks:java:container:$javaContainerSuffix:docker"
+        dependsOn "installGcpTest"
         if (usesDataflowRunner) {
           dependsOn ":sdks:python:test-suites:dataflow:py${project.ext.pythonVersion.replace('.', '')}:initializeForDataflowJob"
         }
-        doLast {
-          project.exec {
-            // Prepare a port to use for the expansion service
-            javaExpansionPort = getRandomPort()
-            expansionServiceOpts.put("java_port", javaExpansionPort)
-            // setup test env
-            def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
-            executable 'sh'
-            args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name} && $pythonDir/scripts/run_expansion_services.sh start $serviceArgs"
-          }
-        }
-      }
-
-      // 2. Sets up, collects, and runs Python pipeline tests
-      def pythonTask = project.tasks.register(config.name+"PythonUsingJava") {
-        group = "Verification"
-        description = "Runs Python SDK pipeline tests that use a Java expansion service"
-        dependsOn setupTask
-        dependsOn config.startJobServer
         doLast {
           def beamPythonTestPipelineOptions = [
             "pipeline_opts": config.pythonPipelineOptions + (usesDataflowRunner ? [
@@ -2641,29 +2617,19 @@ class BeamModulePlugin implements Plugin<Project> {
           def cmdArgs = project.project(':sdks:python').mapToArgString(beamPythonTestPipelineOptions)
 
           project.exec {
-            environment "EXPANSION_JAR", expansionJar
-            environment "EXPANSION_PORT", javaExpansionPort
-            for (envs in config.additionalEnvs){
-              environment envs.getKey(), envs.getValue()
+            // environment variable to indicate that jars have been built
+            environment "EXPANSION_JARS", config.expansionProjectPaths
+            String additionalDependencyCmd = ""
+            if (config.additionalDeps != null && !config.additionalDeps.isEmpty()){
+              additionalDependencyCmd = "&& pip install ${config.additionalDeps.join(' ')} "
             }
             executable 'sh'
-            args '-c', ". ${project.ext.envdir}/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs"
+            args '-c', ". ${project.ext.envdir}/bin/activate " +
+                additionalDependencyCmd +
+                "&& cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs"
           }
         }
       }
-
-      // 3. Shuts down the expansion service
-      def cleanupTask = project.tasks.register(config.name+'Cleanup', Exec) {
-        // teardown test env
-        executable 'sh'
-        args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name}"
-      }
-
-      setupTask.configure {finalizedBy cleanupTask}
-      config.startJobServer.configure {finalizedBy config.cleanupJobServer}
-
-      cleanupTask.configure{mustRunAfter pythonTask}
-      config.cleanupJobServer.configure{mustRunAfter pythonTask}
     }
 
     /** ***********************************************************************************************/
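
Note on the BeamModulePlugin.groovy change above: the command handed to `sh -c` is now assembled from three pieces, and the `pip install` step appears only when `additionalDeps` is non-empty. The following is a minimal Python sketch of that string assembly, not code from the commit. The function name `build_test_command` and its parameter names are hypothetical stand-ins for the Gradle values `project.ext.envdir`, `pythonDir`, `cmdArgs` and `config.additionalDeps`.

```python
from typing import List, Optional


def build_test_command(
    envdir: str,
    python_dir: str,
    cmd_args: str,
    additional_deps: Optional[List[str]] = None) -> str:
  """Hypothetical helper mirroring the string concatenation in the diff."""
  # Empty unless extra dependencies were requested, as in the Groovy code.
  dependency_cmd = ""
  if additional_deps:
    dependency_cmd = "&& pip install " + " ".join(additional_deps) + " "
  # Activate the virtualenv, optionally install extras, then run the tests.
  return (
      f". {envdir}/bin/activate " + dependency_cmd +
      f"&& cd {python_dir} && ./scripts/run_integration_test.sh {cmd_args}")


if __name__ == "__main__":
  # The package names here are placeholders, not taken from the commit.
  print(build_test_command("/tmp/env", "/repo/sdks/python", "--test_opts=x"))
  print(
      build_test_command(
          "/tmp/env", "/repo/sdks/python", "--test_opts=x", ["pkg-a", "pkg-b"]))
```

Each fragment carries its own trailing space, so the pieces join cleanly whether or not the middle one is empty.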
diff --git a/scripts/ci/release/test/resources/mass_comment.txt b/scripts/ci/release/test/resources/mass_comment.txt
index 93468b0c961..1f6f340eb0b 100644
--- a/scripts/ci/release/test/resources/mass_comment.txt
+++ b/scripts/ci/release/test/resources/mass_comment.txt
@@ -79,6 +79,7 @@ Run PythonDocs PreCommit
 Run PythonFormatter PreCommit
 Run PythonLint PreCommit
 Run Python_PVR_Flink PreCommit
+Run Python_Xlang_Gcp_Direct PostCommit
 Run RAT PreCommit
 Run SQL PostCommit
 Run SQL PreCommit
@@ -94,6 +95,7 @@ Run Twister2 ValidatesRunner
 Run Typescript PreCommit
 Run ULR Loopback ValidatesRunner
 Run Whitespace PreCommit
+Run Xlang_Generated_Transforms PreCommit
 Run XVR_Direct PostCommit
 Run XVR_Flink PostCommit
 Run XVR_JavaUsingPython_Dataflow PostCommit
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java
index 4b693f883fb..d9dfc2a90bd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java
@@ -66,7 +66,7 @@ public class GenerateSequenceSchemaTransformProvider
   public String description() {
     return String.format(
         "Outputs a PCollection of Beam Rows, each containing a single INT64 "
-            + "number called \"value\". The count is produced from the given \"start\""
+            + "number called \"value\". The count is produced from the given \"start\" "
             + "value and either up to the given \"end\" or until 2^63 - 1.%n"
             + "To produce an unbounded PCollection, simply do not specify an \"end\" value. "
             + "Unbounded sequences can specify a \"rate\" for output elements.%n"
diff --git a/sdks/python/MANIFEST.in b/sdks/python/MANIFEST.in
index 60b0989d9af..d40273a9340 100644
--- a/sdks/python/MANIFEST.in
+++ b/sdks/python/MANIFEST.in
@@ -16,6 +16,7 @@
 #
 
 include gen_protos.py
+include gen_xlang_wrappers.py
 include README.md
 include NOTICE
 include LICENSE
diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py
index 4945da97d90..83d45d81a5a 100644
--- a/sdks/python/apache_beam/io/__init__.py
+++ b/sdks/python/apache_beam/io/__init__.py
@@ -36,6 +36,7 @@ try:
   from apache_beam.io.gcp.bigquery import *
   from apache_beam.io.gcp.pubsub import *
   from apache_beam.io.gcp import gcsio
+  from apache_beam.transforms.xlang.io import *
 except ImportError:
   pass
 # pylint: enable=wrong-import-order, wrong-import-position
diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
index c1e9754526e..cfbb411b4e5 100644
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -53,6 +53,10 @@ _LOGGER = logging.getLogger(__name__)
 
 
 @pytest.mark.uses_gcp_java_expansion_service
+@unittest.skipUnless(
+    os.environ.get('EXPANSION_JARS'),
+    "EXPANSION_JARS environment var is not provided, "
+    "indicating that jars have not been built")
 class BigQueryXlangStorageWriteIT(unittest.TestCase):
   BIGQUERY_DATASET = 'python_xlang_storage_write'
 
@@ -112,10 +116,6 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
     _LOGGER.info(
         "Created dataset %s in project %s", self.dataset_id, self.project)
 
-    self.assertTrue(
-        os.environ.get('EXPANSION_PORT'), "Expansion service port not found!")
-    self.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT'))
-
   def tearDown(self):
     try:
       _LOGGER.info(
@@ -162,8 +162,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
               table=table_id,
               method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
               schema=schema,
-              use_at_least_once=use_at_least_once,
-              expansion_service=self.expansion_service))
+              use_at_least_once=use_at_least_once))
     hamcrest_assert(p, bq_matcher)
 
   def test_all_types(self):
@@ -243,8 +242,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
       _ = (
           p
           | beam.Create(row_elements)
-          | StorageWriteToBigQuery(
-              table=table_id, expansion_service=self.expansion_service))
+          | StorageWriteToBigQuery(table=table_id))
     hamcrest_assert(p, bq_matcher)
 
   def test_write_to_dynamic_destinations(self):
@@ -268,8 +266,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
               table=lambda record: spec_with_project + str(record['int']),
               method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
               schema=self.ALL_TYPES_SCHEMA,
-              use_at_least_once=False,
-              expansion_service=self.expansion_service))
+              use_at_least_once=False))
     hamcrest_assert(p, all_of(*bq_matchers))
 
   def test_write_to_dynamic_destinations_with_beam_rows(self):
@@ -303,8 +300,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
           | beam.io.WriteToBigQuery(
               table=lambda record: spec_with_project + str(record.my_int),
               method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
-              use_at_least_once=False,
-              expansion_service=self.expansion_service))
+              use_at_least_once=False))
     hamcrest_assert(p, all_of(*bq_matchers))
 
   def run_streaming(self, table_name, num_streams=0, use_at_least_once=False):
@@ -335,8 +331,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
               triggering_frequency=1,
               with_auto_sharding=auto_sharding,
               num_storage_api_streams=num_streams,
-              use_at_least_once=use_at_least_once,
-              expansion_service=self.expansion_service))
+              use_at_least_once=use_at_least_once))
     hamcrest_assert(p, bq_matcher)
 
   def skip_if_not_dataflow_runner(self) -> bool:
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
index 867dca9a5e7..13909cded1f 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
@@ -58,10 +58,11 @@ def instance_prefix(instance):
 
 @pytest.mark.uses_gcp_java_expansion_service
 @pytest.mark.uses_transform_service
-@unittest.skipUnless(
-    os.environ.get('EXPANSION_PORT'),
-    "EXPANSION_PORT environment var is not provided.")
 @unittest.skipIf(client is None, 'Bigtable dependencies are not installed')
+@unittest.skipUnless(
+    os.environ.get('EXPANSION_JARS'),
+    "EXPANSION_JARS environment var is not provided, "
+    "indicating that jars have not been built")
 class TestReadFromBigTableIT(unittest.TestCase):
   INSTANCE = "bt-read-tests"
   TABLE_ID = "test-table"
@@ -70,7 +71,6 @@ class TestReadFromBigTableIT(unittest.TestCase):
     self.test_pipeline = TestPipeline(is_integration_test=True)
     self.args = self.test_pipeline.get_full_options_as_args()
     self.project = self.test_pipeline.get_option('project')
-    self.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT'))
 
     instance_id = instance_prefix(self.INSTANCE)
 
@@ -141,8 +141,7 @@ class TestReadFromBigTableIT(unittest.TestCase):
           | bigtableio.ReadFromBigtable(
               project_id=self.project,
               instance_id=self.instance.instance_id,
-              table_id=self.table.table_id,
-              expansion_service=self.expansion_service)
+              table_id=self.table.table_id)
           | "Extract cells" >> beam.Map(lambda row: row._cells))
 
       assert_that(cells, equal_to(expected_cells))
@@ -150,10 +149,11 @@ class TestReadFromBigTableIT(unittest.TestCase):
 
 @pytest.mark.uses_gcp_java_expansion_service
 @pytest.mark.uses_transform_service
-@unittest.skipUnless(
-    os.environ.get('EXPANSION_PORT'),
-    "EXPANSION_PORT environment var is not provided.")
 @unittest.skipIf(client is None, 'Bigtable dependencies are not installed')
+@unittest.skipUnless(
+    os.environ.get('EXPANSION_JARS'),
+    "EXPANSION_JARS environment var is not provided, "
+    "indicating that jars have not been built")
 class TestWriteToBigtableXlangIT(unittest.TestCase):
   # These are integration tests for the cross-language write transform.
   INSTANCE = "bt-write-xlang"
@@ -164,7 +164,6 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
     cls.test_pipeline = TestPipeline(is_integration_test=True)
     cls.project = cls.test_pipeline.get_option('project')
     cls.args = cls.test_pipeline.get_full_options_as_args()
-    cls.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT'))
 
     instance_id = instance_prefix(cls.INSTANCE)
 
@@ -215,8 +214,7 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
               project_id=self.project,
               instance_id=self.instance.instance_id,
               table_id=self.table.table_id,
-              use_cross_language=True,
-              expansion_service=self.expansion_service))
+              use_cross_language=True))
 
   def test_set_mutation(self):
     row1: DirectRow = DirectRow('key-1')
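
Note on the two test-file changes above: the integration tests no longer read `EXPANSION_PORT` to build an expansion service address. They are gated on `EXPANSION_JARS`, which the Gradle task sets to indicate that the jars have been built. The following is a minimal, self-contained sketch of that gating pattern using the standard library's `unittest.skipUnless`. The helper names `make_gated_case` and `run_case` and the test class are hypothetical and exist only for illustration.

```python
import os
import unittest


def make_gated_case():
  """Builds a test class that is skipped unless EXPANSION_JARS is set."""
  # The decorator reads the environment when the class is defined, so the
  # class is created inside a function to make that moment explicit.
  @unittest.skipUnless(
      os.environ.get('EXPANSION_JARS'),
      "EXPANSION_JARS environment var is not provided, "
      "indicating that jars have not been built")
  class ExampleXlangIT(unittest.TestCase):
    def test_runs_only_with_jars(self):
      self.assertTrue(os.environ['EXPANSION_JARS'])

  return ExampleXlangIT


def run_case(case):
  """Runs every test in `case` and returns the collected TestResult."""
  suite = unittest.defaultTestLoader.loadTestsFromTestCase(case)
  result = unittest.TestResult()
  suite.run(result)
  return result


if __name__ == "__main__":
  result = run_case(make_gated_case())
  print("skipped:", len(result.skipped), "failures:", len(result.failures))
```

Because the condition is evaluated once at class-definition time, the variable has to be exported before the test module is imported, which is what setting it in the Gradle `exec` environment achieves.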
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py
index 26cc31471e6..2799bd1b9e9 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -144,7 +144,7 @@ class ExternalTransformProvider:
 
   A :class:`ExternalTransform` subclass is generated for each external
   transform, and is named based on what can be inferred from the URN
-  (see :param urn_pattern).
+  (see the `urn_pattern` parameter).
 
   These classes are generated when :class:`ExternalTransformProvider` is
   initialized. We need to give it one or more expansion service addresses that
@@ -256,7 +256,7 @@ class ExternalTransformProvider:
 
       if skipped_urns:
         logging.info(
-            "Skipped URN(s) in %s that don't follow the pattern [%s]: %s",
+            "Skipped URN(s) in %s that don't follow the pattern \"%s\": %s",
             target,
             self._urn_pattern,
             skipped_urns)
@@ -268,6 +268,10 @@ class ExternalTransformProvider:
     """Get a list of available ExternalTransform names and identifiers"""
     return list(self._name_to_urn.items())
 
+  def get_all(self) -> Dict[str, ExternalTransform]:
+    """Get all ExternalTransform"""
+    return self._transforms
+
   def get(self, name) -> ExternalTransform:
     """Get an ExternalTransform by its inferred class name"""
     return self._transforms[self._name_to_urn[name]]
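
Note on the new `get_all` method above: judging from how `get` indexes `self._transforms` through `self._name_to_urn`, the returned dictionary appears to be keyed by URN rather than by inferred class name. `get_all` also returns the internal dictionary itself, not a copy. The sketch below uses a hypothetical stand-in class (`ProviderSketch`) with placeholder URNs and transform classes to illustrate both points. It does not import or reproduce the real `ExternalTransformProvider`.

```python
from typing import Dict


class ProviderSketch:
  """Hypothetical stand-in for the two lookup tables used in the diff."""
  def __init__(
      self, transforms_by_urn: Dict[str, type], name_to_urn: Dict[str, str]):
    self._transforms = transforms_by_urn
    self._name_to_urn = name_to_urn

  def get_all(self) -> Dict[str, type]:
    # Same shape as the added method: hands back the internal mapping.
    return self._transforms

  def get(self, name) -> type:
    # Same shape as the existing method: name -> URN -> transform class.
    return self._transforms[self._name_to_urn[name]]


class GenerateSequence:
  """Placeholder class standing in for a generated wrapper."""


if __name__ == "__main__":
  # The URN string is illustrative only.
  provider = ProviderSketch(
      {"example:urn:generate_sequence": GenerateSequence},
      {"GenerateSequence": "example:urn:generate_sequence"})
  for urn, transform in provider.get_all().items():
    print(urn, "->", transform.__name__)
  print(provider.get("GenerateSequence") is GenerateSequence)
```

Callers that only need to iterate can use the mapping directly. Callers that intend to modify it may want to copy it first, since changes would otherwise be visible through `get`.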
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
new file mode 100644
index 00000000000..a53001c85fd
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
@@ -0,0 +1,413 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import importlib
+import logging
+import os
+import secrets
+import shutil
+import time
+import typing
+import unittest
+from os.path import dirname
+
+import numpy
+import pytest
+import yaml
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN
+from apache_beam.transforms.external_transform_provider import ExternalTransform
+from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
+from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case
+from apache_beam.transforms.external_transform_provider import infer_name_from_identifier
+from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case
+from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case
+from apache_beam.transforms.xlang.io import GenerateSequence
+
+
+class NameAndTypeUtilsTest(unittest.TestCase):
+  def test_snake_case_to_upper_camel_case(self):
+    test_cases = [("", ""), ("test", "Test"), ("test_name", "TestName"),
+                  ("test_double_underscore", "TestDoubleUnderscore"),
+                  ("TEST_CAPITALIZED", "TestCapitalized"),
+                  ("_prepended_underscore", "PrependedUnderscore"),
+                  ("appended_underscore_", "AppendedUnderscore")]
+    for case in test_cases:
+      self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0]))
+
+  def test_snake_case_to_lower_camel_case(self):
+    test_cases = [("", ""), ("test", "test"), ("test_name", "testName"),
+                  ("test_double_underscore", "testDoubleUnderscore"),
+                  ("TEST_CAPITALIZED", "testCapitalized"),
+                  ("_prepended_underscore", "prependedUnderscore"),
+                  ("appended_underscore_", "appendedUnderscore")]
+    for case in test_cases:
+      self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0]))
+
+  def test_camel_case_to_snake_case(self):
+    test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"),
+                  ("TestDoubleUnderscore",
+                   "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"),
+                  ("BEGINNINGAllCaps",
+                   "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"),
+                  ("AllCapsMIDDLEWord", "all_caps_middle_word"),
+                  ("lowerCamelCase", "lower_camel_case")]
+    for case in test_cases:
+      self.assertEqual(case[1], camel_case_to_snake_case(case[0]))
+
+  def test_infer_name_from_identifier(self):
+    standard_test_cases = [
+        ("beam:schematransform:org.apache.beam:transform:v1", "Transform"),
+        ("beam:schematransform:org.apache.beam:my_transform:v1",
+         "MyTransform"), (
+             "beam:schematransform:org.apache.beam:my_transform:v2",
+             "MyTransformV2"),
+        ("beam:schematransform:org.apache.beam:fe_fi_fo_fum:v2", "FeFiFoFumV2"),
+        ("beam:schematransform:bad_match:my_transform:v1", None)
+    ]
+    for case in standard_test_cases:
+      self.assertEqual(
+          case[1], infer_name_from_identifier(case[0], STANDARD_URN_PATTERN))
+
+    custom_pattern_cases = [
+        # (<pattern>, <urn>, <expected output>)
+        (
+            r"^custom:transform:([\w-]+):(\w+)$",
+            "custom:transform:my_transform:v1",
+            "MyTransformV1"),
+        (
+            r"^org.user:([\w-]+):([\w-]+):([\w-]+):external$",
+            "org.user:some:custom_transform:we_made:external",
+            "SomeCustomTransformWeMade"),
+        (
+            r"^([\w-]+):user.transforms",
+            "my_eXTErnal:user.transforms",
+            "MyExternal"),
+        (r"^([\w-]+):user.transforms", "my_external:badinput.transforms", None),
+    ]
+    for case in custom_pattern_cases:
+      self.assertEqual(case[2], infer_name_from_identifier(case[1], case[0]))
+
+
+@pytest.mark.uses_io_java_expansion_service
+@unittest.skipUnless(
+    os.environ.get('EXPANSION_JARS'),
+    "EXPANSION_JARS environment var is not provided, "
+    "indicating that jars have not been built")
+class ExternalTransformProviderIT(unittest.TestCase):
+  def test_generate_sequence_config_schema_and_description(self):
+    provider = ExternalTransformProvider(
+        BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar"))
+
+    self.assertTrue((
+        'GenerateSequence',
+        'beam:schematransform:org.apache.beam:generate_sequence:v1'
+    ) in provider.get_available())
+
+    GenerateSequence = provider.get('GenerateSequence')
+    config_schema = GenerateSequence.configuration_schema
+    for param in ['start', 'end', 'rate']:
+      self.assertTrue(param in config_schema)
+
+    description_substring = (
+        "Outputs a PCollection of Beam Rows, each "
+        "containing a single INT64")
+    self.assertTrue(description_substring in GenerateSequence.description)
+
+  def test_run_generate_sequence(self):
+    provider = ExternalTransformProvider(
+        BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar"))
+
+    with beam.Pipeline() as p:
+      numbers = p | provider.GenerateSequence(
+          start=0, end=10) | beam.Map(lambda row: row.value)
+
+      assert_that(numbers, equal_to([i for i in range(10)]))
+
+
+@pytest.mark.xlang_wrapper_generation
+@unittest.skipUnless(
+    os.environ.get('EXPANSION_JARS'),
+    "EXPANSION_JARS environment var is not provided, "
+    "indicating that jars have not been built")
+class AutoGenerationScriptIT(unittest.TestCase):
+  """
+  This class tests the generation and regeneration operations in
+  `gen_xlang_wrappers.py`.
+  """
+
+  # test cases will use GenerateSequence
+  GEN_SEQ_IDENTIFIER = \
+    'beam:schematransform:org.apache.beam:generate_sequence:v1'
+
+  def setUp(self):
+    # import script from top-level sdks/python directory
+    self.sdk_dir = os.path.abspath(dirname(dirname(dirname(__file__))))
+    spec = importlib.util.spec_from_file_location(
+        'gen_xlang_wrappers',
+        os.path.join(self.sdk_dir, 'gen_xlang_wrappers.py'))
+    self.script = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(self.script)
+    args = TestPipeline(is_integration_test=True).get_full_options_as_args()
+    runner = PipelineOptions(args).get_all_options()['runner']
+    if runner and "direct" not in runner.lower():
+      self.skipTest(
+          "It is sufficient to run this test in the DirectRunner "
+          "test suite only.")
+
+    self.test_dir_name = 'test_gen_script_%d_%s' % (
+        int(time.time()), secrets.token_hex(3))
+    self.test_dir = os.path.join(
+        os.path.abspath(dirname(__file__)), self.test_dir_name)
+    self.service_config_path = os.path.join(
+        self.test_dir, "test_expansion_service_config.yaml")
+    self.transform_config_path = os.path.join(
+        self.test_dir, "test_transform_config.yaml")
+    os.mkdir(self.test_dir)
+
+  def tearDown(self):
+    shutil.rmtree(self.test_dir, ignore_errors=False)
+
+  def delete_and_validate(self):
+    self.script.delete_generated_files(self.test_dir)
+    self.assertEqual(len(os.listdir(self.test_dir)), 0)
+
+  def test_script_fails_with_invalid_destinations(self):
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': 'apache_beam/some_nonexistent_dir'
+        }
+    }
+    with self.assertRaises(ValueError):
+      self.create_and_check_transforms_config_exists(expansion_service_config)
+
+  def test_pretty_types(self):
+    types = [
+        typing.Optional[typing.List[str]],
+        numpy.int16,
+        str,
+        typing.Dict[str, numpy.float64],
+        typing.Optional[typing.Dict[str, typing.List[numpy.int64]]],
+        typing.Dict[int, typing.Optional[str]]
+    ]
+
+    expected_type_names = [('List[str]', True), ('numpy.int16', False),
+                           ('str', False), ('Dict[str, numpy.float64]', False),
+                           ('Dict[str, List[numpy.int64]]', True),
+                           ('Dict[int, Union[str, NoneType]]', False)]
+
+    for i in range(len(types)):
+      self.assertEqual(
+          self.script.pretty_type(types[i]), expected_type_names[i])
+
+  def create_and_check_transforms_config_exists(self, expansion_service_config):
+    with open(self.service_config_path, 'w') as f:
+      yaml.dump([expansion_service_config], f)
+
+    self.script.generate_transforms_config(
+        self.service_config_path, self.transform_config_path)
+    self.assertTrue(os.path.exists(self.transform_config_path))
+
+  def create_and_validate_transforms_config(
+      self, expansion_service_config, expected_name, expected_destination):
+    self.create_and_check_transforms_config_exists(expansion_service_config)
+
+    with open(self.transform_config_path) as f:
+      configs = yaml.safe_load(f)
+      gen_seq_config = None
+      for config in configs:
+        if config['identifier'] == self.GEN_SEQ_IDENTIFIER:
+          gen_seq_config = config
+      self.assertIsNotNone(gen_seq_config)
+      self.assertEqual(
+          gen_seq_config['default_service'],
+          expansion_service_config['gradle_target'])
+      self.assertEqual(gen_seq_config['name'], expected_name)
+      self.assertEqual(
+          gen_seq_config['destinations']['python'], expected_destination)
+      self.assertIn("end", gen_seq_config['fields'])
+      self.assertIn("start", gen_seq_config['fields'])
+      self.assertIn("rate", gen_seq_config['fields'])
+
+  def get_module(self, dest):
+    module_name = dest.replace('apache_beam/', '').replace('/', '_')
+    module = 'apache_beam.transforms.%s.%s' % (self.test_dir_name, module_name)
+    return importlib.import_module(module)
+
+  def write_wrappers_to_destinations_and_validate(
+      self, destinations: typing.List[str]):
+    """
+    Generate wrappers from the config path and validate all destinations are
+    included.
+    Then write wrappers to destinations and validate all destination paths
+    exist.
+
+    :return: Generated wrappers grouped by destination
+    """
+    grouped_wrappers = self.script.get_wrappers_from_transform_configs(
+        self.transform_config_path)
+    for dest in destinations:
+      self.assertIn(dest, grouped_wrappers)
+
+    # write to our test directory to avoid messing with other files
+    self.script.write_wrappers_to_destinations(
+        grouped_wrappers, self.test_dir, format_code=False)
+
+    for dest in destinations:
+      self.assertTrue(
+          os.path.exists(
+              os.path.join(
+                  self.test_dir,
+                  dest.replace('apache_beam/', '').replace('/', '_') + ".py")))
+    return grouped_wrappers
+
+  def test_script_workflow(self):
+    expected_destination = 'apache_beam/transforms'
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': expected_destination
+        }
+    }
+
+    self.create_and_validate_transforms_config(
+        expansion_service_config, 'GenerateSequence', expected_destination)
+    grouped_wrappers = self.write_wrappers_to_destinations_and_validate(
+        [expected_destination])
+    # at least the GenerateSequence wrapper is set to this destination
+    self.assertGreaterEqual(len(grouped_wrappers[expected_destination]), 1)
+
+    # check the wrapper exists in this destination and has correct properties
+    output_module = self.get_module(expected_destination)
+    self.assertTrue(hasattr(output_module, 'GenerateSequence'))
+    # Since our config isn't skipping any transforms,
+    # it should include these two Kafka IOs as well
+    self.assertTrue(hasattr(output_module, 'KafkaWrite'))
+    self.assertTrue(hasattr(output_module, 'KafkaRead'))
+    self.assertTrue(
+        isinstance(output_module.GenerateSequence(start=0), ExternalTransform))
+    self.assertEqual(
+        output_module.GenerateSequence.identifier, self.GEN_SEQ_IDENTIFIER)
+
+    self.delete_and_validate()
+
+  def test_script_workflow_with_modified_transforms(self):
+    modified_name = 'ModifiedSequence'
+    modified_dest = 'apache_beam/io/gcp'
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': 'apache_beam/transforms'
+        },
+        'transforms': {
+            'beam:schematransform:org.apache.beam:generate_sequence:v1': {
+                'name': modified_name,
+                'destinations': {
+                    'python': modified_dest
+                }
+            }
+        }
+    }
+
+    self.create_and_validate_transforms_config(
+        expansion_service_config, modified_name, modified_dest)
+
+    grouped_wrappers = self.write_wrappers_to_destinations_and_validate(
+        [modified_dest])
+    self.assertIn(modified_name, grouped_wrappers[modified_dest][0])
+    self.assertEqual(len(grouped_wrappers[modified_dest]), 1)
+
+    # check the modified wrapper exists in the modified destination
+    # and check it has the correct properties
+    output_module = self.get_module(modified_dest)
+    self.assertTrue(
+        isinstance(output_module.ModifiedSequence(start=0), ExternalTransform))
+    self.assertEqual(
+        output_module.ModifiedSequence.identifier, self.GEN_SEQ_IDENTIFIER)
+
+    self.delete_and_validate()
+
+  def test_script_workflow_with_skipped_transform(self):
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': f'apache_beam/transforms/{self.test_dir_name}'
+        },
+        'skip_transforms': [
+            'beam:schematransform:org.apache.beam:generate_sequence:v1'
+        ]
+    }
+
+    with open(self.service_config_path, 'w') as f:
+      yaml.dump([expansion_service_config], f)
+
+    self.script.generate_transforms_config(
+        self.service_config_path, self.transform_config_path)
+
+    # gen sequence shouldn't exist in the transform config
+    with open(self.transform_config_path) as f:
+      transforms = yaml.safe_load(f)
+      gen_seq_config = None
+      for transform in transforms:
+        if transform['identifier'] == self.GEN_SEQ_IDENTIFIER:
+          gen_seq_config = transform
+      self.assertIsNone(gen_seq_config)
+
+  def test_run_pipeline_with_generated_transform(self):
+    with beam.Pipeline() as p:
+      numbers = (
+          p | GenerateSequence(start=0, end=10)
+          | beam.Map(lambda row: row.value))
+      assert_that(numbers, equal_to([i for i in range(10)]))
+
+  def test_check_standard_external_transforms_config_in_sync(self):
+    """
+    This test creates a transforms config file and checks it against
+    `sdks/standard_external_transforms.yaml`. Fails if the two configs don't
+    match.
+
+    Fix by running `./gradlew generateExternalTransformsConfig` and
+    committing the changes.
+    """
+    sdks_dir = os.path.abspath(dirname(self.sdk_dir))
+    self.script.generate_transforms_config(
+        os.path.join(sdks_dir, 'standard_expansion_services.yaml'),
+        self.transform_config_path)
+    with open(self.transform_config_path) as f:
+      test_config = yaml.safe_load(f)
+    with open(os.path.join(sdks_dir, 'standard_external_transforms.yaml'),
+              'r') as f:
+      standard_config = yaml.safe_load(f)
+
+    self.assertEqual(
+        test_config,
+        standard_config,
+        "The standard xlang transforms config file "
+        "\"standard_external_transforms.yaml\" is out of sync! Please update "
+        "by running './gradlew generateExternalTransformsConfig' "
+        "and committing the changes.")
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
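
The naming test cases in `NameAndTypeUtilsTest` above pin down the expected behaviour of the name-inference helpers fairly tightly. The following is a minimal sketch that satisfies those cases; it is not Beam's implementation. The function names `to_upper_camel` and `infer_name` are hypothetical, and the value of `ASSUMED_STANDARD_PATTERN` is an assumption made for illustration (the real constant is `STANDARD_URN_PATTERN` in `external_transform_provider.py`).

```python
import re
from typing import Optional

# Assumed value, for illustration only.
ASSUMED_STANDARD_PATTERN = (
    r"^beam:schematransform:org.apache.beam:([\w-]+):(\w+)$")


def to_upper_camel(name: str) -> str:
  # Empty parts (leading, trailing or doubled underscores) are dropped;
  # str.capitalize() also lowercases the rest of each part.
  return ''.join(part.capitalize() for part in name.split('_') if part)


def infer_name(urn: str, pattern: str) -> Optional[str]:
  match = re.match(pattern, urn)
  if not match:
    return None  # URN does not follow the pattern
  parts = [to_upper_camel(group) for group in match.groups()]
  # Per the test cases, only the standard pattern drops a trailing "v1";
  # custom patterns keep every captured group.
  if pattern == ASSUMED_STANDARD_PATTERN and parts[-1].lower() == 'v1':
    parts = parts[:-1]
  return ''.join(parts)
```

Each regex capture group contributes one CamelCase component, which is why a custom pattern with three groups yields a name such as `SomeCustomTransformWeMade`.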
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_test.py
deleted file mode 100644
index 36fe9b5c4bd..00000000000
--- a/sdks/python/apache_beam/transforms/external_transform_provider_test.py
+++ /dev/null
@@ -1,140 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import logging
-import os
-import unittest
-
-import pytest
-
-import apache_beam as beam
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-from apache_beam.transforms.external import BeamJarExpansionService
-from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN
-from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
-from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case
-from apache_beam.transforms.external_transform_provider import infer_name_from_identifier
-from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case
-from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case
-
-
-class NameUtilsTest(unittest.TestCase):
-  def test_snake_case_to_upper_camel_case(self):
-    test_cases = [("", ""), ("test", "Test"), ("test_name", "TestName"),
-                  ("test_double_underscore", "TestDoubleUnderscore"),
-                  ("TEST_CAPITALIZED", "TestCapitalized"),
-                  ("_prepended_underscore", "PrependedUnderscore"),
-                  ("appended_underscore_", "AppendedUnderscore")]
-    for case in test_cases:
-      self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0]))
-
-  def test_snake_case_to_lower_camel_case(self):
-    test_cases = [("", ""), ("test", "test"), ("test_name", "testName"),
-                  ("test_double_underscore", "testDoubleUnderscore"),
-                  ("TEST_CAPITALIZED", "testCapitalized"),
-                  ("_prepended_underscore", "prependedUnderscore"),
-                  ("appended_underscore_", "appendedUnderscore")]
-    for case in test_cases:
-      self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0]))
-
-  def test_camel_case_to_snake_case(self):
-    test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"),
-                  ("TestDoubleUnderscore",
-                   "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"),
-                  ("BEGINNINGAllCaps",
-                   "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"),
-                  ("AllCapsMIDDLEWord", "all_caps_middle_word"),
-                  ("lowerCamelCase", "lower_camel_case")]
-    for case in test_cases:
-      self.assertEqual(case[1], camel_case_to_snake_case(case[0]))
-
-  def test_infer_name_from_identifier(self):
-    standard_test_cases = [
-        ("beam:schematransform:org.apache.beam:transform:v1", "Transform"),
-        ("beam:schematransform:org.apache.beam:my_transform:v1",
-         "MyTransform"), (
-             "beam:schematransform:org.apache.beam:my_transform:v2",
-             "MyTransformV2"),
-        ("beam:schematransform:org.apache.beam:fe_fi_fo_fum:v2", "FeFiFoFumV2"),
-        ("beam:schematransform:bad_match:my_transform:v1", None)
-    ]
-    for case in standard_test_cases:
-      self.assertEqual(
-          case[1], infer_name_from_identifier(case[0], STANDARD_URN_PATTERN))
-
-    custom_pattern_cases = [
-        # (<pattern>, <urn>, <expected output>)
-        (
-            r"^custom:transform:([\w-]+):(\w+)$",
-            "custom:transform:my_transform:v1",
-            "MyTransformV1"),
-        (
-            r"^org.user:([\w-]+):([\w-]+):([\w-]+):external$",
-            "org.user:some:custom_transform:we_made:external",
-            "SomeCustomTransformWeMade"),
-        (
-            r"^([\w-]+):user.transforms",
-            "my_eXTErnal:user.transforms",
-            "MyExternal"),
-        (r"^([\w-]+):user.transforms", "my_external:badinput.transforms", None),
-    ]
-    for case in custom_pattern_cases:
-      self.assertEqual(case[2], infer_name_from_identifier(case[1], case[0]))
-
-
-@pytest.mark.uses_io_java_expansion_service
-@unittest.skipUnless(
-    os.environ.get('EXPANSION_PORT'),
-    "EXPANSION_PORT environment var is not provided.")
-class ExternalTransformProviderTest(unittest.TestCase):
-  def setUp(self):
-    self.test_pipeline = TestPipeline(is_integration_test=True)
-
-  def test_generate_sequence_config_schema_and_description(self):
-    provider = ExternalTransformProvider(
-        BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar"))
-
-    self.assertTrue((
-        'GenerateSequence',
-        'beam:schematransform:org.apache.beam:generate_sequence:v1'
-    ) in provider.get_available())
-
-    GenerateSequence = provider.get('GenerateSequence')
-    config_schema = GenerateSequence.configuration_schema
-    for param in ['start', 'end', 'rate']:
-      self.assertTrue(param in config_schema)
-
-    description_substring = (
-        "Outputs a PCollection of Beam Rows, each "
-        "containing a single INT64")
-    self.assertTrue(description_substring in GenerateSequence.description)
-
-  def test_run_generate_sequence(self):
-    provider = ExternalTransformProvider(
-        BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar"))
-
-    with beam.Pipeline() as p:
-      numbers = p | provider.GenerateSequence(
-          start=0, end=10) | beam.Map(lambda row: row.value)
-
-      assert_that(numbers, equal_to([i for i in range(10)]))
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()
diff --git a/sdks/python/MANIFEST.in b/sdks/python/apache_beam/transforms/xlang/__init__.py
similarity index 67%
copy from sdks/python/MANIFEST.in
copy to sdks/python/apache_beam/transforms/xlang/__init__.py
index 60b0989d9af..ba896615cbc 100644
--- a/sdks/python/MANIFEST.in
+++ b/sdks/python/apache_beam/transforms/xlang/__init__.py
@@ -15,8 +15,13 @@
 # limitations under the License.
 #
 
-include gen_protos.py
-include README.md
-include NOTICE
-include LICENSE
-include LICENSE.python
+"""
+This package contains autogenerated Python wrappers for cross-language
+transforms available in other Beam SDKs.
+
+For documentation on creating and using cross-language transforms, see:
+https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms
+
+For more information on transform wrapper generation, see the top level script:
+`gen_xlang_wrappers.py`
+"""
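
The wrapper generation this docstring points to is driven by a per-service config. As a rough sketch of the semantics documented in `gen_xlang_wrappers.py` (a skip list, per-transform name and destination overrides, and the destination-to-module naming that the tests in this commit check), the resolution might look like the code below. The function names `resolve_transform` and `wrapper_module_name` are hypothetical, the sample config reuses the placeholder identifiers from the script's docstring, and `other_transform` is an invented identifier for illustration.

```python
from typing import Any, Dict, Optional


def resolve_transform(
    service: Dict[str, Any], identifier: str,
    inferred_name: str) -> Optional[Dict[str, Any]]:
  """Returns the effective name and destinations, or None if skipped."""
  if identifier in service.get('skip_transforms', []):
    return None
  # start from the service-level defaults, then apply overrides
  destinations = dict(service['destinations'])
  override = service.get('transforms', {}).get(identifier, {})
  destinations.update(override.get('destinations', {}))
  return {
      'name': override.get('name', inferred_name),
      'destinations': destinations,
  }


def wrapper_module_name(destination: str) -> str:
  # Same transformation the tests apply: 'apache_beam/io/gcp' -> 'io_gcp.py'
  return destination.replace('apache_beam/', '').replace('/', '_') + '.py'


MY_URN = 'beam:schematransform:org.apache.beam:my_transform:v1'
SKIPPED_URN = 'beam:schematransform:org.apache.beam:some_transform:v1'
OTHER_URN = 'beam:schematransform:org.apache.beam:other_transform:v1'

service = {
    'gradle_target': 'sdks:java:io:expansion-service:shadowJar',
    'destinations': {'python': 'apache_beam/io'},
    'skip_transforms': [SKIPPED_URN],
    'transforms': {
        MY_URN: {
            'name': 'MyCustomTransformName',
            'destinations': {'python': 'apache_beam/io/gcp'},
        }
    },
}
```

With this config, `MY_URN` resolves to `MyCustomTransformName` in a module named `io_gcp.py`, `SKIPPED_URN` is omitted, and any other discovered transform keeps its inferred name and the service default destination.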
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index 7f2bc7f5d42..85b26ca3a46 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -66,6 +66,25 @@ artifacts {
   distTarBall file: file("${buildDir}/${tarball}"), builtBy: sdist
 }
 
+tasks.register("generateExternalTransformsConfig") {
+  description "Discovers external transforms and regenerates the config at sdks/standard_external_transforms.yaml"
+
+  dependsOn buildPython
+  // Need to build all expansion services listed in sdks/standard_expansion_services.yaml
+  dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build"
+  dependsOn ":sdks:java:io:expansion-service:build"
+  // Keep this in-sync with pyproject.toml
+  def PyYaml = "'pyyaml>=3.12,<7.0.0'"
+
+  doLast {
+    exec {
+      executable 'sh'
+      args '-c', "pip install $PyYaml && " +
+              "python gen_xlang_wrappers.py --cleanup --generate-config-only"
+    }
+  }
+}
+
 // Create Python wheels for given platform and Python version
 // build identifiers for cibuildwheel
 def platform_identifiers_map = [
diff --git a/sdks/python/gen_xlang_wrappers.py b/sdks/python/gen_xlang_wrappers.py
new file mode 100644
index 00000000000..a75fc05cba7
--- /dev/null
+++ b/sdks/python/gen_xlang_wrappers.py
@@ -0,0 +1,447 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Generates Python wrappers for external transforms (specifically,
+SchemaTransforms)
+"""
+
+import argparse
+import datetime
+import logging
+import os
+import shutil
+import subprocess
+import typing
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Union
+
+import yaml
+
+from gen_protos import LICENSE_HEADER
+from gen_protos import PROJECT_ROOT
+from gen_protos import PYTHON_SDK_ROOT
+
+SUPPORTED_SDK_DESTINATIONS = ['python']
+PYTHON_SUFFIX = "_et.py"
+PY_WRAPPER_OUTPUT_DIR = os.path.join(
+    PYTHON_SDK_ROOT, 'apache_beam', 'transforms', 'xlang')
+
+
+def generate_transforms_config(input_services, output_file):
+  """
+  Generates a YAML file containing a list of transform configurations.
+
+  Takes an input YAML file containing a list of expansion service gradle
+  targets. Each service must provide a `destinations` field that specifies the
+  default package (relative path) that generated wrappers should be imported
+  to. A default destination package is specified for each SDK, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+
+  We use :class:`ExternalTransformProvider` to discover external
+  transforms. Then, we extract the necessary details of each transform and
+  compile them into a new YAML file, which is later used to generate wrappers.
+
+  Importing generated transforms to an existing package
+  -----------------------------------------------------
+  When running the script on the config above, a new module will be created at
+  `apache_beam/transforms/xlang/io.py`. This contains all
+  generated wrappers that are set to destination 'apache_beam/io'. Finally,
+  to make these available to the `apache_beam.io` package (or any package
+  really), just add the following line to the package's `__init__.py` file::
+    from apache_beam.transforms.xlang.io import *
+
+  Modifying a transform's name and destination
+  --------------------------------------------
+  Each service may also specify modifications for particular transforms.
+  Currently, one can modify the generated wrapper's **name** and
+  **destination** package:
+    - By default, the transform's identifier is used to generate the wrapper
+      class name. This can be overridden by manually providing a name.
+    - By default, generated wrappers are made available to the package provided
+      by their respective expansion service. This can be overridden by
+      providing a relative path to a different package.
+
+  See the following example for what such modifications can look like::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      transforms:
+        'beam:schematransform:org.apache.beam:my_transform:v1':
+          name: 'MyCustomTransformName'
+          destinations:
+            python: 'apache_beam/io/gcp'
+
+  For the above example, we would take the transform with identifier
+  `beam:schematransform:org.apache.beam:my_transform:v1` and by default infer
+  a wrapper class name of `MyTransform` then write it to the module
+  `apache_beam/transforms/xlang/io.py`. With these modifications
+  however, we instead use the provided name `MyCustomTransformName` and write
+  it to `apache_beam/transforms/xlang/io_gcp.py`.
+  Similar to above, this can be made available by importing it in the
+  `__init__.py` file like so::
+    from apache_beam.transforms.xlang.io_gcp import *
+
+  Skipping transforms
+  -------------------
+  To skip a particular transform, simply list its identifier in the
+  `skip_transforms` field, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      skip_transforms:
+        - 'beam:schematransform:org.apache.beam:some_transform:v1'
+  """
+  from apache_beam.transforms.external import BeamJarExpansionService
+  from apache_beam.transforms.external_transform_provider import ExternalTransform
+  from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
+
+  transform_list: List[Dict[str, Any]] = []
+
+  with open(input_services) as f:
+    services = yaml.safe_load(f)
+  for service in services:
+    target = service['gradle_target']
+
+    if "destinations" not in service:
+      raise ValueError(
+          f"Expansion service with target '{target}' does not "
+          "specify any default destinations.")
+    service_destinations: Dict[str, str] = service['destinations']
+    for sdk, dest in service_destinations.items():
+      validate_sdks_destinations(sdk, dest, target)
+
+    transforms_to_skip = service.get('skip_transforms', [])
+
+    # use dynamic provider to discover and populate wrapper details
+    provider = ExternalTransformProvider(BeamJarExpansionService(target))
+    discovered: Dict[str, ExternalTransform] = provider.get_all()
+    for identifier, wrapper in discovered.items():
+      if identifier in transforms_to_skip:
+        continue
+
+      transform_destinations = service_destinations.copy()
+
+      # apply any modifications
+      modified_transform = {}
+      if 'transforms' in service and identifier in service['transforms']:
+        modified_transform = service['transforms'][identifier]
+      for sdk, dest in modified_transform.get('destinations', {}).items():
+        validate_sdks_destinations(sdk, dest, target, identifier)
+        transform_destinations[sdk] = dest  # override the destination
+      name = modified_transform.get('name', wrapper.__name__)
+
+      fields = {}
+      for param in wrapper.configuration_schema.values():
+        (tp, nullable) = pretty_type(param.type)
+        field_info = {
+            'type': str(tp),
+            'description': param.description,
+            'nullable': nullable
+        }
+        fields[param.original_name] = field_info
+
+      transform = {
+          'identifier': identifier,
+          'name': name,
+          'destinations': transform_destinations,
+          'default_service': target,
+          'fields': fields,
+          'description': wrapper.description
+      }
+      transform_list.append(transform)
+
+  with open(output_file, 'w') as f:
+    f.write(LICENSE_HEADER.lstrip())
+    f.write(
+        "# NOTE: This file is autogenerated and should "
+        "not be edited by hand.\n")
+    f.write(
+        "# Configs are generated based on the expansion service\n"
+        f"# configuration in {input_services.replace(PROJECT_ROOT, '')}.\n")
+    f.write("# Refer to gen_xlang_wrappers.py for more info.\n")
+    dt = datetime.datetime.now().date()
+    f.write(f"#\n# Last updated on: {dt}\n\n")
+    yaml.dump(transform_list, f)
+  logging.info("Successfully wrote transform configs to file: %s", output_file)
+
+
+def validate_sdks_destinations(sdk, dest, service, identifier=None):
+  if identifier:
+    message = f"Identifier '{identifier}'"
+  else:
+    message = f"Service '{service}'"
+  if sdk not in SUPPORTED_SDK_DESTINATIONS:
+    raise ValueError(
+        message + " specifies a destination for an invalid SDK:"
+        f" '{sdk}'. The supported SDKs are {SUPPORTED_SDK_DESTINATIONS}")
+  if not os.path.isdir(os.path.join(PYTHON_SDK_ROOT, *dest.split('/'))):
+    raise ValueError(
+        message + f" specifies an invalid destination '{dest}'."
+        " Please make sure the destination is an existing directory.")
+
+
+def pretty_type(tp):
+  """
+  Takes a type and returns a tuple containing a pretty string representing it
+  and a bool signifying if it is nullable or not.
+
+  For optional types, the contained type is unwrapped and returned. This does
+  not recurse however, so inner Optional types are not affected.
+  E.g. the input typing.Optional[typing.Dict[int, typing.Optional[str]]] will
+  return (Dict[int, Union[str, NoneType]], True)
+  """
+  nullable = False
+  if (typing.get_origin(tp) is Union and type(None) in typing.get_args(tp)):
+    nullable = True
+    # only unwrap if it's a single nullable type. if the type is truly a union
+    # of multiple types, leave it alone.
+    args = typing.get_args(tp)
+    if len(args) == 2:
+      tp = list(filter(lambda t: t is not type(None), args))[0]
+
+  # TODO(ahmedabu98): Make this more generic to support other remote SDKs
+  # Potentially use Runner API types
+  if tp.__module__ == 'builtins':
+    tp = tp.__name__
+  elif tp.__module__ == 'typing':
+    tp = str(tp).replace("typing.", "")
+  elif tp.__module__ == 'numpy':
+    tp = "%s.%s" % (tp.__module__, tp.__name__)
+
+  return (tp, nullable)
+
+
+def camel_case_to_snake_case(string):
+  """Convert camelCase to snake_case"""
+  arr = []
+  word = []
+  for i, n in enumerate(string):
+    # If seeing an upper letter after a lower letter, we just witnessed a word
+    # If seeing an upper letter and the next letter is lower, we may have just
+    # witnessed an all caps word
+    if n.isupper() and ((i > 0 and string[i - 1].islower()) or
+                        (i + 1 < len(string) and string[i + 1].islower())):
+      arr.append(''.join(word))
+      word = [n.lower()]
+    else:
+      word.append(n.lower())
+  arr.append(''.join(word))
+  return '_'.join(arr).strip('_')
+
+
+def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]:
+  """
+  Generates code for external transform wrapper classes (subclasses of
+  :class:`ExternalTransform`).
+
+  Takes a YAML file containing a list of SchemaTransform configurations. For
+  each configuration, the code for a wrapper class is generated, along with any
+  documentation that may be included.
+
+  Each configuration must include a destination file that the generated class
+  will be written to.
+
+  Returns the generated classes, grouped by destination.
+  """
+  from jinja2 import Environment
+  from jinja2 import FileSystemLoader
+
+  env = Environment(loader=FileSystemLoader(PYTHON_SDK_ROOT))
+  python_wrapper_template = env.get_template("python_xlang_wrapper.template")
+
+  # maintain a list of wrappers to write in each file. if modified destinations
+  # are used, we may end up with multiple wrappers in one file.
+  destinations: Dict[str, List[str]] = {}
+
+  with open(config_file) as f:
+    transforms = yaml.safe_load(f)
+    for config in transforms:
+      default_service = config['default_service']
+      description = config['description']
+      destination = config['destinations']['python']
+      name = config['name']
+      fields = config['fields']
+      identifier = config['identifier']
+
+      parameters = []
+      for param, info in fields.items():
+        pythonic_name = camel_case_to_snake_case(param)
+        param_details = {
+            "name": pythonic_name,
+            "type": info['type'],
+            "description": info['description'],
+        }
+
+        if info['nullable']:
+          param_details["default"] = None
+        parameters.append(param_details)
+
+      # Python syntax requires function definitions to have
+      # non-default parameters first
+      parameters = sorted(parameters, key=lambda p: 'default' in p)
+      default_service = f"BeamJarExpansionService(\"{default_service}\")"
+
+      python_wrapper_class = python_wrapper_template.render(
+          class_name=name,
+          identifier=identifier,
+          parameters=parameters,
+          description=description,
+          default_expansion_service=default_service)
+
+      if destination not in destinations:
+        destinations[destination] = []
+      destinations[destination].append(python_wrapper_class)
+
+  return destinations
+
+
+def write_wrappers_to_destinations(
+    grouped_wrappers: Dict[str, List[str]],
+    output_dir=PY_WRAPPER_OUTPUT_DIR,
+    format_code=True):
+  """
+  Takes a dictionary of generated wrapper code, grouped by destination.
+  For each destination, create a new file containing the respective wrapper
+  classes. Each file includes the Apache License header and relevant imports.
+  Note: the Jinja template should already follow linting and formatting rules.
+  """
+  written_files = []
+  for dest, wrappers in grouped_wrappers.items():
+    module_name = dest.replace('apache_beam/', '').replace('/', '_')
+    module_path = os.path.join(output_dir, module_name) + ".py"
+    with open(module_path, "w") as file:
+      file.write(LICENSE_HEADER.lstrip())
+      file.write(
+          "\n# NOTE: This file contains autogenerated external transform(s)\n"
+          "# and should not be edited by hand.\n"
+          "# Refer to gen_xlang_wrappers.py for more info.\n\n")
+      file.write(
+          "\"\"\""
+          "Cross-language transforms in this module can be imported from the\n"
+          f":py:mod:`{dest.replace('/', '.')}` package."
+          "\"\"\"\n\n")
+      file.write(
+          "# pylint:disable=line-too-long\n\n"
+          "from apache_beam.transforms.external import "
+          "BeamJarExpansionService\n"
+          "from apache_beam.transforms.external_transform_provider "
+          "import ExternalTransform\n")
+      for wrapper in wrappers:
+        file.write(wrapper + "\n")
+    written_files.append(module_path)
+
+  logging.info("Created external transform wrapper modules: %s", written_files)
+
+  if format_code:
+    formatting_cmd = ['yapf', '--in-place', *written_files]
+    subprocess.run(formatting_cmd, capture_output=True, check=True)
+
+
+def delete_generated_files(root_dir):
+  """Scans for and deletes generated wrapper files."""
+  logging.info("Deleting external transform wrappers from dir %s", root_dir)
+  deleted_files = []
+  for file in os.listdir(root_dir):
+    if file == '__init__.py':
+      continue
+    path = os.path.join(root_dir, file)
+    if os.path.isfile(path) or os.path.islink(path):
+      os.unlink(path)
+    else:
+      shutil.rmtree(path)
+    deleted_files.append(file)
+  logging.info("Successfully deleted files: %s", deleted_files)
+
+
+def run_script(
+    cleanup,
+    generate_config_only,
+    input_expansion_services,
+    transforms_config_source):
+  # Cleanup first if requested. This is needed to remove outdated wrappers.
+  if cleanup:
+    delete_generated_files(PY_WRAPPER_OUTPUT_DIR)
+
+  # This step requires the expansion service.
+  # Only generate a transforms config file if none are provided
+  if not transforms_config_source:
+    output_transforms_config = os.path.join(
+        PROJECT_ROOT, 'sdks', 'standard_external_transforms.yaml')
+    generate_transforms_config(
+        input_services=input_expansion_services,
+        output_file=output_transforms_config)
+
+    transforms_config_source = output_transforms_config
+  else:
+    if not os.path.exists(transforms_config_source):
+      raise RuntimeError(
+          "Could not find the provided transforms config "
+          f"source: {transforms_config_source}")
+
+  if generate_config_only:
+    return
+
+  wrappers_grouped_by_destination = get_wrappers_from_transform_configs(
+      transforms_config_source)
+
+  write_wrappers_to_destinations(wrappers_grouped_by_destination)
+
+
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--cleanup',
+      dest='cleanup',
+      action='store_true',
+      help="Whether to cleanup existing generated wrappers first.")
+  parser.add_argument(
+      '--generate-config-only',
+      dest='generate_config_only',
+      action='store_true',
+      help="If set, will generate the transform config only without generating "
+      "any wrappers.")
+  parser.add_argument(
+      '--input-expansion-services',
+      dest='input_expansion_services',
+      default=os.path.join(
+          PROJECT_ROOT, 'sdks', 'standard_expansion_services.yaml'),
+      help=(
+          "Absolute path to the input YAML file that contains "
+          "expansion service configs. Ignored if a transforms config "
+          "source is provided."))
+  parser.add_argument(
+      '--transforms-config-source',
+      dest='transforms_config_source',
+      help=(
+          "Absolute path to a source transforms config YAML file to "
+          "generate wrapper modules from. If not provided, one will be "
+          "created by this script."))
+  args = parser.parse_args()
+
+  run_script(
+      args.cleanup,
+      args.generate_config_only,
+      args.input_expansion_services,
+      args.transforms_config_source)
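
The two naming and typing helpers in gen_xlang_wrappers.py above can be tried in isolation. The block below is a minimal sketch, not part of the commit: `camel_case_to_snake_case` follows the algorithm shown in the diff, while `unwrap_optional` is a hypothetical, simplified stand-in for the first half of `pretty_type` (it only unwraps a single outer Optional and skips the string formatting).

```python
import typing
from typing import Dict, Optional, Union


def camel_case_to_snake_case(string):
  """Same word-splitting rule as the helper in the diff above."""
  arr = []
  word = []
  for i, n in enumerate(string):
    # A new word starts at an upper-case letter that either follows a
    # lower-case letter or precedes one (the end of an all-caps run).
    if n.isupper() and ((i > 0 and string[i - 1].islower()) or
                        (i + 1 < len(string) and string[i + 1].islower())):
      arr.append(''.join(word))
      word = [n.lower()]
    else:
      word.append(n.lower())
  arr.append(''.join(word))
  return '_'.join(arr).strip('_')


def unwrap_optional(tp):
  """Hypothetical helper: returns (type, nullable) for an outer Optional."""
  nullable = False
  args = typing.get_args(tp)
  if typing.get_origin(tp) is Union and type(None) in args:
    nullable = True
    # Only unwrap Optional[X]; a union of several types is left alone.
    if len(args) == 2:
      tp = [t for t in args if t is not type(None)][0]
  return tp, nullable


snake = camel_case_to_snake_case('maxNumRecords')
inner, nullable = unwrap_optional(Optional[Dict[int, Optional[str]]])
```

Only the outer Optional is removed; the inner `Optional[str]` stays in place, which matches the non-recursive behavior described in the `pretty_type` docstring.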
diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml
index f1a65c842d5..9829671ccb7 100644
--- a/sdks/python/pyproject.toml
+++ b/sdks/python/pyproject.toml
@@ -29,6 +29,12 @@ requires = [
     "numpy>=1.14.3,<1.27", # Update setup.py as well.
     # having cython here will create wheels that are platform dependent.
     "cython==0.29.36",
+    ## deps for generating external transform wrappers:
+    # also update PyYaml bounds in sdks:python:generateExternalTransformsConfig
+    'pyyaml>=3.12,<7.0.0',
+    # also update Jinja2 bounds in test-suites/xlang/build.gradle (look for xlangWrapperValidation task)
+    "jinja2>=2.7.1,<4.0.0",
+    'yapf==0.29.0'
 ]
 
 
diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini
index c95aa5974da..e78697169bb 100644
--- a/sdks/python/pytest.ini
+++ b/sdks/python/pytest.ini
@@ -34,6 +34,7 @@ markers =
     uses_java_expansion_service: collect Cross Language Java transforms test runs
     uses_python_expansion_service: collect Cross Language Python transforms test runs
     uses_io_java_expansion_service: collect Cross Language IO Java transform test runs (with Kafka bootstrap server)
+    xlang_wrapper_generation: collect tests that validate Cross Language wrapper generation
     uses_transform_service: collect Cross Language test runs that uses the Transform Service
     xlang_sql_expansion_service: collect for Cross Language with SQL expansion service test runs
     it_postcommit: collect for post-commit integration test runs
diff --git a/sdks/python/python_xlang_wrapper.template b/sdks/python/python_xlang_wrapper.template
new file mode 100644
index 00000000000..f3d3728aace
--- /dev/null
+++ b/sdks/python/python_xlang_wrapper.template
@@ -0,0 +1,36 @@
+{#
+   Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+#}
+
+class {{ class_name }}(ExternalTransform):
+{% if description %}  """
+  {{ description | wordwrap(78) | replace('\n', '\n  ') }}
+  """{% endif %}
+  identifier = "{{ identifier }}"
+
+  def __init__(
+      self,{% if parameters %}{% for param in parameters%}
+      {{ param.name }}{% if 'default' in param %}={{ param.default }}{% endif %},{% endfor %}{% endif %}
+      expansion_service=None):
+    {% if parameters %}"""{% for param in parameters %}
+    :param {{ param.name }}: ({{ param.type }}){% if param.description %}
+      {{ param.description | wordwrap(72) | replace('\n', '\n      ') }} {% endif %}{% endfor %}
+    """{% endif %}
+    self.default_expansion_service = {{ default_expansion_service }}
+    super().__init__(
+        {% for param in parameters %}{{ param.name }}={{ param.name }},
+        {% endfor %}expansion_service=expansion_service)
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index a5a14c035dc..409951cbc41 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -16,8 +16,8 @@
 #
 
 """Apache Beam SDK for Python setup file."""
-
 import glob
+import logging
 import os
 import shutil
 import subprocess
@@ -227,6 +227,50 @@ def copy_tests_from_docs():
           f'Could not locate yaml docs in {docs_src} or {docs_dest}.')
 
 
+def generate_external_transform_wrappers():
+  try:
+    sdk_dir = os.path.abspath(os.path.dirname(__file__))
+    script_exists = os.path.exists(
+        os.path.join(sdk_dir, 'gen_xlang_wrappers.py'))
+    config_exists = os.path.exists(
+        os.path.join(os.path.dirname(sdk_dir),
+                     'standard_external_transforms.yaml'))
+    # we need both the script and the standard transforms config file.
+    # at build time, we don't have access to apache_beam to discover and
+    # retrieve external transforms, so the config file has to already exist
+    if not script_exists or not config_exists:
+      generated_transforms_dir = os.path.join(
+        sdk_dir, 'apache_beam', 'transforms', 'xlang')
+
+      # if exists, this directory will have at least its __init__.py file
+      if (not os.path.exists(generated_transforms_dir) or
+              len(os.listdir(generated_transforms_dir)) <= 1):
+        message = 'External transform wrappers have not been generated'
+        if not script_exists:
+          message += ' and the generation script `gen_xlang_wrappers.py`'
+        if not config_exists:
+          message += ' and the standard external transforms config'
+        message += ' could not be found'
+        raise RuntimeError(message)
+      else:
+        logging.info(
+            'Skipping external transform wrapper generation as they '
+            'are already generated.')
+      return
+    subprocess.run([
+        sys.executable,
+        os.path.join(sdk_dir, 'gen_xlang_wrappers.py'),
+        '--cleanup',
+        '--transforms-config-source',
+        os.path.join(os.path.dirname(sdk_dir),
+                     'standard_external_transforms.yaml')
+    ], capture_output=True, check=True)
+  except subprocess.CalledProcessError as err:
+    raise RuntimeError(
+        'Could not generate external transform wrappers due to '
+        'error: %s' % err.stderr) from err
+
+
 def get_portability_package_data():
   files = []
   portability_dir = Path(__file__).parent / 'apache_beam' / \
@@ -253,6 +297,8 @@ if __name__ == '__main__':
   # executes below.
   generate_protos_first()
 
+  generate_external_transform_wrappers()
+
   # These data files live elsewhere in the full Beam repository.
   copy_tests_from_docs()
 
@@ -385,7 +431,6 @@ if __name__ == '__main__':
               'testcontainers[mysql]>=3.0.3,<4.0.0',
               'cryptography>=41.0.2',
               'hypothesis>5.0.0,<=7.0.0',
-              'pyyaml>=3.12,<7.0.0',
           ],
           'gcp': [
               'cachetools>=3.1.0,<6',
diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle
index d29d85af301..cadf3a6ae2c 100644
--- a/sdks/python/test-suites/dataflow/common.gradle
+++ b/sdks/python/test-suites/dataflow/common.gradle
@@ -536,10 +536,8 @@ def dataflowRegion = project.findProperty('dataflowRegion') ?: 'us-central1'
 project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata ->
     createCrossLanguageUsingJavaExpansionTask(
       name: taskMetadata.name,
-      expansionProjectPath: taskMetadata.expansionProjectPath,
+      expansionProjectPaths: taskMetadata.expansionProjectPaths,
       collectMarker: taskMetadata.collectMarker,
-      startJobServer: taskMetadata.startJobServer,
-      cleanupJobServer: taskMetadata.cleanupJobServer,
       pythonPipelineOptions: [
         "--runner=TestDataflowRunner",
         "--project=${dataflowProject}",
@@ -548,6 +546,7 @@ project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata ->
         "--sdk_harness_container_image_overrides=.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest"
       ],
       pytestOptions: basicPytestOpts,
+      additionalDeps: taskMetadata.additionalDeps,
       additionalEnvs: taskMetadata.additionalEnvs
     )
 }
diff --git a/sdks/python/test-suites/direct/build.gradle b/sdks/python/test-suites/direct/build.gradle
index ea643c3303a..4b102534398 100644
--- a/sdks/python/test-suites/direct/build.gradle
+++ b/sdks/python/test-suites/direct/build.gradle
@@ -42,3 +42,10 @@ task ioCrossLanguagePostCommit {
     dependsOn.add(":sdks:python:test-suites:direct:py${getVersionSuffix(it)}:ioCrossLanguagePythonUsingJava")
   }
 }
+
+task crossLanguageWrapperValidationPreCommit {
+  // Different python versions may output types that look different and lead to
+  // false failures. To be consistent, we test on the lowest version only
+  def lowestSupportedVersion = getVersionsAsList('cross_language_validates_py_versions')[0]
+  dependsOn.add(":sdks:python:test-suites:direct:py${getVersionSuffix(lowestSupportedVersion)}:xlangWrapperValidationPythonUsingJava")
+}
diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle
index 657f7adf801..b5680c2e1e9 100644
--- a/sdks/python/test-suites/direct/common.gradle
+++ b/sdks/python/test-suites/direct/common.gradle
@@ -412,10 +412,8 @@ def gcpProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing
 project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata ->
     createCrossLanguageUsingJavaExpansionTask(
       name: taskMetadata.name,
-      expansionProjectPath: taskMetadata.expansionProjectPath,
+      expansionProjectPaths: taskMetadata.expansionProjectPaths,
       collectMarker: taskMetadata.collectMarker,
-      startJobServer: taskMetadata.startJobServer,
-      cleanupJobServer: taskMetadata.cleanupJobServer,
       numParallelTests: 1,
       pythonPipelineOptions: [
         "--runner=TestDirectRunner",
@@ -426,6 +424,8 @@ project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata ->
         "--timeout=4500", // timeout of whole command execution
         "--color=yes", // console color
         "--log-cli-level=INFO" //log level info
-      ]
+      ],
+      additionalDeps: taskMetadata.additionalDeps,
+      additionalEnvs: taskMetadata.additionalEnvs
     )
 }
diff --git a/sdks/python/test-suites/xlang/build.gradle b/sdks/python/test-suites/xlang/build.gradle
index 5a124ac20ce..3065ad8377e 100644
--- a/sdks/python/test-suites/xlang/build.gradle
+++ b/sdks/python/test-suites/xlang/build.gradle
@@ -16,57 +16,43 @@
  * limitations under the License.
  */
 // This is a base file to set up cross language tests for different runners
-import org.apache.beam.gradle.BeamModulePlugin
-import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTask
 project.evaluationDependsOn(":sdks:python")
 
-// Set up cross language tests
-def envDir = project.project(":sdks:python").envdir
-def jobPort = BeamModulePlugin.getRandomPort()
-def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
-def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
-
-def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
-  dependsOn ':sdks:python:installGcpTest'
-
-  executable 'sh'
-  args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log"
-}
-
-def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
-  executable 'sh'
-  args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop"
-}
-
-// List of objects representing task metadata to create cross-language tasks from.
-// Each object contains the minimum relevant metadata.
-def xlangTasks = []
-
 // ******** Java GCP expansion service ********
 // Note: this only runs cross-language tests that use the Java GCP expansion service
-// To run tests that use another expansion service, create a new CrossLanguageTaskCommon with the
+// To run tests that use another expansion service, create a new CrossLanguageTask with the
 // relevant fields as done here, then add it to `xlangTasks`.
-def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service')
+def gcpExpansionPath = project.project(':sdks:java:io:google-cloud-platform:expansion-service').getPath()
+def ioExpansionPath = project.project(':sdks:java:io:expansion-service').getPath()
 // Properties that are common across runners.
 // Used to launch the expansion service, collect the right tests, and cleanup afterwards
-def gcpXlangCommon = new CrossLanguageTaskCommon().tap {
+def gcpXlang = new CrossLanguageTask().tap {
     name = "gcpCrossLanguage"
-    expansionProjectPath = gcpExpansionProject.getPath()
+    expansionProjectPaths = [gcpExpansionPath]
     collectMarker = "uses_gcp_java_expansion_service"
-    startJobServer = setupTask
-    cleanupJobServer = cleanupTask
 }
 
-def ioXlangCommon = new CrossLanguageTaskCommon().tap {
+def ioXlang = new CrossLanguageTask().tap {
     name = "ioCrossLanguage"
-    expansionProjectPath = project.project(':sdks:java:io:expansion-service').getPath()
+    expansionProjectPaths = [ioExpansionPath]
     collectMarker = "uses_io_java_expansion_service"
-    startJobServer = setupTask
-    cleanupJobServer = cleanupTask
     //See .test-infra/kafka/bitnami/README.md for setup instructions
     additionalEnvs = ["KAFKA_BOOTSTRAP_SERVER":project.findProperty('kafkaBootstrapServer')]
 }
 
-xlangTasks.addAll(gcpXlangCommon, ioXlangCommon)
+// This list should include all expansion service targets in sdks/standard_expansion_services.yaml
+def servicesToGenerateFrom = [ioExpansionPath, gcpExpansionPath]
+def xlangWrapperValidation = new CrossLanguageTask().tap {
+    name = "xlangWrapperValidation"
+    expansionProjectPaths = servicesToGenerateFrom
+    collectMarker = "xlang_wrapper_generation"
+    // update Jinja2 bounds in pyproject.toml as well
+    additionalDeps = ['\"Jinja2>=2.7.1,<4.0.0\"']
+}
+
+// List of task metadata objects to create cross-language tasks from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = [gcpXlang, ioXlang, xlangWrapperValidation]
 
 ext.xlangTasks = xlangTasks
\ No newline at end of file
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index f8e2b63a509..ca35c383eea 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -220,7 +220,7 @@ commands =
   #    --azure_managed_identity_client_id "abc123"
 
 [testenv:py3-yapf]
-# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml
+# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml and pyproject.toml
 deps =
   yapf==0.29.0
 commands =
@@ -228,7 +228,7 @@ commands =
   time yapf --in-place --parallel --recursive apache_beam
 
 [testenv:py3-yapf-check]
-# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml
+# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml and pyproject.toml
 deps =
   yapf==0.29.0
 commands =
diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml
new file mode 100644
index 00000000000..e9e6871be82
--- /dev/null
+++ b/sdks/standard_expansion_services.yaml
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This file enumerates the standard Apache Beam expansion services.
+# Each service must specify a package destination for each supported SDK, which
+# is where generated wrappers will go by default.
+#
+# Individual transforms can modify their destination module as well as their
+# generated wrapper class name.
+#
+# Transform identifiers listed in the `skip_transforms` field will be skipped.
+#
+# Any new gradle targets added here should also be added to:
+# - sdks/python/build.gradle (as a dependency in the 'generateExternalTransformsConfig' task)
+# - sdks/python/test-suites/xlang/build.gradle (look for 'servicesToGenerateFrom')
+#
+# Refer to sdks/python/gen_xlang_wrappers.py for more info.
+
+- gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+  destinations:
+    python: 'apache_beam/io'
+  transforms:
+    'beam:schematransform:org.apache.beam:kafka_write:v1':
+      name: 'WriteToKafka'
+    'beam:schematransform:org.apache.beam:kafka_read:v1':
+      name: 'ReadFromKafka'
+  skip_transforms:
+    # Handwritten Kafka wrappers already exist in apache_beam/io/kafka.py
+    - 'beam:schematransform:org.apache.beam:kafka_write:v1'
+    - 'beam:schematransform:org.apache.beam:kafka_read:v1'
+
+# TODO(ahmedabu98): Enable this service in a future PR
+#- gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
+#  destinations:
+#    python: 'apache_beam/io/gcp'
+#  transforms:
+#    'beam:schematransform:org.apache.beam:spanner_cdc_read:v1':
+#      name: 'ReadFromSpannerChangeStreams'
+#  skip_transforms:
+#    # generate_sequence is already included in the Java IO expansion service
+#    - 'beam:schematransform:org.apache.beam:generate_sequence:v1'
+#    # Handwritten wrappers exist in apache_beam/io/gcp/pubsublite/
+#    - 'beam:schematransform:org.apache.beam:pubsublite_read:v1'
+#    - 'beam:schematransform:org.apache.beam:pubsublite_write:v1'
+#    # Handwritten wrapper exists in apache_beam/io/gcp/spanner.py
+#    - 'beam:schematransform:org.apache.beam:spanner_write:v1'
+#    # Native IO exists in apache_beam/io/gcp/pubsub.py
+#    - 'beam:schematransform:org.apache.beam:pubsub_read:v1'
+#    - 'beam:schematransform:org.apache.beam:pubsub_write:v1'
+#    # Native IO exists in apache_beam/io/gcp/bigquery.py
+#    - 'beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1'
+#    - 'beam:schematransform:org.apache.beam:bigquery_export_read:v1'
+#    - 'beam:schematransform:org.apache.beam:bigquery_storage_write:v2'
+#    - 'beam:schematransform:org.apache.beam:bigquery_storage_read:v1'
+##    - 'beam:schematransform:org.apache.beam:bigquery_storage_write:v2'
+#    # Handwritten wrappers exist in apache_beam/io/jdbc.py
+#    - 'beam:schematransform:org.apache.beam:jdbc_write:v1'
+#    - 'beam:schematransform:org.apache.beam:jdbc_read:v1'
+#    # Handwritten wrappers exist in apache_beam/io/gcp/bigtableio.py
+#    - 'beam:schematransform:org.apache.beam:bigtable_write:v1'
+#    - 'beam:schematransform:org.apache.beam:bigtable_read:v1'
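
How the `transforms` and `skip_transforms` fields in the file above interact can be sketched with plain dictionaries. This is an illustration of the filtering and override logic in `generate_transforms_config`, not the script itself: `plan_wrappers` is a hypothetical helper, the `discovered` mapping stands in for what the expansion service would report, and `KafkaWrite` is an assumed default class name.

```python
def plan_wrappers(service, discovered):
  """Returns {identifier: (wrapper name, python destination)} for a service.

  `discovered` maps each transform identifier to its default wrapper name.
  """
  skipped = set(service.get('skip_transforms', []))
  overrides = service.get('transforms', {})
  plan = {}
  for identifier, default_name in discovered.items():
    # Skipping wins over any rename or destination override.
    if identifier in skipped:
      continue
    destinations = dict(service['destinations'])
    override = overrides.get(identifier, {})
    destinations.update(override.get('destinations', {}))
    plan[identifier] = (
        override.get('name', default_name), destinations['python'])
  return plan


KAFKA_WRITE = 'beam:schematransform:org.apache.beam:kafka_write:v1'
GENERATE_SEQUENCE = 'beam:schematransform:org.apache.beam:generate_sequence:v1'

service = {
    'gradle_target': 'sdks:java:io:expansion-service:shadowJar',
    'destinations': {'python': 'apache_beam/io'},
    'transforms': {KAFKA_WRITE: {'name': 'WriteToKafka'}},
    'skip_transforms': [KAFKA_WRITE],
}
# Assumed discovery result; the real values come from the expansion service.
discovered = {
    KAFKA_WRITE: 'KafkaWrite',
    GENERATE_SEQUENCE: 'GenerateSequence',
}

plan = plan_wrappers(service, discovered)
```

Because the Kafka identifier is listed under both fields, its rename is recorded but never used: only `GenerateSequence` ends up in the plan, which is consistent with the single entry in the generated standard_external_transforms.yaml.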
diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml
new file mode 100644
index 00000000000..b43e93ab491
--- /dev/null
+++ b/sdks/standard_external_transforms.yaml
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# NOTE: This file is autogenerated and should not be edited by hand.
+# Configs are generated based on the expansion service
+# configuration in /sdks/standard_expansion_services.yaml.
+# Refer to gen_xlang_wrappers.py for more info.
+#
+# Last updated on: 2024-02-22
+
+- default_service: sdks:java:io:expansion-service:shadowJar
+  description: 'Outputs a PCollection of Beam Rows, each containing a single INT64
+    number called "value". The count is produced from the given "start" value and
+    either up to the given "end" or until 2^63 - 1.
+
+    To produce an unbounded PCollection, simply do not specify an "end" value. Unbounded
+    sequences can specify a "rate" for output elements.
+
+    In all cases, the sequence of numbers is generated in parallel, so there is no
+    inherent ordering between the generated values'
+  destinations:
+    python: apache_beam/io
+  fields:
+    end:
+      description: The maximum number to generate (exclusive). Will be an unbounded
+        sequence if left unspecified.
+      nullable: true
+      type: numpy.int64
+    rate:
+      description: Specifies the rate to generate a given number of elements per a
+        given number of seconds. Applicable only to unbounded sequences.
+      nullable: true
+      type: Row(seconds=typing.Union[numpy.int64, NoneType], elements=<class 'numpy.int64'>)
+    start:
+      description: The minimum number to generate (inclusive).
+      nullable: false
+      type: numpy.int64
+  identifier: beam:schematransform:org.apache.beam:generate_sequence:v1
+  name: GenerateSequence
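
The `fields` entry above drives the constructor signature of the generated wrapper. The sketch below mirrors the parameter handling in `get_wrappers_from_transform_configs` under simplifying assumptions: `ordered_parameters` and `signature` are hypothetical helpers, only the `nullable` flags are copied from the config, and the camelCase conversion is omitted because these field names are already lower case.

```python
# Nullable flags copied from the GenerateSequence config above.
fields = {
    'end': {'nullable': True},
    'rate': {'nullable': True},
    'start': {'nullable': False},
}


def ordered_parameters(fields):
  """Nullable fields get a None default; required fields are moved first."""
  params = []
  for name, info in fields.items():
    param = {'name': name}
    if info['nullable']:
      param['default'] = None
    params.append(param)
  # sorted() is stable, so fields keep their relative order in each group.
  return sorted(params, key=lambda p: 'default' in p)


def signature(params):
  """Builds an __init__ argument list in the shape the template produces."""
  parts = ['self']
  for p in params:
    parts.append(p['name'] + ('=None' if 'default' in p else ''))
  parts.append('expansion_service=None')
  return ', '.join(parts)


init_args = signature(ordered_parameters(fields))
```

Sorting on `'default' in p` places `start` ahead of `end` and `rate`, so the resulting argument list is valid Python even though the YAML lists the fields alphabetically.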

