tvalentyn commented on code in PR #29834:
URL: https://github.com/apache/beam/pull/29834#discussion_r1473465090


##########
sdks/python/pyproject.toml:
##########
@@ -29,8 +29,10 @@ requires = [
     "numpy>=1.14.3,<1.25", # Update setup.py as well.
     # having cython here will create wheels that are platform dependent.
     "cython==0.29.36",
+    # deps for generating external transform wrappers:
     'pyyaml>=3.12,<7.0.0',
-    'jinja2>=2.7.1,<4.0.0'
+    'jinja2>=2.7.1,<4.0.0',
+    'yapf==0.29.0'

Review Comment:
   Let's mention this file in the comment in 
https://github.com/apache/beam/blob/a221f98a5f46be985afcb98a65fcec3b46b81f92/sdks/python/tox.ini#L223



##########
.github/workflows/beam_PreCommit_Xlang_Generation.yml:
##########
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PreCommit Xlang Wrapper Generation
+
+on:
+  push:
+    tags: ['v*']
+    branches: ['master', 'release-*']
+    paths:
+      - 'model/**'
+      - 'sdks/python/**'
+      - 'sdks/java/expansion-service/**'
+      - 'sdks/java/core/**'
+      - 'sdks/java/io/**'
+      - 'sdks/java/extensions/sql/**'
+      - '!sdks/java/io/amazon-web-services/**'
+      - '!sdks/java/io/amazon-web-services2/**'
+      - '!sdks/java/io/amqp/**'
+      - '!sdks/java/io/azure/**'
+      - '!sdks/java/io/cassandra/**'
+      - '!sdks/java/io/cdap/**'
+      - '!sdks/java/io/clickhouse/**'
+      - '!sdks/java/io/debezium/**'
+      - '!sdks/java/io/elasticsearch/**'
+      - '!sdks/java/io/elasticsearch-tests/**'
+      - '!sdks/java/io/hadoop-common/**'
+      - '!sdks/java/io/hadoop-file-system/**'
+      - '!sdks/java/io/hadoop-format/**'
+      - '!sdks/java/io/hbase/**'
+      - '!sdks/java/io/hcatalog/**'
+      - '!sdks/java/io/influxdb/**'
+      - '!sdks/java/io/jms/**'
+      - '!sdks/java/io/kinesis/**'
+      - '!sdks/java/io/kudu/**'
+      - '!sdks/java/io/mqtt/**'
+      - '!sdks/java/io/mongodb/**'
+      - '!sdks/java/io/neo4j/**'
+      - '!sdks/java/io/pulsar/**'
+      - '!sdks/java/io/rabbitmq/**'
+      - '!sdks/java/io/redis/**'
+      - '!sdks/java/io/rrio/**'
+      - '!sdks/java/io/snowflake/**'
+      - '!sdks/java/io/solr/**'
+      - '!sdks/java/io/splunk/**'
+      - '!sdks/java/io/thrift/**'
+      - '!sdks/java/io/tika/**'
+      - 'release/**'
+      - '.github/workflows/beam_PreCommit_Xlang_Generation.yml'
+  pull_request_target:
+    branches: ['master', 'release-*']
+    paths:
+      - 'model/**'
+      - 'sdks/python/**'
+      - 'sdks/java/expansion-service/**'
+      - 'sdks/java/core/**'
+      - 'sdks/java/io/**'
+      - 'sdks/java/extensions/sql/**'
+      - '!sdks/java/io/amazon-web-services/**'
+      - '!sdks/java/io/amazon-web-services2/**'
+      - '!sdks/java/io/amqp/**'
+      - '!sdks/java/io/azure/**'
+      - '!sdks/java/io/cassandra/**'
+      - '!sdks/java/io/cdap/**'
+      - '!sdks/java/io/clickhouse/**'
+      - '!sdks/java/io/debezium/**'
+      - '!sdks/java/io/elasticsearch/**'
+      - '!sdks/java/io/elasticsearch-tests/**'
+      - '!sdks/java/io/hadoop-common/**'
+      - '!sdks/java/io/hadoop-file-system/**'
+      - '!sdks/java/io/hadoop-format/**'
+      - '!sdks/java/io/hbase/**'
+      - '!sdks/java/io/hcatalog/**'
+      - '!sdks/java/io/influxdb/**'
+      - '!sdks/java/io/jms/**'
+      - '!sdks/java/io/kinesis/**'
+      - '!sdks/java/io/kudu/**'
+      - '!sdks/java/io/mqtt/**'
+      - '!sdks/java/io/mongodb/**'
+      - '!sdks/java/io/neo4j/**'
+      - '!sdks/java/io/pulsar/**'
+      - '!sdks/java/io/rabbitmq/**'
+      - '!sdks/java/io/redis/**'
+      - '!sdks/java/io/rrio/**'
+      - '!sdks/java/io/snowflake/**'
+      - '!sdks/java/io/solr/**'
+      - '!sdks/java/io/splunk/**'
+      - '!sdks/java/io/thrift/**'
+      - '!sdks/java/io/tika/**'
+      - 'release/**'
+      - 'release/trigger_all_tests.json'
+      - '.github/workflows/beam_PreCommit_Xlang_Generation.yml'
+  issue_comment:
+    types: [created]
+  schedule:
+    - cron: '30 2/6 * * *'
+  workflow_dispatch:
+
+#Setting explicit permissions for the action to avoid the default permissions 
which are `write-all` in case of pull_request_target event
+permissions:
+  actions: write
+  pull-requests: read
+  checks: read
+  contents: read
+  deployments: read
+  id-token: none
+  issues: read
+  discussions: read
+  packages: read
+  pages: read
+  repository-projects: read
+  security-events: read
+  statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.issue.number || 
github.event.pull_request.head.label || github.sha || github.head_ref || 
github.ref }}-${{ github.event.schedule || github.event.comment.id || 
github.event.sender.login }}'
+  cancel-in-progress: true
+
+env:
+  GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+  beam_PreCommit_Xlang_Generation:
+    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ 
matrix.python_version }})
+    timeout-minutes: 120
+    runs-on: ['self-hosted', ubuntu-20.04, main]
+    strategy:
+      fail-fast: false
+      matrix:
+        job_name: ['beam_PreCommit_Xlang_Generation']
+        job_phrase: ['Run Xlang Generation PreCommit']

