ahmedabu98 commented on code in PR #29834:
URL: https://github.com/apache/beam/pull/29834#discussion_r1466599732


##########
.github/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml:
##########
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PostCommit Python Xlang IO Direct
+
+on:
+  schedule:
+    - cron: '30 5/6 * * *'
+  pull_request_target:
+    paths: ['release/trigger_all_tests.json', 
'.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json']

Review Comment:
   This postcommit is just for tests that use IO expansion service. Another one 
already exists for Dataflow and I'm including this one for DirectRunner. 
   
   I added the precommit you're talking about in the recent commits, and it's 
indeed triggered on relevant Java paths.



##########
sdks/python/gen_xlang_wrappers.py:
##########
@@ -0,0 +1,423 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Generates Python wrappers for external transforms (specifically,
+SchemaTransforms)
+"""
+
+import argparse
+import datetime
+import logging
+import os
+import re
+import typing
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Union
+import subprocess
+
+import yaml
+from jinja2 import Environment
+from jinja2 import FileSystemLoader
+
+from gen_protos import LICENSE_HEADER
+from gen_protos import PYTHON_SDK_ROOT
+from gen_protos import find_by_ext
+
+SUPPORTED_SDK_DESTINATIONS = ['python']
+PYTHON_SUFFIX = "_et.py"
+MARKER = "# NOTE: This file contains autogenerated external transform(s)\n"
+
+
+def generate_transforms_config(input_services, output_file):
+  """
+  Generates a YAML file containing a list of transform configurations.
+
+  Takes an input YAML file containing a list of expansion service gradle
+  targets. Each service must provide a `destinations` field that specifies the
+  default package (relative path) that generated wrappers should be written
+  under. A default destination is specified for each SDK, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+
+
+  Each service may also specify modifications for particular transform.
+  Currently, one can modify the generated wrapper's name and destination file:
+
+    - By default, the transform's identifier is used to generate the wrapper
+      class name. This can be overriden by manually providing a name.
+    - By default, generated wrappers are written to files within the package
+      provided by the expansion service. This can be overridden by manually
+      providing a relative file path.
+
+  See the following example for what such modifications can look like::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      transforms:
+        'beam:schematransform:org.apache.beam:my_transform:v1':
+          name: 'MyCustomTransformName'
+          destinations:
+            python: 'apache_beam/io/gcp/my_custom_module'
+
+  For the above example, we would take the transform with identifier
+  `beam:schematransform:org.apache.beam:my_transform:v1` and by default infer
+  a wrapper class name of `MyTransform` and write the generated code to
+  the module `apache_beam/io/my_transform_et.py`. With the modifications, we
+  instead write the wrapper to `apache_beam/io/gcp/my_custom_module_et.py` and
+  name the class `MyCustomTransformName`.
+
+  Note: we add the prefix `"_et.py"` to the module name so that we can find
+  these generated files later (e.g. to tell Git to ignore them, and to
+  delete them when needed)
+
+  To ignore a particular transform, simply list its identifier in the `ignore`
+  field, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      ignore:
+        - 'beam:schematransform:org.apache.beam:skip_me:v1':
+
+
+  We use :class:`ExternalSchemaTransformProvider` to discover external
+  transforms. Then, we extract the necessary details of each transform and
+  compile them into a new YAML file, which is later used to generate wrappers.
+  """
+  from apache_beam.transforms.external import BeamJarExpansionService
+  from apache_beam.transforms.external_schematransform_provider import 
STANDARD_URN_PATTERN
+  from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransform
+  from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransformProvider
+
+  transform_list: List[Dict[str, Any]] = []
+
+  with open(input_services) as f:
+    services = yaml.safe_load(f)
+  for service in services:
+    target = service['gradle_target']
+
+    # validate expansion service destinations
+    if "destinations" not in service:
+      raise ValueError(
+          f"Expansion service with target [{target}] does not "
+          "specify any default destinations.")
+    service_destinations: Dict[str, str] = service['destinations']
+    for sdk in service_destinations.keys():
+      if sdk not in SUPPORTED_SDK_DESTINATIONS:
+        raise ValueError(
+            f"Service with target {target} specifies a "
+            f"destination for an invalid SDK: {sdk}. The "
+            f"supported SDKs are {SUPPORTED_SDK_DESTINATIONS}")
+
+    # get transforms to skip, if any
+    ignore = service.get('ignore', [])
+
+    # use dynamic provider to discover and populate wrapper details
+    provider = ExternalSchemaTransformProvider(BeamJarExpansionService(target))
+    discovered: Dict[str, ExternalSchemaTransform] = provider.get_all()
+    for identifier, wrapper in discovered.items():
+      if identifier in ignore:
+        continue
+      # We infer the destination from the URN and service destination.
+      # For example, the Java IO expansion service defaults to Python
+      # package apache_beam/io. Kafka Write is a transform in this service
+      # with URN beam:schematransform:org.apache.beam:kafka_write:v1
+      # In this case, we infer the destination apache_beam/io/kafka_write
+      functionality_identifier = re.match(STANDARD_URN_PATTERN,
+                                          identifier).groups()[0]
+      destinations = {
+          sdk: f"{destination}/{functionality_identifier}"
+          for sdk,
+          destination in service_destinations.items()
+      }
+      name = wrapper.__name__
+
+      # apply any modifications
+      modified_transform = {}
+      if 'transforms' in service and identifier in service['transforms']:
+        modified_transform = service['transforms'][identifier]
+      if 'name' in modified_transform:
+        name = modified_transform['name']  # override the name
+      if 'destinations' in modified_transform:
+        for sdk, destination in modified_transform['destinations'].items():
+          if sdk not in SUPPORTED_SDK_DESTINATIONS:
+            raise ValueError(
+                f"Identifier {identifier} specifies a destination for "
+                f"an invalid SDK: [{sdk}]. The supported SDKs "
+                f"are {SUPPORTED_SDK_DESTINATIONS}")
+          destinations[sdk] = destination  # override the destination
+
+      # prepare information about parameters
+      fields = {}
+      for param in wrapper.configuration_schema.values():
+        tp = param.type
+        nullable = False
+        # if type is typing.Optional[...]
+        if (typing.get_origin(tp) is Union and
+            type(None) in typing.get_args(tp)):
+          nullable = True
+          # unwrap and set type to the original
+          args = typing.get_args(tp)
+          if len(args) == 2:

Review Comment:
   In the rare case we have an actual Union between multiple types, we should 
leave it alone. If it's just a union between a type and None, we can simplify 
by unwrapping. Will add a comment to mention this



##########
sdks/python/apache_beam/transforms/external_schematransform_provider_test.py:
##########
@@ -14,24 +14,43 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import logging
 import os
+import secrets
+import shutil
+import time
 import unittest
+from importlib import import_module
 
 import pytest
+import yaml
 
 import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms.external import BeamJarExpansionService
 from apache_beam.transforms.external_schematransform_provider import 
STANDARD_URN_PATTERN
+from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransform
 from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransformProvider
 from apache_beam.transforms.external_schematransform_provider import 
camel_case_to_snake_case
 from apache_beam.transforms.external_schematransform_provider import 
infer_name_from_identifier
 from apache_beam.transforms.external_schematransform_provider import 
snake_case_to_lower_camel_case
 from apache_beam.transforms.external_schematransform_provider import 
snake_case_to_upper_camel_case
 
+try:

Review Comment:
   This block imports files that are not in `apache_beam`, which is not 
supported in some test environments.
   From some runs, I see this includes [Python unit 
tests](https://github.com/apache/beam/blob/445151694fef810d5112ea7e034659f91fceceac/.github/workflows/python_tests.yml#L93)
 and gradle tasks that run with with [tox 
environment](https://github.com/apache/beam/blob/445151694fef810d5112ea7e034659f91fceceac/.github/workflows/beam_PreCommit_Python_Transforms.yml#L96)
 (defined 
[here](https://github.com/apache/beam/blob/445151694fef810d5112ea7e034659f91fceceac/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L3080))



##########
sdks/python/gen_xlang_wrappers.py:
##########
@@ -0,0 +1,420 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Generates Python wrappers for external transforms (specifically,
+SchemaTransforms)
+"""
+
+import argparse
+import datetime
+import logging
+import os
+import re
+import typing
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Union
+import subprocess
+
+import yaml
+from jinja2 import Environment
+from jinja2 import FileSystemLoader
+
+from gen_protos import LICENSE_HEADER
+from gen_protos import PYTHON_SDK_ROOT
+from gen_protos import find_by_ext
+
+SUPPORTED_SDK_DESTINATIONS = ['python']
+PYTHON_SUFFIX = "_et.py"
+MARKER = "# NOTE: This file contains autogenerated external transform(s)\n"
+
+
+def generate_transforms_config(input_services, output_file):
+  """
+  Generates a YAML file containing a list of transform configurations.
+
+  Takes an input YAML file containing a list of expansion service gradle
+  targets. Each service must provide a `destinations` field that specifies the
+  default package (relative path) that generated wrappers should be written
+  under. A default destination is specified for each SDK, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+
+
+  Each service may also specify modifications for particular transform.
+  Currently, one can modify the generated wrapper's name and destination file:
+
+    - By default, the transform's identifier is used to generate the wrapper
+      class name. This can be overriden by manually providing a name.
+    - By default, generated wrappers are written to files within the package
+      provided by the expansion service. This can be overridden by manually
+      providing a relative file path.
+
+  See the following example for what such modifications can look like::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      transforms:
+        'beam:schematransform:org.apache.beam:my_transform:v1':
+          name: 'MyCustomTransformName'
+          destinations:
+            python: 'apache_beam/io/gcp/my_custom_module'
+
+  For the above example, we would take the transform with identifier
+  `beam:schematransform:org.apache.beam:my_transform:v1` and by default infer
+  a wrapper class name of `MyTransform` and write the generated code to
+  the module `apache_beam/io/my_transform_et.py`. With the modifications, we
+  instead write the wrapper to `apache_beam/io/gcp/my_custom_module_et.py` and
+  name the class `MyCustomTransformName`.
+
+  Note: we add the prefix `"_et.py"` to the module name so that we can find
+  these generated files later (e.g. to tell Git to ignore them, and to
+  delete them when needed)
+
+  To ignore a particular transform, simply list its identifier in the `ignore`
+  field, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      ignore:
+        - 'beam:schematransform:org.apache.beam:skip_me:v1':
+
+
+  We use :class:`ExternalSchemaTransformProvider` to discover external
+  transforms. Then, we extract the necessary details of each transform and
+  compile them into a new YAML file, which is later used to generate wrappers.
+  """
+  from apache_beam.transforms.external import BeamJarExpansionService
+  from apache_beam.transforms.external_schematransform_provider import 
STANDARD_URN_PATTERN
+  from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransform
+  from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransformProvider
+
+  transform_list: List[Dict[str, Any]] = []
+
+  with open(input_services) as f:
+    services = yaml.safe_load(f)
+  for service in services:
+    target = service['gradle_target']
+
+    if "destinations" not in service:
+      raise ValueError(
+          f"Expansion service with target [{target}] does not "
+          "specify any default destinations.")
+    service_destinations: Dict[str, str] = service['destinations']
+    for sdk in service_destinations.keys():
+      if sdk not in SUPPORTED_SDK_DESTINATIONS:
+        raise ValueError(
+            f"Service with target {target} specifies a "
+            f"destination for an invalid SDK: {sdk}. The "
+            f"supported SDKs are {SUPPORTED_SDK_DESTINATIONS}")
+
+    # get transforms to skip, if any
+    ignore = service.get('ignore', [])
+
+    # use dynamic provider to discover and populate wrapper details
+    provider = ExternalSchemaTransformProvider(BeamJarExpansionService(target))
+    discovered: Dict[str, ExternalSchemaTransform] = provider.get_all()
+    for identifier, wrapper in discovered.items():
+      if identifier in ignore:
+        continue
+      # We infer the destination from the URN and service destination.
+      # For example, the Java IO expansion service defaults to Python
+      # package apache_beam/io. Kafka Write is a transform in this service
+      # with URN beam:schematransform:org.apache.beam:kafka_write:v1
+      # In this case, we infer the destination apache_beam/io/kafka_write
+      functionality_identifier = re.match(STANDARD_URN_PATTERN,
+                                          identifier).groups()[0]
+      destinations = {
+          sdk: f"{destination}/{functionality_identifier}"
+          for sdk,
+          destination in service_destinations.items()
+      }
+      name = wrapper.__name__
+
+      # apply any modifications
+      modified_transform = {}
+      if 'transforms' in service and identifier in service['transforms']:
+        modified_transform = service['transforms'][identifier]
+      if 'name' in modified_transform:
+        name = modified_transform['name']  # override the name
+      if 'destinations' in modified_transform:
+        for sdk, destination in modified_transform['destinations'].items():
+          if sdk not in SUPPORTED_SDK_DESTINATIONS:
+            raise ValueError(
+                f"Identifier {identifier} specifies a destination for "
+                f"an invalid SDK: [{sdk}]. The supported SDKs "
+                f"are {SUPPORTED_SDK_DESTINATIONS}")
+          destinations[sdk] = destination  # override the destination
+
+      fields = {}
+      for param in wrapper.configuration_schema.values():
+        (tp, nullable) = prepare_type(param.type)
+
+        field_info = {
+            'type': str(tp),
+            'description': param.description,
+            'nullable': nullable
+        }
+        fields[param.original_name] = field_info
+
+      transform = {
+          'identifier': identifier,
+          'name': name,
+          'destinations': destinations,
+          'default_service': target,
+          'fields': fields,
+          'description': wrapper.description
+      }
+      transform_list.append(transform)
+
+  with open(output_file, 'w') as f:
+    f.write(LICENSE_HEADER.lstrip())
+    f.write(
+        "# NOTE: This file is autogenerated and should "
+        "not be edited by hand.\n")
+    dt = datetime.datetime.now().date()
+    f.write(f"# Last updated on: {dt}\n\n")
+    yaml.dump(transform_list, f)
+  logging.info("Successfully wrote transform configs to file: %s", output_file)
+
+
+def prepare_type(tp):
+  nullable = False
+  # if it's typing.Optional[...], unwrap to avoid redundancy. Nullability is
+  # communicated in the wrapper's constructor
+  if (typing.get_origin(tp) is Union and
+          type(None) in typing.get_args(tp)):
+    nullable = True
+    # only unwrap if it's a single nullable type. if the type is truly a union
+    # of multiple types, leave it alone.
+    args = typing.get_args(tp)
+    if len(args) == 2:
+      tp = list(filter(lambda t: t is not type(None), args))[0]
+
+  # some logic for setting the type's name to look pretty
+  # TODO(ahmedabu98): Make this more generic to support other remote SDKs
+  # Potentially use Runner API types
+  if tp.__module__ == 'builtins':
+    tp = tp.__name__
+  elif tp.__module__ == 'typing':
+    tp = str(tp).replace("typing.", "")
+  elif tp.__module__ == 'numpy':
+    tp = "%s.%s" % (tp.__module__, tp.__name__)
+
+  return (tp, nullable)
+
+def camel_case_to_snake_case(string):
+  """Convert camelCase to snake_case"""
+  arr = []
+  word = []
+  for i, n in enumerate(string):
+    # If seeing an upper letter after a lower letter, we just witnessed a word
+    # If seeing an upper letter and the next letter is lower, we may have just
+    # witnessed an all caps word
+    if n.isupper() and ((i > 0 and string[i - 1].islower()) or
+                        (i + 1 < len(string) and string[i + 1].islower())):
+      arr.append(''.join(word))
+      word = [n.lower()]
+    else:
+      word.append(n.lower())
+  arr.append(''.join(word))
+  return '_'.join(arr).strip('_')
+
+
+def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]:
+  """
+  Generates code for external transform wrapper classes (subclasses of
+  :class:`ExternalSchemaTransform`).
+
+  Takes a YAML file containing a list of SchemaTransform configurations. For
+  each configuration, the code for a wrapper class is generated, along with any
+  documentation that may be included.
+
+  Each configuration must include a destination file that the generated class
+  will be written to.
+
+  Returns the generated classes, grouped by destination.
+  """
+  env = Environment(loader=FileSystemLoader(PYTHON_SDK_ROOT))
+  python_wrapper_template = env.get_template("python_xlang_wrapper.template")
+
+  # maintain a list of wrappers to write in each file. if modified destinations
+  # are used, we may end up with multiple wrappers in one file.
+  destinations: Dict[str, List[str]] = {}
+
+  with open(config_file) as f:
+    transforms = yaml.safe_load(f)
+    for config in transforms:
+      default_service = config['default_service']
+      description = config['description']
+      destination = config['destinations']['python']
+      name = config['name']
+      fields = config['fields']
+      identifier = config['identifier']
+
+      parameters = []
+      for param, info in fields.items():
+        pythonic_name = camel_case_to_snake_case(param)
+        param_details = {
+            "name": pythonic_name,
+            "type": info['type'],
+            "description": info['description'],
+        }
+
+        if info['nullable']:
+          param_details["default"] = None
+        parameters.append(param_details)
+
+      # Python syntax requires function definitions to have
+      # non-default parameters first
+      parameters = sorted(parameters, key=lambda p: 'default' in p)
+      default_service = f"BeamJarExpansionService(\"{default_service}\")"
+
+      python_wrapper_class = python_wrapper_template.render(
+          class_name=name,
+          identifier=identifier,
+          parameters=parameters,
+          description=description,
+          default_expansion_service=default_service)
+
+      if destination not in destinations:
+        destinations[destination] = []
+      destinations[destination].append(python_wrapper_class)
+
+  return destinations
+
+
+def write_wrappers_to_destinations(grouped_wrappers: Dict[str, List[str]]):
+  """
+  Takes a dictionary of generated wrapper code, grouped by destination.
+  For each destination, create a new file containing the respective wrapper
+  classes. Each file includes the Apache License header and relevant imports.
+  Note: the Jinja template should already follow linting and formatting rules.
+  """
+  written_files = []
+  for dest, wrappers in grouped_wrappers.items():
+    dest += PYTHON_SUFFIX
+    absolute_dest = os.path.join(PYTHON_SDK_ROOT, *dest.split('/'))
+    with open(absolute_dest, "w") as file:
+      file.write(LICENSE_HEADER.lstrip())
+      file.write(
+          MARKER + "# and should not be edited by hand.\n"
+          "# Refer to the utility at gen_xlang_wrappers.py for more info.\n\n")
+      file.write("# pylint:disable=line-too-long\n\n")
+      file.write(
+          "from apache_beam.transforms.external import "
+          "BeamJarExpansionService\n"
+          "from apache_beam.transforms.external_schematransform_provider "
+          "import ExternalSchemaTransform\n")
+      for wrapper in wrappers:
+        file.write(wrapper + "\n")
+    written_files.append(absolute_dest)
+
+  # We only make a best effort attempt to format with yapf because not all