Review Comment:
   ```suggestion
           job_phrase: ['Run Xlang_Generation PreCommit']
   ```



##########
.github/workflows/beam_PreCommit_Xlang_Generation.yml:
##########
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PreCommit Xlang Wrapper Generation

Review Comment:
   Let's update 
https://github.com/apache/beam/blob/master/.github/workflows/README.md and 
https://github.com/apache/beam/blob/master/scripts/ci/release/test/resources/mass_comment.txt
 with the new suites.



##########
sdks/python/apache_beam/transforms/external_transform_provider_test.py:
##########
@@ -407,7 +432,8 @@ def test_run_pipeline_with_script_generated_transform(self):
 
     with open(self.service_config_path, 'w') as f:
       yaml.dump([expansion_service_config], f)
-    generate_transforms_config(self.service_config_path, 
self.transform_config_path)
+    generate_transforms_config(

Review Comment:
   Why do we generate wrappers for this test, as opposed to taking the already 
generated wrappers and testing them? It seems that wrappers would be generated 
during Beam installation. Reusing them might remove the test dependency on 
jinja/yapf.
   
   



##########
.github/workflows/beam_PreCommit_Xlang_Generation.yml:
##########
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PreCommit Xlang Wrapper Generation
+
+on:
+  push:
+    tags: ['v*']
+    branches: ['master', 'release-*']
+    paths:
+      - 'model/**'
+      - 'sdks/python/**'
+      - 'sdks/java/expansion-service/**'
+      - 'sdks/java/core/**'
+      - 'sdks/java/io/**'
+      - 'sdks/java/extensions/sql/**'
+      - '!sdks/java/io/amazon-web-services/**'
+      - '!sdks/java/io/amazon-web-services2/**'
+      - '!sdks/java/io/amqp/**'
+      - '!sdks/java/io/azure/**'
+      - '!sdks/java/io/cassandra/**'
+      - '!sdks/java/io/cdap/**'
+      - '!sdks/java/io/clickhouse/**'
+      - '!sdks/java/io/debezium/**'
+      - '!sdks/java/io/elasticsearch/**'
+      - '!sdks/java/io/elasticsearch-tests/**'
+      - '!sdks/java/io/hadoop-common/**'
+      - '!sdks/java/io/hadoop-file-system/**'
+      - '!sdks/java/io/hadoop-format/**'
+      - '!sdks/java/io/hbase/**'
+      - '!sdks/java/io/hcatalog/**'
+      - '!sdks/java/io/influxdb/**'
+      - '!sdks/java/io/jms/**'
+      - '!sdks/java/io/kinesis/**'
+      - '!sdks/java/io/kudu/**'
+      - '!sdks/java/io/mqtt/**'
+      - '!sdks/java/io/mongodb/**'
+      - '!sdks/java/io/neo4j/**'
+      - '!sdks/java/io/pulsar/**'
+      - '!sdks/java/io/rabbitmq/**'
+      - '!sdks/java/io/redis/**'
+      - '!sdks/java/io/rrio/**'
+      - '!sdks/java/io/snowflake/**'
+      - '!sdks/java/io/solr/**'
+      - '!sdks/java/io/splunk/**'
+      - '!sdks/java/io/thrift/**'
+      - '!sdks/java/io/tika/**'
+      - 'release/**'
+      - '.github/workflows/beam_PreCommit_Xlang_Generation.yml'
+  pull_request_target:
+    branches: ['master', 'release-*']
+    paths:
+      - 'model/**'
+      - 'sdks/python/**'
+      - 'sdks/java/expansion-service/**'
+      - 'sdks/java/core/**'
+      - 'sdks/java/io/**'
+      - 'sdks/java/extensions/sql/**'
+      - '!sdks/java/io/amazon-web-services/**'
+      - '!sdks/java/io/amazon-web-services2/**'
+      - '!sdks/java/io/amqp/**'
+      - '!sdks/java/io/azure/**'
+      - '!sdks/java/io/cassandra/**'
+      - '!sdks/java/io/cdap/**'
+      - '!sdks/java/io/clickhouse/**'
+      - '!sdks/java/io/debezium/**'
+      - '!sdks/java/io/elasticsearch/**'
+      - '!sdks/java/io/elasticsearch-tests/**'
+      - '!sdks/java/io/hadoop-common/**'
+      - '!sdks/java/io/hadoop-file-system/**'
+      - '!sdks/java/io/hadoop-format/**'
+      - '!sdks/java/io/hbase/**'
+      - '!sdks/java/io/hcatalog/**'
+      - '!sdks/java/io/influxdb/**'
+      - '!sdks/java/io/jms/**'
+      - '!sdks/java/io/kinesis/**'
+      - '!sdks/java/io/kudu/**'
+      - '!sdks/java/io/mqtt/**'
+      - '!sdks/java/io/mongodb/**'
+      - '!sdks/java/io/neo4j/**'
+      - '!sdks/java/io/pulsar/**'
+      - '!sdks/java/io/rabbitmq/**'
+      - '!sdks/java/io/redis/**'
+      - '!sdks/java/io/rrio/**'
+      - '!sdks/java/io/snowflake/**'
+      - '!sdks/java/io/solr/**'
+      - '!sdks/java/io/splunk/**'
+      - '!sdks/java/io/thrift/**'
+      - '!sdks/java/io/tika/**'
+      - 'release/**'
+      - 'release/trigger_all_tests.json'
+      - '.github/workflows/beam_PreCommit_Xlang_Generation.yml'
+  issue_comment:
+    types: [created]
+  schedule:
+    - cron: '30 2/6 * * *'
+  workflow_dispatch:
+
+#Setting explicit permissions for the action to avoid the default permissions 
which are `write-all` in case of pull_request_target event
+permissions:
+  actions: write
+  pull-requests: read
+  checks: read
+  contents: read
+  deployments: read
+  id-token: none
+  issues: read
+  discussions: read
+  packages: read
+  pages: read
+  repository-projects: read
+  security-events: read
+  statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.issue.number || 
github.event.pull_request.head.label || github.sha || github.head_ref || 
github.ref }}-${{ github.event.schedule || github.event.comment.id || 
github.event.sender.login }}'
+  cancel-in-progress: true
+
+env:
+  GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+  beam_PreCommit_Xlang_Generation:
+    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ 
matrix.python_version }})
+    timeout-minutes: 120
+    runs-on: ['self-hosted', ubuntu-20.04, main]
+    strategy:
+      fail-fast: false
+      matrix:
+        job_name: ['beam_PreCommit_Xlang_Generation']
+        job_phrase: ['Run Xlang Generation PreCommit']

Review Comment:
   nit: added underscore for consistency with other commands;
   
   alternative name:   beam_PreCommit_Generated_Transforms  /  Run 
Generated_Transforms PreCommit



##########
sdks/python/container/py39/base_image_requirements.txt:
##########
@@ -82,11 +82,13 @@ hypothesis==6.92.1
 idna==3.6
 importlib-metadata==7.0.0
 iniconfig==2.0.0
+Jinja2==3.1.2
 joblib==1.3.2
 Js2Py==0.74
 jsonpickle==3.0.2
 jsonschema==4.20.0
 jsonschema-specifications==2023.11.2
+MarkupSafe==2.1.3

Review Comment:
   Are you editing these files by hand? These should be autogenerated; the file 
header has instructions. This can be done later or in a separate PR. I don't 
remember if I asked this already.



##########
sdks/python/gen_xlang_wrappers.py:
##########
@@ -212,9 +179,60 @@ class name. This can be overriden by manually providing a 
name.
     f.write(
         "# NOTE: This file is autogenerated and should "
         "not be edited by hand.\n")
+    f.write(
+        "# Configs are generated based on the expansion service\n"
+        f"# configuration in {input_services.replace(PROJECT_ROOT, '')}.\n")
+    f.write("# Refer to sdks/python/gen_xlang_wrappers.py for more info.\n")
     dt = datetime.datetime.now().date()