Review Comment:
   Yup that works!



##########
sdks/python/standard_external_transforms.yaml:
##########
@@ -0,0 +1,725 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# NOTE: This file is autogenerated and should not be edited by hand.

Review Comment:
   Done



##########
sdks/python/gen_xlang_wrappers.py:
##########
@@ -0,0 +1,423 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Generates Python wrappers for external transforms (specifically,
+SchemaTransforms)
+"""
+
+import argparse
+import datetime
+import logging
+import os
+import re
+import typing
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Union
+import subprocess
+
+import yaml
+from jinja2 import Environment
+from jinja2 import FileSystemLoader
+
+from gen_protos import LICENSE_HEADER
+from gen_protos import PYTHON_SDK_ROOT
+from gen_protos import find_by_ext
+
+SUPPORTED_SDK_DESTINATIONS = ['python']
+PYTHON_SUFFIX = "_et.py"
+MARKER = "# NOTE: This file contains autogenerated external transform(s)\n"
+
+
+def generate_transforms_config(input_services, output_file):
+  """
+  Generates a YAML file containing a list of transform configurations.
+
+  Takes an input YAML file containing a list of expansion service gradle
+  targets. Each service must provide a `destinations` field that specifies the
+  default package (relative path) that generated wrappers should be written
+  under. A default destination is specified for each SDK, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+
+
+  Each service may also specify modifications for particular transform.
+  Currently, one can modify the generated wrapper's name and destination file:
+
+    - By default, the transform's identifier is used to generate the wrapper
+      class name. This can be overriden by manually providing a name.
+    - By default, generated wrappers are written to files within the package
+      provided by the expansion service. This can be overridden by manually
+      providing a relative file path.
+
+  See the following example for what such modifications can look like::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      transforms:
+        'beam:schematransform:org.apache.beam:my_transform:v1':
+          name: 'MyCustomTransformName'
+          destinations:
+            python: 'apache_beam/io/gcp/my_custom_module'
+
+  For the above example, we would take the transform with identifier
+  `beam:schematransform:org.apache.beam:my_transform:v1` and by default infer
+  a wrapper class name of `MyTransform` and write the generated code to
+  the module `apache_beam/io/my_transform_et.py`. With the modifications, we
+  instead write the wrapper to `apache_beam/io/gcp/my_custom_module_et.py` and
+  name the class `MyCustomTransformName`.
+
+  Note: we add the prefix `"_et.py"` to the module name so that we can find
+  these generated files later (e.g. to tell Git to ignore them, and to
+  delete them when needed)
+
+  To ignore a particular transform, simply list its identifier in the `ignore`
+  field, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      ignore:
+        - 'beam:schematransform:org.apache.beam:skip_me:v1':
+
+
+  We use :class:`ExternalSchemaTransformProvider` to discover external
+  transforms. Then, we extract the necessary details of each transform and
+  compile them into a new YAML file, which is later used to generate wrappers.
+  """
+  from apache_beam.transforms.external import BeamJarExpansionService
+  from apache_beam.transforms.external_schematransform_provider import 
STANDARD_URN_PATTERN
+  from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransform
+  from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransformProvider
+
+  transform_list: List[Dict[str, Any]] = []
+
+  with open(input_services) as f:
+    services = yaml.safe_load(f)
+  for service in services:
+    target = service['gradle_target']
+
+    # validate expansion service destinations
+    if "destinations" not in service:
+      raise ValueError(
+          f"Expansion service with target [{target}] does not "
+          "specify any default destinations.")
+    service_destinations: Dict[str, str] = service['destinations']
+    for sdk in service_destinations.keys():
+      if sdk not in SUPPORTED_SDK_DESTINATIONS:
+        raise ValueError(
+            f"Service with target {target} specifies a "
+            f"destination for an invalid SDK: {sdk}. The "
+            f"supported SDKs are {SUPPORTED_SDK_DESTINATIONS}")
+
+    # get transforms to skip, if any
+    ignore = service.get('ignore', [])
+
+    # use dynamic provider to discover and populate wrapper details
+    provider = ExternalSchemaTransformProvider(BeamJarExpansionService(target))
+    discovered: Dict[str, ExternalSchemaTransform] = provider.get_all()
+    for identifier, wrapper in discovered.items():
+      if identifier in ignore:
+        continue
+      # We infer the destination from the URN and service destination.
+      # For example, the Java IO expansion service defaults to Python
+      # package apache_beam/io. Kafka Write is a transform in this service
+      # with URN beam:schematransform:org.apache.beam:kafka_write:v1
+      # In this case, we infer the destination apache_beam/io/kafka_write
+      functionality_identifier = re.match(STANDARD_URN_PATTERN,
+                                          identifier).groups()[0]
+      destinations = {
+          sdk: f"{destination}/{functionality_identifier}"
+          for sdk,
+          destination in service_destinations.items()
+      }
+      name = wrapper.__name__
+
+      # apply any modifications
+      modified_transform = {}
+      if 'transforms' in service and identifier in service['transforms']:
+        modified_transform = service['transforms'][identifier]
+      if 'name' in modified_transform:
+        name = modified_transform['name']  # override the name
+      if 'destinations' in modified_transform:
+        for sdk, destination in modified_transform['destinations'].items():
+          if sdk not in SUPPORTED_SDK_DESTINATIONS:
+            raise ValueError(
+                f"Identifier {identifier} specifies a destination for "
+                f"an invalid SDK: [{sdk}]. The supported SDKs "
+                f"are {SUPPORTED_SDK_DESTINATIONS}")
+          destinations[sdk] = destination  # override the destination
+
+      # prepare information about parameters
+      fields = {}
+      for param in wrapper.configuration_schema.values():
+        tp = param.type
+        nullable = False
+        # if type is typing.Optional[...]
+        if (typing.get_origin(tp) is Union and
+            type(None) in typing.get_args(tp)):
+          nullable = True
+          # unwrap and set type to the original
+          args = typing.get_args(tp)
+          if len(args) == 2:
+            tp = args[0]
+
+        # some logic for properly setting the type name
+        # TODO(ahmedabu98): Find a way to make this logic more generic when
+        # supporting other remote SDKs. Potentially use Runner API types
+        if tp.__module__ == 'builtins':
+          tp = tp.__name__
+        elif tp.__module__ == 'typing':
+          tp = str(tp).replace("typing.", "")
+        elif tp.__module__ == 'numpy':
+          tp = "%s.%s" % (tp.__module__, tp.__name__)
+        field_info = {

Review Comment:
   Adding a unit test



##########
sdks/python/gen_xlang_wrappers.py:
##########
@@ -0,0 +1,423 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Generates Python wrappers for external transforms (specifically,
+SchemaTransforms)
+"""
+
+import argparse
+import datetime
+import logging
+import os
+import re
+import typing
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Union
+import subprocess
+
+import yaml
+from jinja2 import Environment
+from jinja2 import FileSystemLoader
+
+from gen_protos import LICENSE_HEADER
+from gen_protos import PYTHON_SDK_ROOT
+from gen_protos import find_by_ext
+
+SUPPORTED_SDK_DESTINATIONS = ['python']
+PYTHON_SUFFIX = "_et.py"
+MARKER = "# NOTE: This file contains autogenerated external transform(s)\n"
+
+
+def generate_transforms_config(input_services, output_file):
+  """
+  Generates a YAML file containing a list of transform configurations.
+
+  Takes an input YAML file containing a list of expansion service gradle
+  targets. Each service must provide a `destinations` field that specifies the
+  default package (relative path) that generated wrappers should be written
+  under. A default destination is specified for each SDK, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+
+
+  Each service may also specify modifications for particular transform.
+  Currently, one can modify the generated wrapper's name and destination file:
+
+    - By default, the transform's identifier is used to generate the wrapper
+      class name. This can be overriden by manually providing a name.
+    - By default, generated wrappers are written to files within the package
+      provided by the expansion service. This can be overridden by manually
+      providing a relative file path.
+
+  See the following example for what such modifications can look like::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      transforms:
+        'beam:schematransform:org.apache.beam:my_transform:v1':
+          name: 'MyCustomTransformName'
+          destinations:
+            python: 'apache_beam/io/gcp/my_custom_module'
+
+  For the above example, we would take the transform with identifier
+  `beam:schematransform:org.apache.beam:my_transform:v1` and by default infer
+  a wrapper class name of `MyTransform` and write the generated code to
+  the module `apache_beam/io/my_transform_et.py`. With the modifications, we
+  instead write the wrapper to `apache_beam/io/gcp/my_custom_module_et.py` and
+  name the class `MyCustomTransformName`.
+
+  Note: we add the prefix `"_et.py"` to the module name so that we can find
+  these generated files later (e.g. to tell Git to ignore them, and to
+  delete them when needed)
+
+  To ignore a particular transform, simply list its identifier in the `ignore`
+  field, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      ignore:
+        - 'beam:schematransform:org.apache.beam:skip_me:v1':
+
+
+  We use :class:`ExternalSchemaTransformProvider` to discover external
+  transforms. Then, we extract the necessary details of each transform and
+  compile them into a new YAML file, which is later used to generate wrappers.
+  """
+  from apache_beam.transforms.external import BeamJarExpansionService
+  from apache_beam.transforms.external_schematransform_provider import 
STANDARD_URN_PATTERN
+  from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransform
+  from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransformProvider
+
+  transform_list: List[Dict[str, Any]] = []
+
+  with open(input_services) as f:
+    services = yaml.safe_load(f)
+  for service in services:
+    target = service['gradle_target']
+
+    # validate expansion service destinations
+    if "destinations" not in service:
+      raise ValueError(
+          f"Expansion service with target [{target}] does not "
+          "specify any default destinations.")
+    service_destinations: Dict[str, str] = service['destinations']
+    for sdk in service_destinations.keys():
+      if sdk not in SUPPORTED_SDK_DESTINATIONS:
+        raise ValueError(
+            f"Service with target {target} specifies a "
+            f"destination for an invalid SDK: {sdk}. The "
+            f"supported SDKs are {SUPPORTED_SDK_DESTINATIONS}")
+
+    # get transforms to skip, if any