-    f.write(f"# Last updated on: {dt}\n\n")
+    f.write(f"#\n# Last updated on: {dt}\n\n")
     yaml.dump(transform_list, f)
+  logging.info("Successfully wrote transform configs to file: %s", output_file)
+
+
+def validate_sdks_destinations(sdk, dest, service, identifier=None):

Review Comment:
   Given that we now assemble external transform wrappers in one folder, is it 
still important to specify particular destinations (say, apache_beam/io vs 
apache_beam/io/gcp) when all generated wrappers will live in 
_external_transforms?



##########
sdks/python/apache_beam/transforms/external_transform_provider_test.py:
##########
@@ -423,45 +396,50 @@ def 
test_run_pipeline_with_script_generated_transform(self):
             }
         }
     }
+
     with open(self.service_config_path, 'w') as f:
       yaml.dump([expansion_service_config], f)
+    generate_transforms_config(
+        self.service_config_path, self.transform_config_path)
+    wrappers_grouped_by_destination = get_wrappers_from_transform_configs(
+        self.transform_config_path)
+    write_wrappers_to_destinations(
+        wrappers_grouped_by_destination, self.test_dir)
 
-    run_script(False, self.service_config_path, self.transform_config_path)
-
-    gen_seq_et = import_module(
-        modified_dest.replace('/', '.') + PYTHON_SUFFIX.rstrip('.py'))
+    gen_seq_et = self.get_module(modified_dest)
 
     with beam.Pipeline() as p:
       numbers = (
           p | gen_seq_et.MyGenSeq(start=0, end=10)
           | beam.Map(lambda row: row.value))