Review Comment:
   Cleaned up some of those comments



##########
sdks/python/standard_expansion_services.yaml:
##########
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This file enumerates the standard Apache Beam expansion services.
+# Each service must specify a package destination for each supported SDK, which
+# is where generated wrappers will go by default.
+#
+# Individual transforms can modify their destination module as well as their
+# generated wrapper class name.
+#
+# Transform identifiers listed in the `ignore` field will be skipped.
+
+- gradle_target: 'sdks:java:io:expansion-service:shadowJar'

Review Comment:
   Yeah when we add support for other SDKs it will be used by them. I can move 
these configs to the `beam/sdks` directory, wdyt? 
   
   CC @robertwb 



##########
sdks/python/apache_beam/transforms/external_schematransform_provider_test.py:
##########
@@ -135,6 +153,317 @@ def test_run_generate_sequence(self):
       assert_that(numbers, equal_to([i for i in range(10)]))
 
 
[email protected]_io_java_expansion_service
[email protected](
+    run_script is None,
+    "Need access to gen_xlang_wrappers.py to run these tests")
+class AutoGenerationScriptTest(unittest.TestCase):
+  """
+  This class tests the generation and regeneration operations in
+  `sdks/python/gen_xlang_wrappers.py`.
+  """
+
+  # tests cases will use GenerateSequence
+  GEN_SEQ_IDENTIFIER = \
+    'beam:schematransform:org.apache.beam:generate_sequence:v1'
+
+  def setUp(self):
+    args = TestPipeline(is_integration_test=True).get_full_options_as_args()
+    runner = PipelineOptions(args).get_all_options()['runner']
+    if runner and "direct" not in runner.lower():
+      self.skipTest(
+          "It is sufficient to run this test in the DirectRunner "
+          "test suite only.")
+
+    self.test_dir_name = 'test_gen_script_%d_%s' % (
+        int(time.time()), secrets.token_hex(3))
+    self.test_dir = os.path.join(
+        os.path.abspath(os.path.dirname(__file__)), self.test_dir_name)
+    self.service_config_path = os.path.join(
+        self.test_dir, "test_expansion_service_config.yaml")
+    self.transform_config_path = os.path.join(
+        self.test_dir, "test_transform_config.yaml")
+    os.mkdir(self.test_dir)
+
+    self.assertTrue(
+        os.environ.get('EXPANSION_PORT'), "Expansion service port not found!")
+
+  def tearDown(self):
+    shutil.rmtree(self.test_dir, ignore_errors=False)
+
+  def test_script_workflow(self):
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': f'apache_beam/transforms/{self.test_dir_name}'
+        }
+    }
+    with open(self.service_config_path, 'w') as f:
+      yaml.dump([expansion_service_config], f)
+
+    # test that transform config YAML file is created
+    generate_transforms_config(
+        self.service_config_path, self.transform_config_path)
+    self.assertTrue(os.path.exists(self.transform_config_path))
+    expected_destination = \
+      f'apache_beam/transforms/{self.test_dir_name}/generate_sequence'
+    # test that transform config is populated correctly
+    with open(self.transform_config_path) as f:
+      transforms = yaml.safe_load(f)
+      gen_seq_config = None
+      for transform in transforms:
+        if transform['identifier'] == self.GEN_SEQ_IDENTIFIER:
+          gen_seq_config = transform
+      self.assertIsNotNone(gen_seq_config)
+      self.assertEqual(
+          gen_seq_config['default_service'],
+          expansion_service_config['gradle_target'])
+      self.assertEqual(gen_seq_config['name'], 'GenerateSequence')
+      self.assertEqual(
+          gen_seq_config['destinations']['python'], expected_destination)
+      self.assertIn("end", gen_seq_config['fields'])
+      self.assertIn("start", gen_seq_config['fields'])
+      self.assertIn("rate", gen_seq_config['fields'])
+
+    # test that the code for GenerateSequence is set to the right destination
+    grouped_wrappers = get_wrappers_from_transform_configs(
+        self.transform_config_path)
+    self.assertIn(expected_destination, grouped_wrappers)
+    # only the GenerateSequence wrapper is set to this destination
+    self.assertEqual(len(grouped_wrappers[expected_destination]), 1)
+
+    # test that the correct destination is created
+    write_wrappers_to_destinations(grouped_wrappers)
+    self.assertTrue(
+        os.path.exists(
+            os.path.join(self.test_dir, 'generate_sequence' + PYTHON_SUFFIX)))
+    # check the wrapper exists in this destination and has correct properties
+    generate_sequence_et = import_module(
+        expected_destination.replace('/', '.') + PYTHON_SUFFIX.rstrip('.py'))
+    self.assertTrue(hasattr(generate_sequence_et, 'GenerateSequence'))
+    self.assertTrue(
+        isinstance(
+            generate_sequence_et.GenerateSequence(start=0),
+            ExternalSchemaTransform))
+    self.assertEqual(
+        generate_sequence_et.GenerateSequence.identifier,
+        self.GEN_SEQ_IDENTIFIER)
+
+    # test that we successfully delete the destination
+    delete_generated_files(self.test_dir)
+    self.assertFalse(
+        os.path.exists(
+            os.path.join(self.test_dir, 'generate_sequence' + PYTHON_SUFFIX)))
+
+  def test_script_workflow_with_modified_transforms(self):
+    modified_name = 'ModifiedSequence'
+    modified_dest = \
+      f'apache_beam/transforms/{self.test_dir_name}/new_dir/modified_gen_seq'
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': f'apache_beam/transforms/{self.test_dir_name}'
+        },
+        'transforms': {
+            'beam:schematransform:org.apache.beam:generate_sequence:v1': {
+                'name': modified_name,
+                'destinations': {
+                    'python': modified_dest
+                }
+            }
+        }
+    }
+    os.mkdir(os.path.join(self.test_dir, 'new_dir'))
+
+    with open(self.service_config_path, 'w') as f:
+      yaml.dump([expansion_service_config], f)
+
+    # test that transform config YAML file is successfully created
+    generate_transforms_config(
+        self.service_config_path, self.transform_config_path)
+    self.assertTrue(os.path.exists(self.transform_config_path))
+
+    # test that transform config is populated correctly
+    with open(self.transform_config_path) as f:
+      transforms = yaml.safe_load(f)
+      gen_seq_config = None
+      for transform in transforms:
+        if transform['identifier'] == self.GEN_SEQ_IDENTIFIER:
+          gen_seq_config = transform
+      self.assertIsNotNone(gen_seq_config)
+      self.assertEqual(
+          gen_seq_config['default_service'],
+          expansion_service_config['gradle_target'])
+      self.assertEqual(gen_seq_config['name'], modified_name)
+      self.assertEqual(gen_seq_config['destinations']['python'], modified_dest)
+
+    # test that the code for 'ModifiedSequence' is set to the right destination
+    grouped_wrappers = get_wrappers_from_transform_configs(
+        self.transform_config_path)
+    self.assertIn(modified_dest, grouped_wrappers)
+    self.assertIn(modified_name, grouped_wrappers[modified_dest][0])
+    # only one wrapper is set to this destination
+    self.assertEqual(len(grouped_wrappers[modified_dest]), 1)
+
+    # test that the modified destination is successfully created
+    write_wrappers_to_destinations(grouped_wrappers)
+    self.assertTrue(
+        os.path.exists(
+            os.path.join(
+                self.test_dir, 'new_dir', 'modified_gen_seq' + PYTHON_SUFFIX)))
+    # check the modified wrapper exists in the modified destination
+    # and check it has the correct properties
+    modified_gen_seq_et = import_module(
+        modified_dest.replace('/', '.') + PYTHON_SUFFIX.rstrip('.py'))
+    self.assertTrue(
+        isinstance(
+            modified_gen_seq_et.ModifiedSequence(start=0),
+            ExternalSchemaTransform))
+    self.assertEqual(
+        modified_gen_seq_et.ModifiedSequence.identifier,
+        self.GEN_SEQ_IDENTIFIER)
+
+    # test that we successfully delete the destination
+    delete_generated_files(self.test_dir)
+    self.assertFalse(
+        os.path.exists(
+            os.path.join(
+                self.test_dir, 'new_dir', 'modified_gen_seq' + PYTHON_SUFFIX)))
+
+  def test_script_workflow_with_multiple_wrappers_same_destination(self):
+    modified_dest = f'apache_beam/transforms/{self.test_dir_name}/my_wrappers'
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': f'apache_beam/transforms/{self.test_dir_name}'
+        },
+        'transforms': {
+            'beam:schematransform:org.apache.beam:generate_sequence:v1': {
+                'destinations': {
+                    'python': modified_dest
+                }
+            },
+            'beam:schematransform:org.apache.beam:kafka_read:v1': {
+                'destinations': {
+                    'python': modified_dest
+                }
+            },
+            'beam:schematransform:org.apache.beam:kafka_write:v1': {
+                'destinations': {
+                    'python': modified_dest
+                }
+            }
+        }
+    }
+
+    with open(self.service_config_path, 'w') as f:
+      yaml.dump([expansion_service_config], f)
+
+    # test that transform config YAML file is successfully created
+    generate_transforms_config(
+        self.service_config_path, self.transform_config_path)
+    self.assertTrue(os.path.exists(self.transform_config_path))
+
+    # test that our transform configs have the same destination
+    with open(self.transform_config_path) as f:
+      transforms = yaml.safe_load(f)
+      for transform in transforms:
+        if transform['identifier'] in expansion_service_config['transforms']:
+          self.assertEqual(transform['destinations']['python'], modified_dest)
+
+    grouped_wrappers = get_wrappers_from_transform_configs(
+        self.transform_config_path)
+    # check all 3 wrappers are set to this destination
+    self.assertEqual(len(grouped_wrappers[modified_dest]), 3)
+
+    # write wrappers to destination then check that all 3 exist there
+    write_wrappers_to_destinations(grouped_wrappers)
+    my_wrappers_et = import_module(
+        modified_dest.replace('/', '.') + PYTHON_SUFFIX.rstrip('.py'))
+    self.assertTrue(hasattr(my_wrappers_et, 'GenerateSequence'))
+    self.assertTrue(hasattr(my_wrappers_et, 'KafkaWrite'))
+    self.assertTrue(hasattr(my_wrappers_et, 'KafkaRead'))
+
+  def test_script_workflow_with_ignored_transform(self):
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': f'apache_beam/transforms/{self.test_dir_name}'
+        },
+        'ignore': ['beam:schematransform:org.apache.beam:generate_sequence:v1']
+    }
+
+    with open(self.service_config_path, 'w') as f:
+      yaml.dump([expansion_service_config], f)
+
+    generate_transforms_config(
+        self.service_config_path, self.transform_config_path)
+
+    # test that transform config is populated correctly