-
       assert_that(numbers, equal_to([i for i in range(10)]))
 
   def test_check_standard_external_transforms_config_in_sync(self):
     """
-    This test creates a transforms config file and checks it against the file
-    in the SDK root `standard_external_transforms.yaml`. Fails if the
-    test is out of sync.
+    This test creates a transforms config file and checks it against
+    `sdks/standard_external_transforms.yaml`. Fails if the test is out of sync.

Review Comment:
   ```suggestion
       `sdks/standard_external_transforms.yaml`. Fails if two configs don't 
match.
   ```



##########
sdks/python/apache_beam/transforms/external_transform_provider_test.py:
##########
@@ -186,83 +193,139 @@ def setUp(self):
     os.mkdir(self.test_dir)
 
     self.assertTrue(
-        os.environ.get('EXPANSION_PORT'), "Expansion service port not found!")
+        os.environ.get('EXPANSION_PORTS'), "Expansion service port not found!")
+    logging.info("EXPANSION_PORTS: %s", os.environ.get('EXPANSION_PORTS'))
 
   def tearDown(self):
     shutil.rmtree(self.test_dir, ignore_errors=False)
 
-  def test_script_workflow(self):
+  def delete_and_validate(self):
+    delete_generated_files(self.test_dir)
+    self.assertEqual(len(os.listdir(self.test_dir)), 0)
+
+  def test_script_fails_with_invalid_destinations(self):
     expansion_service_config = {
         "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
         'destinations': {
-            'python': f'apache_beam/transforms/{self.test_dir_name}'
+            'python': 'apache_beam/some_nonexistent_dir'
         }
     }
+    with self.assertRaises(ValueError):
+      self.create_and_check_transforms_config_exists(expansion_service_config)
+
+  def test_pretty_types(self):
+    types = [
+        typing.Optional[typing.List[str]],
+        numpy.int16,
+        str,
+        typing.Dict[str, numpy.float64],
+        typing.Optional[typing.Dict[str, typing.List[numpy.int64]]],
+        typing.Dict[int, typing.Optional[str]]
+    ]
+
+    expected_type_names = [('List[str]', True), ('numpy.int16', False),
+                           ('str', False), ('Dict[str, numpy.float64]', False),
+                           ('Dict[str, List[numpy.int64]]', True),
+                           ('Dict[int, Union[str, NoneType]]', False)]
+
+    for i in range(len(types)):
+      self.assertEqual(pretty_type(types[i]), expected_type_names[i])
+
+  def create_and_check_transforms_config_exists(self, 
expansion_service_config):
     with open(self.service_config_path, 'w') as f:
       yaml.dump([expansion_service_config], f)
 
-    # test that transform config YAML file is created
     generate_transforms_config(
         self.service_config_path, self.transform_config_path)
     self.assertTrue(os.path.exists(self.transform_config_path))
-    expected_destination = \
-      f'apache_beam/transforms/{self.test_dir_name}/generate_sequence'
-    # test that transform config is populated correctly
+
+  def create_and_validate_transforms_config(
+      self, expansion_service_config, expected_name, expected_destination):
+    self.create_and_check_transforms_config_exists(expansion_service_config)
+
     with open(self.transform_config_path) as f:
-      transforms = yaml.safe_load(f)
+      configs = yaml.safe_load(f)
       gen_seq_config = None
-      for transform in transforms:
-        if transform['identifier'] == self.GEN_SEQ_IDENTIFIER:
-          gen_seq_config = transform
+      for config in configs:
+        if config['identifier'] == self.GEN_SEQ_IDENTIFIER:
+          gen_seq_config = config
       self.assertIsNotNone(gen_seq_config)
       self.assertEqual(
           gen_seq_config['default_service'],
           expansion_service_config['gradle_target'])
-      self.assertEqual(gen_seq_config['name'], 'GenerateSequence')
+      self.assertEqual(gen_seq_config['name'], expected_name)
       self.assertEqual(
           gen_seq_config['destinations']['python'], expected_destination)
       self.assertIn("end", gen_seq_config['fields'])
       self.assertIn("start", gen_seq_config['fields'])
       self.assertIn("rate", gen_seq_config['fields'])
 
-    # test that the code for GenerateSequence is set to the right destination
+  def get_module(self, dest):
+    module_name = dest.replace('apache_beam/', '').replace('/', '_')
+    module = 'apache_beam.transforms.%s.%s' % (self.test_dir_name, module_name)
+    return import_module(module)
+
+  def write_wrappers_to_destinations_and_validate(
+      self, destinations: typing.List[str]):
+    """
+    Generate wrappers from the config path and validate all destinations are
+    included.
+    Then write wrappers to destinations and validate all destination paths
+    exist.
+
+    :return: Generated wrappers grouped by destination
+    """
     grouped_wrappers = get_wrappers_from_transform_configs(
         self.transform_config_path)
-    self.assertIn(expected_destination, grouped_wrappers)
-    # only the GenerateSequence wrapper is set to this destination
-    self.assertEqual(len(grouped_wrappers[expected_destination]), 1)
+    for dest in destinations:
+      self.assertIn(dest, grouped_wrappers)
+
+    # write to our test directory to avoid messing with other files
+    write_wrappers_to_destinations(grouped_wrappers, self.test_dir)
+
+    for dest in destinations:
+      self.assertTrue(
+          os.path.exists(
+              os.path.join(
+                  self.test_dir,
+                  dest.replace('apache_beam/', '').replace('/', '_') + ".py")))
+    return grouped_wrappers
+
+  def test_script_workflow(self):
+    expected_destination = 'apache_beam/transforms'
+    expansion_service_config = {
+        "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+        'destinations': {
+            'python': expected_destination
+        }
+    }
+
+    self.create_and_validate_transforms_config(
+        expansion_service_config, 'GenerateSequence', expected_destination)
+    grouped_wrappers = self.write_wrappers_to_destinations_and_validate(
+        [expected_destination])
+    # at least the GenerateSequence wrapper is set to this destination
+    self.assertGreaterEqual(len(grouped_wrappers[expected_destination]), 1)
 
-    # test that the correct destination is created
-    write_wrappers_to_destinations(grouped_wrappers)
-    self.assertTrue(
-        os.path.exists(
-            os.path.join(self.test_dir, 'generate_sequence' + PYTHON_SUFFIX)))
     # check the wrapper exists in this destination and has correct properties
-    generate_sequence_et = import_module(
-        expected_destination.replace('/', '.') + PYTHON_SUFFIX.rstrip('.py'))
-    self.assertTrue(hasattr(generate_sequence_et, 'GenerateSequence'))
+    output_module = self.get_module(expected_destination)
+    self.assertTrue(hasattr(output_module, 'GenerateSequence'))
+    self.assertTrue(hasattr(output_module, 'KafkaWrite'))  # also check that

Review Comment:
   >  also check that
   
   nit: this is not a very helpful comment; perhaps you want to explain the 
significance of testing for the presence of these transforms?
   
   Also, are we not skipping Kafka transforms?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to