Review Comment:
   Will do, thanks



##########
sdks/python/setup.py:
##########
@@ -359,7 +403,7 @@ def get_portability_package_data():
               'testcontainers[mysql]>=3.0.3,<4.0.0',
               'cryptography>=41.0.2',
               'hypothesis>5.0.0,<=7.0.0',
-              'pyyaml>=3.12,<7.0.0',
+              'jinja2>=2.7.1,<4.0.0'

Review Comment:
   We have multiple tests that run the script (which uses jinja), so we need it 
in both



##########
sdks/python/gen_xlang_wrappers.py:
##########
@@ -0,0 +1,423 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Generates Python wrappers for external transforms (specifically,
+SchemaTransforms)
+"""
+
+import argparse
+import datetime
+import logging
+import os
+import re
+import typing
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Union
+import subprocess
+
+import yaml
+from jinja2 import Environment
+from jinja2 import FileSystemLoader
+
+from gen_protos import LICENSE_HEADER
+from gen_protos import PYTHON_SDK_ROOT
+from gen_protos import find_by_ext
+
+SUPPORTED_SDK_DESTINATIONS = ['python']
+PYTHON_SUFFIX = "_et.py"
+MARKER = "# NOTE: This file contains autogenerated external transform(s)\n"
+
+
+def generate_transforms_config(input_services, output_file):
+  """
+  Generates a YAML file containing a list of transform configurations.
+
+  Takes an input YAML file containing a list of expansion service gradle
+  targets. Each service must provide a `destinations` field that specifies the
+  default package (relative path) that generated wrappers should be written
+  under. A default destination is specified for each SDK, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+
+
+  Each service may also specify modifications for particular transform.
+  Currently, one can modify the generated wrapper's name and destination file:
+
+    - By default, the transform's identifier is used to generate the wrapper
+      class name. This can be overriden by manually providing a name.
+    - By default, generated wrappers are written to files within the package
+      provided by the expansion service. This can be overridden by manually
+      providing a relative file path.
+
+  See the following example for what such modifications can look like::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      transforms:
+        'beam:schematransform:org.apache.beam:my_transform:v1':
+          name: 'MyCustomTransformName'
+          destinations:
+            python: 'apache_beam/io/gcp/my_custom_module'
+
+  For the above example, we would take the transform with identifier
+  `beam:schematransform:org.apache.beam:my_transform:v1` and by default infer
+  a wrapper class name of `MyTransform` and write the generated code to
+  the module `apache_beam/io/my_transform_et.py`. With the modifications, we
+  instead write the wrapper to `apache_beam/io/gcp/my_custom_module_et.py` and
+  name the class `MyCustomTransformName`.
+
+  Note: we add the prefix `"_et.py"` to the module name so that we can find
+  these generated files later (e.g. to tell Git to ignore them, and to
+  delete them when needed)
+
+  To ignore a particular transform, simply list its identifier in the `ignore`
+  field, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      ignore:
+        - 'beam:schematransform:org.apache.beam:skip_me:v1':
+
+
+  We use :class:`ExternalSchemaTransformProvider` to discover external
+  transforms. Then, we extract the necessary details of each transform and
+  compile them into a new YAML file, which is later used to generate wrappers.
+  """
+  from apache_beam.transforms.external import BeamJarExpansionService
+  from apache_beam.transforms.external_schematransform_provider import 
STANDARD_URN_PATTERN
+  from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransform
+  from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransformProvider
+
+  transform_list: List[Dict[str, Any]] = []
+
+  with open(input_services) as f:
+    services = yaml.safe_load(f)
+  for service in services:
+    target = service['gradle_target']
+
+    # validate expansion service destinations
+    if "destinations" not in service:
+      raise ValueError(
+          f"Expansion service with target [{target}] does not "
+          "specify any default destinations.")
+    service_destinations: Dict[str, str] = service['destinations']
+    for sdk in service_destinations.keys():
+      if sdk not in SUPPORTED_SDK_DESTINATIONS:
+        raise ValueError(
+            f"Service with target {target} specifies a "
+            f"destination for an invalid SDK: {sdk}. The "
+            f"supported SDKs are {SUPPORTED_SDK_DESTINATIONS}")
+
+    # get transforms to skip, if any
+    ignore = service.get('ignore', [])
+
+    # use dynamic provider to discover and populate wrapper details
+    provider = ExternalSchemaTransformProvider(BeamJarExpansionService(target))
+    discovered: Dict[str, ExternalSchemaTransform] = provider.get_all()
+    for identifier, wrapper in discovered.items():
+      if identifier in ignore:
+        continue
+      # We infer the destination from the URN and service destination.
+      # For example, the Java IO expansion service defaults to Python
+      # package apache_beam/io. Kafka Write is a transform in this service
+      # with URN beam:schematransform:org.apache.beam:kafka_write:v1
+      # In this case, we infer the destination apache_beam/io/kafka_write
+      functionality_identifier = re.match(STANDARD_URN_PATTERN,
+                                          identifier).groups()[0]
+      destinations = {
+          sdk: f"{destination}/{functionality_identifier}"
+          for sdk,
+          destination in service_destinations.items()
+      }
+      name = wrapper.__name__
+
+      # apply any modifications
+      modified_transform = {}
+      if 'transforms' in service and identifier in service['transforms']:
+        modified_transform = service['transforms'][identifier]
+      if 'name' in modified_transform:
+        name = modified_transform['name']  # override the name
+      if 'destinations' in modified_transform:
+        for sdk, destination in modified_transform['destinations'].items():
+          if sdk not in SUPPORTED_SDK_DESTINATIONS:
+            raise ValueError(
+                f"Identifier {identifier} specifies a destination for "
+                f"an invalid SDK: [{sdk}]. The supported SDKs "
+                f"are {SUPPORTED_SDK_DESTINATIONS}")
+          destinations[sdk] = destination  # override the destination
+
+      # prepare information about parameters
+      fields = {}
+      for param in wrapper.configuration_schema.values():
+        tp = param.type
+        nullable = False
+        # if type is typing.Optional[...]
+        if (typing.get_origin(tp) is Union and
+            type(None) in typing.get_args(tp)):
+          nullable = True
+          # unwrap and set type to the original
+          args = typing.get_args(tp)
+          if len(args) == 2:
+            tp = args[0]
+
+        # some logic for properly setting the type name
+        # TODO(ahmedabu98): Find a way to make this logic more generic when
+        # supporting other remote SDKs. Potentially use Runner API types
+        if tp.__module__ == 'builtins':
+          tp = tp.__name__
+        elif tp.__module__ == 'typing':
+          tp = str(tp).replace("typing.", "")
+        elif tp.__module__ == 'numpy':
+          tp = "%s.%s" % (tp.__module__, tp.__name__)
+        field_info = {
+            'type': str(tp),
+            'description': param.description,
+            'nullable': nullable
+        }
+        fields[param.original_name] = field_info
+
+      transform = {
+          'identifier': identifier,
+          'name': name,
+          'destinations': destinations,
+          'default_service': target,
+          'fields': fields,
+          'description': wrapper.description
+      }
+      transform_list.append(transform)
+
+  with open(output_file, 'w') as f:
+    f.write(LICENSE_HEADER.lstrip())
+    f.write(
+        "# NOTE: This file is autogenerated and should "
+        "not be edited by hand.\n")
+    dt = datetime.datetime.now().date()
+    f.write(f"# Last updated on: {dt}\n\n")
+    yaml.dump(transform_list, f)
+
+
+def camel_case_to_snake_case(string):

Review Comment:
   I had trouble with defining it only in one place because this script exists 
outside of the `apache_beam` package. This module can't import the script and 
the script can't import any module in `apache_beam`.



##########
sdks/python/standard_expansion_services.yaml:
##########
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This file enumerates the standard Apache Beam expansion services.
+# Each service must specify a package destination for each supported SDK, which
+# is where generated wrappers will go by default.
+#
+# Individual transforms can modify their destination module as well as their
+# generated wrapper class name.
+#
+# Transform identifiers listed in the `ignore` field will be skipped.
+
+- gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+  destinations:
+    python: 'apache_beam/io'
+  transforms:
+    'beam:schematransform:org.apache.beam:kafka_write:v1':
+      name: 'WriteToKafka'
+      destinations:
+        python: 'apache_beam/io/kafka'
+    'beam:schematransform:org.apache.beam:kafka_read:v1':
+      name: 'ReadFromKafka'
+      destinations:
+        python: 'apache_beam/io/kafka'
+
+- gradle_target: 
'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
+  destinations:
+    python: 'apache_beam/io/gcp'
+  transforms:
+    'beam:schematransform:org.apache.beam:bigquery_storage_write:v2':
+      name: 'StorageWriteToBigQuery'
+      destinations:
+        python: 'apache_beam/io/gcp/bigquery'
+    'beam:schematransform:org.apache.beam:bigquery_storage_read:v1':
+      name: 'StorageReadFromBigQuery'
+      destinations:
+        python: 'apache_beam/io/gcp/bigquery'
+    'beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1':
+      name: 'FileLoadsToBigQuery'
+      destinations:
+        python: 'apache_beam/io/gcp/bigquery'
+    'beam:schematransform:org.apache.beam:bigquery_export_read:v1':
+      name: 'ExportReadFromBigQuery'
+      destinations:
+        python: 'apache_beam/io/gcp/bigquery'
+    'beam:schematransform:org.apache.beam:bigtable_write:v1':
+      name: 'WriteToBigtable'
+      destinations:
+        python: 'apache_beam/io/gcp/bigtable'
+    'beam:schematransform:org.apache.beam:bigtable_read:v1':
+      name: 'ReadFromBigtable'
+      destinations:
+        python: 'apache_beam/io/gcp/bigtable'
+    'beam:schematransform:org.apache.beam:pubsub_read:v1':
+      name: 'ReadFromPubSub'
+      destinations:
+        python: 'apache_beam/io/gcp/pubsub'
+    'beam:schematransform:org.apache.beam:pubsub_write:v1':
+      name: 'WriteToPubSub'
+      destinations:
+        python: 'apache_beam/io/gcp/pubsub'
+    'beam:schematransform:org.apache.beam:pubsublite_read:v1':
+      name: 'ReadFromPubSubLite'
+      destinations:
+        python: 'apache_beam/io/gcp/pubsublite'
+    'beam:schematransform:org.apache.beam:pubsublite_write:v1':
+      name: 'WriteToPubSubLite'
+      destinations:
+        python: 'apache_beam/io/gcp/pubsublite'
+    'beam:schematransform:org.apache.beam:spanner_cdc_read:v1':
+      name: 'ReadFromSpannerChangeStreams'
+      destinations:
+        python: 'apache_beam/io/gcp/spanner'
+    'beam:schematransform:org.apache.beam:spanner_write:v1':
+      name: 'WriteToSpanner'
+      destinations:
+        python: 'apache_beam/io/gcp/spanner'
+    'beam:schematransform:org.apache.beam:jdbc_write:v1':
+      name: 'WriteToJdbc'
+      destinations:
+        python: 'apache_beam/io/jdbc'
+    'beam:schematransform:org.apache.beam:jdbc_read:v1':
+      name: 'ReadFromJdbc'
+      destinations:
+        python: 'apache_beam/io/jdbc'
+  ignore:

Review Comment:
   Good call, thanks



##########
sdks/python/standard_external_transforms.yaml:
##########
@@ -0,0 +1,712 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# NOTE: This file is autogenerated and should not be edited by hand.
+# Last updated on: 2024-01-22
+
+- default_service: sdks:java:io:expansion-service:shadowJar
+  description: 'Outputs a PCollection of Beam Rows, each containing a single 
INT64
+    number called "value". The count is produced from the given "start"value 
and either
+    up to the given "end" or until 2^63 - 1.
+
+    To produce an unbounded PCollection, simply do not specify an "end" value. 
Unbounded
+    sequences can specify a "rate" for output elements.
+
+    In all cases, the sequence of numbers is generated in parallel, so there 
is no
+    inherent ordering between the generated values'
+  destinations:
+    python: apache_beam/io/generate_sequence
+  fields:
+    end:
+      description: The maximum number to generate (exclusive). Will be an 
unbounded
+        sequence if left unspecified.
+      nullable: true
+      type: numpy.int64
+    rate:
+      description: Specifies the rate to generate a given number of elements 
per a
+        given number of seconds. Applicable only to unbounded sequences.
+      nullable: true
+      type: Row(seconds=typing.Union[numpy.int64, NoneType], elements=<class 
'numpy.int64'>)
+    start:
+      description: The minimum number to generate (inclusive).
+      nullable: false
+      type: numpy.int64
+  identifier: beam:schematransform:org.apache.beam:generate_sequence:v1
+  name: GenerateSequence
+- default_service: sdks:java:io:expansion-service:shadowJar
+  description: ''
+  destinations:
+    python: apache_beam/io/kafka
+  fields:
+    autoOffsetResetConfig:
+      description: "What to do when there is no initial offset in Kafka or if 
the\
+        \ current offset does not exist any more on the server. (1) earliest: 
automatically\

Review Comment:
   From what I can tell, it depends on whether it's a string wrapped in double 
quotes, single quotes, or nothing at all. The double 
   When it's wrapped in double quotes, long lines are broken up with the 
backslashes. I'm not sure how YAML picks what to wrap the string with.
   
   Nevertheless, this doesn't affect the final documentation outcomes. These 
backslashes aren't present in the generated wrappers.



##########
sdks/python/apache_beam/transforms/external_schematransform_provider_test.py:
##########
@@ -135,6 +153,317 @@ def test_run_generate_sequence(self):
       assert_that(numbers, equal_to([i for i in range(10)]))
 
 
[email protected]_io_java_expansion_service
[email protected](
+    run_script is None,
+    "Need access to gen_xlang_wrappers.py to run these tests")
+class AutoGenerationScriptTest(unittest.TestCase):
+  """
+  This class tests the generation and regeneration operations in
+  `sdks/python/gen_xlang_wrappers.py`.
+  """
+
+  # tests cases will use GenerateSequence
+  GEN_SEQ_IDENTIFIER = \
+    'beam:schematransform:org.apache.beam:generate_sequence:v1'
+
+  def setUp(self):
+    args = TestPipeline(is_integration_test=True).get_full_options_as_args()
+    runner = PipelineOptions(args).get_all_options()['runner']
+    if runner and "direct" not in runner.lower():
+      self.skipTest(
+          "It is sufficient to run this test in the DirectRunner "
+          "test suite only.")
+
+    self.test_dir_name = 'test_gen_script_%d_%s' % (
+        int(time.time()), secrets.token_hex(3))
+    self.test_dir = os.path.join(
+        os.path.abspath(os.path.dirname(__file__)), self.test_dir_name)
+    self.service_config_path = os.path.join(
+        self.test_dir, "test_expansion_service_config.yaml")
+    self.transform_config_path = os.path.join(
+        self.test_dir, "test_transform_config.yaml")
+    os.mkdir(self.test_dir)
+
+    self.assertTrue(
+        os.environ.get('EXPANSION_PORT'), "Expansion service port not found!")
+
+  def tearDown(self):
+    shutil.rmtree(self.test_dir, ignore_errors=False)
+
+  def test_script_workflow(self):
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': f'apache_beam/transforms/{self.test_dir_name}'
+        }
+    }
+    with open(self.service_config_path, 'w') as f:
+      yaml.dump([expansion_service_config], f)
+
+    # test that transform config YAML file is created
+    generate_transforms_config(
+        self.service_config_path, self.transform_config_path)
+    self.assertTrue(os.path.exists(self.transform_config_path))
+    expected_destination = \
+      f'apache_beam/transforms/{self.test_dir_name}/generate_sequence'
+    # test that transform config is populated correctly
+    with open(self.transform_config_path) as f:
+      transforms = yaml.safe_load(f)
+      gen_seq_config = None
+      for transform in transforms:
+        if transform['identifier'] == self.GEN_SEQ_IDENTIFIER:
+          gen_seq_config = transform
+      self.assertIsNotNone(gen_seq_config)
+      self.assertEqual(
+          gen_seq_config['default_service'],
+          expansion_service_config['gradle_target'])
+      self.assertEqual(gen_seq_config['name'], 'GenerateSequence')
+      self.assertEqual(
+          gen_seq_config['destinations']['python'], expected_destination)
+      self.assertIn("end", gen_seq_config['fields'])
+      self.assertIn("start", gen_seq_config['fields'])
+      self.assertIn("rate", gen_seq_config['fields'])
+
+    # test that the code for GenerateSequence is set to the right destination
+    grouped_wrappers = get_wrappers_from_transform_configs(
+        self.transform_config_path)
+    self.assertIn(expected_destination, grouped_wrappers)
+    # only the GenerateSequence wrapper is set to this destination
+    self.assertEqual(len(grouped_wrappers[expected_destination]), 1)
+
+    # test that the correct destination is created
+    write_wrappers_to_destinations(grouped_wrappers)
+    self.assertTrue(
+        os.path.exists(
+            os.path.join(self.test_dir, 'generate_sequence' + PYTHON_SUFFIX)))
+    # check the wrapper exists in this destination and has correct properties
+    generate_sequence_et = import_module(
+        expected_destination.replace('/', '.') + PYTHON_SUFFIX.rstrip('.py'))
+    self.assertTrue(hasattr(generate_sequence_et, 'GenerateSequence'))
+    self.assertTrue(
+        isinstance(
+            generate_sequence_et.GenerateSequence(start=0),
+            ExternalSchemaTransform))
+    self.assertEqual(
+        generate_sequence_et.GenerateSequence.identifier,
+        self.GEN_SEQ_IDENTIFIER)
+
+    # test that we successfully delete the destination
+    delete_generated_files(self.test_dir)
+    self.assertFalse(
+        os.path.exists(
+            os.path.join(self.test_dir, 'generate_sequence' + PYTHON_SUFFIX)))
+
+  def test_script_workflow_with_modified_transforms(self):
+    modified_name = 'ModifiedSequence'
+    modified_dest = \
+      f'apache_beam/transforms/{self.test_dir_name}/new_dir/modified_gen_seq'
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': f'apache_beam/transforms/{self.test_dir_name}'
+        },
+        'transforms': {
+            'beam:schematransform:org.apache.beam:generate_sequence:v1': {
+                'name': modified_name,
+                'destinations': {
+                    'python': modified_dest
+                }
+            }
+        }
+    }
+    os.mkdir(os.path.join(self.test_dir, 'new_dir'))
+
+    with open(self.service_config_path, 'w') as f:
+      yaml.dump([expansion_service_config], f)
+
+    # test that transform config YAML file is successfully created
+    generate_transforms_config(
+        self.service_config_path, self.transform_config_path)
+    self.assertTrue(os.path.exists(self.transform_config_path))
+
+    # test that transform config is populated correctly
+    with open(self.transform_config_path) as f:
+      transforms = yaml.safe_load(f)
+      gen_seq_config = None
+      for transform in transforms:
+        if transform['identifier'] == self.GEN_SEQ_IDENTIFIER:
+          gen_seq_config = transform
+      self.assertIsNotNone(gen_seq_config)
+      self.assertEqual(
+          gen_seq_config['default_service'],
+          expansion_service_config['gradle_target'])
+      self.assertEqual(gen_seq_config['name'], modified_name)
+      self.assertEqual(gen_seq_config['destinations']['python'], modified_dest)
+
+    # test that the code for 'ModifiedSequence' is set to the right destination
+    grouped_wrappers = get_wrappers_from_transform_configs(
+        self.transform_config_path)
+    self.assertIn(modified_dest, grouped_wrappers)
+    self.assertIn(modified_name, grouped_wrappers[modified_dest][0])
+    # only one wrapper is set to this destination
+    self.assertEqual(len(grouped_wrappers[modified_dest]), 1)
+
+    # test that the modified destination is successfully created
+    write_wrappers_to_destinations(grouped_wrappers)
+    self.assertTrue(
+        os.path.exists(
+            os.path.join(
+                self.test_dir, 'new_dir', 'modified_gen_seq' + PYTHON_SUFFIX)))
+    # check the modified wrapper exists in the modified destination
+    # and check it has the correct properties
+    modified_gen_seq_et = import_module(
+        modified_dest.replace('/', '.') + PYTHON_SUFFIX.rstrip('.py'))
+    self.assertTrue(
+        isinstance(
+            modified_gen_seq_et.ModifiedSequence(start=0),
+            ExternalSchemaTransform))
+    self.assertEqual(
+        modified_gen_seq_et.ModifiedSequence.identifier,
+        self.GEN_SEQ_IDENTIFIER)
+
+    # test that we successfully delete the destination
+    delete_generated_files(self.test_dir)
+    self.assertFalse(
+        os.path.exists(
+            os.path.join(
+                self.test_dir, 'new_dir', 'modified_gen_seq' + PYTHON_SUFFIX)))
+
+  def test_script_workflow_with_multiple_wrappers_same_destination(self):
+    modified_dest = f'apache_beam/transforms/{self.test_dir_name}/my_wrappers'
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': f'apache_beam/transforms/{self.test_dir_name}'
+        },
+        'transforms': {
+            'beam:schematransform:org.apache.beam:generate_sequence:v1': {
+                'destinations': {
+                    'python': modified_dest
+                }
+            },
+            'beam:schematransform:org.apache.beam:kafka_read:v1': {
+                'destinations': {
+                    'python': modified_dest
+                }
+            },
+            'beam:schematransform:org.apache.beam:kafka_write:v1': {
+                'destinations': {
+                    'python': modified_dest
+                }
+            }
+        }
+    }
+
+    with open(self.service_config_path, 'w') as f:
+      yaml.dump([expansion_service_config], f)
+
+    # test that transform config YAML file is successfully created
+    generate_transforms_config(
+        self.service_config_path, self.transform_config_path)
+    self.assertTrue(os.path.exists(self.transform_config_path))
+
+    # test that our transform configs have the same destination
+    with open(self.transform_config_path) as f:
+      transforms = yaml.safe_load(f)
+      for transform in transforms:
+        if transform['identifier'] in expansion_service_config['transforms']:
+          self.assertEqual(transform['destinations']['python'], modified_dest)
+
+    grouped_wrappers = get_wrappers_from_transform_configs(
+        self.transform_config_path)
+    # check all 3 wrappers are set to this destination
+    self.assertEqual(len(grouped_wrappers[modified_dest]), 3)
+
+    # write wrappers to destination then check that all 3 exist there
+    write_wrappers_to_destinations(grouped_wrappers)
+    my_wrappers_et = import_module(
+        modified_dest.replace('/', '.') + PYTHON_SUFFIX.rstrip('.py'))
+    self.assertTrue(hasattr(my_wrappers_et, 'GenerateSequence'))
+    self.assertTrue(hasattr(my_wrappers_et, 'KafkaWrite'))
+    self.assertTrue(hasattr(my_wrappers_et, 'KafkaRead'))
+
+  def test_script_workflow_with_ignored_transform(self):
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': f'apache_beam/transforms/{self.test_dir_name}'
+        },
+        'ignore': ['beam:schematransform:org.apache.beam:generate_sequence:v1']
+    }
+
+    with open(self.service_config_path, 'w') as f:
+      yaml.dump([expansion_service_config], f)
+
+    generate_transforms_config(
+        self.service_config_path, self.transform_config_path)
+
+    # test that transform config is populated correctly
+    with open(self.transform_config_path) as f:
+      transforms = yaml.safe_load(f)
+      gen_seq_config = None
+      for transform in transforms:
+        if transform['identifier'] == self.GEN_SEQ_IDENTIFIER:
+          gen_seq_config = transform
+      self.assertIsNone(gen_seq_config)
+
+  def test_run_pipeline_with_script_generated_transform(self):
+    modified_dest = f'apache_beam/transforms/{self.test_dir_name}/gen_seq'
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': f'apache_beam/transforms/{self.test_dir_name}'
+        },
+        'transforms': {
+            'beam:schematransform:org.apache.beam:generate_sequence:v1': {
+                'name': 'MyGenSeq', 'destinations': {
+                    'python': modified_dest
+                }
+            }
+        }
+    }
+    with open(self.service_config_path, 'w') as f:
+      yaml.dump([expansion_service_config], f)
+
+    run_script(False, self.service_config_path, self.transform_config_path)
+
+    gen_seq_et = import_module(
+        modified_dest.replace('/', '.') + PYTHON_SUFFIX.rstrip('.py'))
+
+    with beam.Pipeline() as p:
+      numbers = (
+          p | gen_seq_et.MyGenSeq(start=0, end=10)
+          | beam.Map(lambda row: row.value))
+
+      assert_that(numbers, equal_to([i for i in range(10)]))
+
+  def test_check_standard_external_transforms_config_in_sync(self):
+    """
+    This test creates a transforms config file and checks it against the file
+    in the SDK root `standard_external_transforms.yaml`. Fails if the
+    test is out of sync.
+
+    Fix by running `python gen_xlang_wrappers.py` and committing the changes.
+    """
+    generate_transforms_config(
+        os.path.join(PYTHON_SDK_ROOT, 'standard_expansion_services.yaml'),
+        self.transform_config_path)
+    with open(self.transform_config_path) as f:
+      test_config = yaml.safe_load(f)
+    with open(os.path.join(PYTHON_SDK_ROOT,
+                           'standard_external_transforms.yaml')) as f:
+      standard_config = yaml.safe_load(f)
+
+    self.assertEqual(
+        test_config,
+        standard_config,
+        "The standard xlang transforms config file "
+        "\"standard_external_transforms.yaml\" is out of sync! "
+        "Please update by running script "
+        "`python gen_xlang_wrappers.py` and commit the changes.")

Review Comment:
   Created a precommit that builds relevant jars and runs the script.
   > we'd probably use the lowest-supported
   
   Yeah looks like we need to only use one version for consistent typing in the 
documentation



##########
sdks/python/standard_external_transforms.yaml:
##########
@@ -0,0 +1,725 @@
+#

Review Comment:
   Sure we can keep one transform and skip all others in 
`standard_expansion_services.yaml` (We'd need to do this so the PreCommit 
doesn't fail)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to