tvalentyn commented on code in PR #29834:
URL: https://github.com/apache/beam/pull/29834#discussion_r1473465090
##########
sdks/python/pyproject.toml:
##########
@@ -29,8 +29,10 @@ requires = [
"numpy>=1.14.3,<1.25", # Update setup.py as well.
# having cython here will create wheels that are platform dependent.
"cython==0.29.36",
+ # deps for generating external transform wrappers:
'pyyaml>=3.12,<7.0.0',
- 'jinja2>=2.7.1,<4.0.0'
+ 'jinja2>=2.7.1,<4.0.0',
+ 'yapf==0.29.0'
Review Comment:
Let's mention this file in the comment in
https://github.com/apache/beam/blob/a221f98a5f46be985afcb98a65fcec3b46b81f92/sdks/python/tox.ini#L223
##########
.github/workflows/beam_PreCommit_Xlang_Generation.yml:
##########
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PreCommit Xlang Wrapper Generation
+
+on:
+ push:
+ tags: ['v*']
+ branches: ['master', 'release-*']
+ paths:
+ - 'model/**'
+ - 'sdks/python/**'
+ - 'sdks/java/expansion-service/**'
+ - 'sdks/java/core/**'
+ - 'sdks/java/io/**'
+ - 'sdks/java/extensions/sql/**'
+ - '!sdks/java/io/amazon-web-services/**'
+ - '!sdks/java/io/amazon-web-services2/**'
+ - '!sdks/java/io/amqp/**'
+ - '!sdks/java/io/azure/**'
+ - '!sdks/java/io/cassandra/**'
+ - '!sdks/java/io/cdap/**'
+ - '!sdks/java/io/clickhouse/**'
+ - '!sdks/java/io/debezium/**'
+ - '!sdks/java/io/elasticsearch/**'
+ - '!sdks/java/io/elasticsearch-tests/**'
+ - '!sdks/java/io/hadoop-common/**'
+ - '!sdks/java/io/hadoop-file-system/**'
+ - '!sdks/java/io/hadoop-format/**'
+ - '!sdks/java/io/hbase/**'
+ - '!sdks/java/io/hcatalog/**'
+ - '!sdks/java/io/influxdb/**'
+ - '!sdks/java/io/jms/**'
+ - '!sdks/java/io/kinesis/**'
+ - '!sdks/java/io/kudu/**'
+ - '!sdks/java/io/mqtt/**'
+ - '!sdks/java/io/mongodb/**'
+ - '!sdks/java/io/neo4j/**'
+ - '!sdks/java/io/pulsar/**'
+ - '!sdks/java/io/rabbitmq/**'
+ - '!sdks/java/io/redis/**'
+ - '!sdks/java/io/rrio/**'
+ - '!sdks/java/io/snowflake/**'
+ - '!sdks/java/io/solr/**'
+ - '!sdks/java/io/splunk/**'
+ - '!sdks/java/io/thrift/**'
+ - '!sdks/java/io/tika/**'
+ - 'release/**'
+ - '.github/workflows/beam_PreCommit_Xlang_Generation.yml'
+ pull_request_target:
+ branches: ['master', 'release-*']
+ paths:
+ - 'model/**'
+ - 'sdks/python/**'
+ - 'sdks/java/expansion-service/**'
+ - 'sdks/java/core/**'
+ - 'sdks/java/io/**'
+ - 'sdks/java/extensions/sql/**'
+ - '!sdks/java/io/amazon-web-services/**'
+ - '!sdks/java/io/amazon-web-services2/**'
+ - '!sdks/java/io/amqp/**'
+ - '!sdks/java/io/azure/**'
+ - '!sdks/java/io/cassandra/**'
+ - '!sdks/java/io/cdap/**'
+ - '!sdks/java/io/clickhouse/**'
+ - '!sdks/java/io/debezium/**'
+ - '!sdks/java/io/elasticsearch/**'
+ - '!sdks/java/io/elasticsearch-tests/**'
+ - '!sdks/java/io/hadoop-common/**'
+ - '!sdks/java/io/hadoop-file-system/**'
+ - '!sdks/java/io/hadoop-format/**'
+ - '!sdks/java/io/hbase/**'
+ - '!sdks/java/io/hcatalog/**'
+ - '!sdks/java/io/influxdb/**'
+ - '!sdks/java/io/jms/**'
+ - '!sdks/java/io/kinesis/**'
+ - '!sdks/java/io/kudu/**'
+ - '!sdks/java/io/mqtt/**'
+ - '!sdks/java/io/mongodb/**'
+ - '!sdks/java/io/neo4j/**'
+ - '!sdks/java/io/pulsar/**'
+ - '!sdks/java/io/rabbitmq/**'
+ - '!sdks/java/io/redis/**'
+ - '!sdks/java/io/rrio/**'
+ - '!sdks/java/io/snowflake/**'
+ - '!sdks/java/io/solr/**'
+ - '!sdks/java/io/splunk/**'
+ - '!sdks/java/io/thrift/**'
+ - '!sdks/java/io/tika/**'
+ - 'release/**'
+ - 'release/trigger_all_tests.json'
+ - '.github/workflows/beam_PreCommit_Xlang_Generation.yml'
+ issue_comment:
+ types: [created]
+ schedule:
+ - cron: '30 2/6 * * *'
+ workflow_dispatch:
+
+# Setting explicit permissions for the action to avoid the default permissions, which are `write-all` in case of pull_request_target event
+permissions:
+ actions: write
+ pull-requests: read
+ checks: read
+ contents: read
+ deployments: read
+ id-token: none
+ issues: read
+ discussions: read
+ packages: read
+ pages: read
+ repository-projects: read
+ security-events: read
+ statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+ group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
+ cancel-in-progress: true
+
+env:
+ GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+ GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+ beam_PreCommit_Xlang_Generation:
+ name: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ matrix.python_version }})
+ timeout-minutes: 120
+ runs-on: ['self-hosted', ubuntu-20.04, main]
+ strategy:
+ fail-fast: false
+ matrix:
+ job_name: ['beam_PreCommit_Xlang_Generation']
+ job_phrase: ['Run Xlang Generation PreCommit']
Review Comment:
```suggestion
job_phrase: ['Run Xlang_Generation PreCommit']
```
##########
.github/workflows/beam_PreCommit_Xlang_Generation.yml:
##########
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PreCommit Xlang Wrapper Generation
Review Comment:
Let's update
https://github.com/apache/beam/blob/master/.github/workflows/README.md and
https://github.com/apache/beam/blob/master/scripts/ci/release/test/resources/mass_comment.txt
with new suites.
##########
sdks/python/apache_beam/transforms/external_transform_provider_test.py:
##########
@@ -407,7 +432,8 @@ def test_run_pipeline_with_script_generated_transform(self):
with open(self.service_config_path, 'w') as f:
yaml.dump([expansion_service_config], f)
- generate_transforms_config(self.service_config_path, self.transform_config_path)
+ generate_transforms_config(
Review Comment:
Why do we generate wrappers for this test, as opposed to taking the already
generated wrappers and testing them? It seems that wrappers would be generated
during Beam installation; reusing them might remove the test's dependency on
jinja/yapf.
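For illustration, the reuse this comment suggests could look like the sketch below: importing an already-generated wrapper module by its dotted path instead of regenerating it inside the test. This is only a hypothetical helper, not Beam's API; the verification below uses a stdlib module as a stand-in for a generated wrapper module.

```python
from importlib import import_module


def load_generated_wrapper(module_path: str, transform_name: str):
    """Import an already-generated wrapper by dotted module path and
    return the named transform, instead of regenerating wrappers with
    jinja/yapf inside the test. Names here are illustrative."""
    module = import_module(module_path)
    if not hasattr(module, transform_name):
        raise AttributeError(
            f"{transform_name} not found in {module_path}; "
            "was the wrapper generated at install time?")
    return getattr(module, transform_name)
```

A test could then call something like `load_generated_wrapper('apache_beam.transforms.xlang.io', 'GenerateSequence')` (module path hypothetical) and exercise the wrapper directly, with no jinja/yapf in the test's dependency set.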
##########
.github/workflows/beam_PreCommit_Xlang_Generation.yml:
##########
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PreCommit Xlang Wrapper Generation
+
+on:
+ push:
+ tags: ['v*']
+ branches: ['master', 'release-*']
+ paths:
+ - 'model/**'
+ - 'sdks/python/**'
+ - 'sdks/java/expansion-service/**'
+ - 'sdks/java/core/**'
+ - 'sdks/java/io/**'
+ - 'sdks/java/extensions/sql/**'
+ - '!sdks/java/io/amazon-web-services/**'
+ - '!sdks/java/io/amazon-web-services2/**'
+ - '!sdks/java/io/amqp/**'
+ - '!sdks/java/io/azure/**'
+ - '!sdks/java/io/cassandra/**'
+ - '!sdks/java/io/cdap/**'
+ - '!sdks/java/io/clickhouse/**'
+ - '!sdks/java/io/debezium/**'
+ - '!sdks/java/io/elasticsearch/**'
+ - '!sdks/java/io/elasticsearch-tests/**'
+ - '!sdks/java/io/hadoop-common/**'
+ - '!sdks/java/io/hadoop-file-system/**'
+ - '!sdks/java/io/hadoop-format/**'
+ - '!sdks/java/io/hbase/**'
+ - '!sdks/java/io/hcatalog/**'
+ - '!sdks/java/io/influxdb/**'
+ - '!sdks/java/io/jms/**'
+ - '!sdks/java/io/kinesis/**'
+ - '!sdks/java/io/kudu/**'
+ - '!sdks/java/io/mqtt/**'
+ - '!sdks/java/io/mongodb/**'
+ - '!sdks/java/io/neo4j/**'
+ - '!sdks/java/io/pulsar/**'
+ - '!sdks/java/io/rabbitmq/**'
+ - '!sdks/java/io/redis/**'
+ - '!sdks/java/io/rrio/**'
+ - '!sdks/java/io/snowflake/**'
+ - '!sdks/java/io/solr/**'
+ - '!sdks/java/io/splunk/**'
+ - '!sdks/java/io/thrift/**'
+ - '!sdks/java/io/tika/**'
+ - 'release/**'
+ - '.github/workflows/beam_PreCommit_Xlang_Generation.yml'
+ pull_request_target:
+ branches: ['master', 'release-*']
+ paths:
+ - 'model/**'
+ - 'sdks/python/**'
+ - 'sdks/java/expansion-service/**'
+ - 'sdks/java/core/**'
+ - 'sdks/java/io/**'
+ - 'sdks/java/extensions/sql/**'
+ - '!sdks/java/io/amazon-web-services/**'
+ - '!sdks/java/io/amazon-web-services2/**'
+ - '!sdks/java/io/amqp/**'
+ - '!sdks/java/io/azure/**'
+ - '!sdks/java/io/cassandra/**'
+ - '!sdks/java/io/cdap/**'
+ - '!sdks/java/io/clickhouse/**'
+ - '!sdks/java/io/debezium/**'
+ - '!sdks/java/io/elasticsearch/**'
+ - '!sdks/java/io/elasticsearch-tests/**'
+ - '!sdks/java/io/hadoop-common/**'
+ - '!sdks/java/io/hadoop-file-system/**'
+ - '!sdks/java/io/hadoop-format/**'
+ - '!sdks/java/io/hbase/**'
+ - '!sdks/java/io/hcatalog/**'
+ - '!sdks/java/io/influxdb/**'
+ - '!sdks/java/io/jms/**'
+ - '!sdks/java/io/kinesis/**'
+ - '!sdks/java/io/kudu/**'
+ - '!sdks/java/io/mqtt/**'
+ - '!sdks/java/io/mongodb/**'
+ - '!sdks/java/io/neo4j/**'
+ - '!sdks/java/io/pulsar/**'
+ - '!sdks/java/io/rabbitmq/**'
+ - '!sdks/java/io/redis/**'
+ - '!sdks/java/io/rrio/**'
+ - '!sdks/java/io/snowflake/**'
+ - '!sdks/java/io/solr/**'
+ - '!sdks/java/io/splunk/**'
+ - '!sdks/java/io/thrift/**'
+ - '!sdks/java/io/tika/**'
+ - 'release/**'
+ - 'release/trigger_all_tests.json'
+ - '.github/workflows/beam_PreCommit_Xlang_Generation.yml'
+ issue_comment:
+ types: [created]
+ schedule:
+ - cron: '30 2/6 * * *'
+ workflow_dispatch:
+
+# Setting explicit permissions for the action to avoid the default permissions, which are `write-all` in case of pull_request_target event
+permissions:
+ actions: write
+ pull-requests: read
+ checks: read
+ contents: read
+ deployments: read
+ id-token: none
+ issues: read
+ discussions: read
+ packages: read
+ pages: read
+ repository-projects: read
+ security-events: read
+ statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+ group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
+ cancel-in-progress: true
+
+env:
+ GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+ GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+ beam_PreCommit_Xlang_Generation:
+ name: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ matrix.python_version }})
+ timeout-minutes: 120
+ runs-on: ['self-hosted', ubuntu-20.04, main]
+ strategy:
+ fail-fast: false
+ matrix:
+ job_name: ['beam_PreCommit_Xlang_Generation']
+ job_phrase: ['Run Xlang Generation PreCommit']
Review Comment:
nit: added underscore for consistency with other commands;
alternative name: beam_PreCommit_Generated_Transforms / Run
Generated_Transforms PreCommit
##########
sdks/python/container/py39/base_image_requirements.txt:
##########
@@ -82,11 +82,13 @@ hypothesis==6.92.1
idna==3.6
importlib-metadata==7.0.0
iniconfig==2.0.0
+Jinja2==3.1.2
joblib==1.3.2
Js2Py==0.74
jsonpickle==3.0.2
jsonschema==4.20.0
jsonschema-specifications==2023.11.2
+MarkupSafe==2.1.3
Review Comment:
Are you editing these files by hand? They should be autogenerated; the file
header has instructions. This can be done later or in a separate PR. I don't
remember if I asked this already.
##########
sdks/python/gen_xlang_wrappers.py:
##########
@@ -212,9 +179,60 @@ class name. This can be overriden by manually providing a name.
f.write(
"# NOTE: This file is autogenerated and should "
"not be edited by hand.\n")
+ f.write(
+ "# Configs are generated based on the expansion service\n"
+ f"# configuration in {input_services.replace(PROJECT_ROOT, '')}.\n")
+ f.write("# Refer to sdks/python/gen_xlang_wrappers.py for more info.\n")
dt = datetime.datetime.now().date()
- f.write(f"# Last updated on: {dt}\n\n")
+ f.write(f"#\n# Last updated on: {dt}\n\n")
yaml.dump(transform_list, f)
+ logging.info("Successfully wrote transform configs to file: %s", output_file)
+
+
+def validate_sdks_destinations(sdk, dest, service, identifier=None):
Review Comment:
given that we now assemble external transform wrappers in one folder, is it
still important to specify particular destinations (say, apache_beam/io vs
apache_beam/io/gcp) when all generated wrappers will live in
_external_transforms ?
##########
sdks/python/apache_beam/transforms/external_transform_provider_test.py:
##########
@@ -423,45 +396,50 @@ def test_run_pipeline_with_script_generated_transform(self):
}
}
}
+
with open(self.service_config_path, 'w') as f:
yaml.dump([expansion_service_config], f)
+ generate_transforms_config(
+ self.service_config_path, self.transform_config_path)
+ wrappers_grouped_by_destination = get_wrappers_from_transform_configs(
+ self.transform_config_path)
+ write_wrappers_to_destinations(
+ wrappers_grouped_by_destination, self.test_dir)
- run_script(False, self.service_config_path, self.transform_config_path)
-
- gen_seq_et = import_module(
- modified_dest.replace('/', '.') + PYTHON_SUFFIX.rstrip('.py'))
+ gen_seq_et = self.get_module(modified_dest)
with beam.Pipeline() as p:
numbers = (
p | gen_seq_et.MyGenSeq(start=0, end=10)
| beam.Map(lambda row: row.value))
-
assert_that(numbers, equal_to([i for i in range(10)]))
def test_check_standard_external_transforms_config_in_sync(self):
"""
- This test creates a transforms config file and checks it against the file
- in the SDK root `standard_external_transforms.yaml`. Fails if the
- test is out of sync.
+ This test creates a transforms config file and checks it against
+ `sdks/standard_external_transforms.yaml`. Fails if the test is out of sync.
Review Comment:
```suggestion
`sdks/standard_external_transforms.yaml`. Fails if two configs don't
match.
```
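As a side note, the sync check discussed here boils down to "regenerate and compare". A minimal, dependency-free sketch of that pattern follows; the real files are YAML and the paths are hypothetical, and `json` is used here only to keep the example self-contained.

```python
import json


def configs_in_sync(generated_path: str, checked_in_path: str) -> bool:
    """Compare a freshly generated transforms config against the
    checked-in one. The real check parses YAML via yaml.safe_load;
    json is a stand-in so this sketch has no third-party deps."""
    with open(generated_path) as gen, open(checked_in_path) as ref:
        return json.load(gen) == json.load(ref)
```

The test then fails (with a "re-run the generation script" message) whenever the two parsed configs differ, rather than diffing raw text, so formatting-only changes don't trip it.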
##########
sdks/python/apache_beam/transforms/external_transform_provider_test.py:
##########
@@ -186,83 +193,139 @@ def setUp(self):
os.mkdir(self.test_dir)
self.assertTrue(
- os.environ.get('EXPANSION_PORT'), "Expansion service port not found!")
+ os.environ.get('EXPANSION_PORTS'), "Expansion service port not found!")
+ logging.info("EXPANSION_PORTS: %s", os.environ.get('EXPANSION_PORTS'))
def tearDown(self):
shutil.rmtree(self.test_dir, ignore_errors=False)
- def test_script_workflow(self):
+ def delete_and_validate(self):
+ delete_generated_files(self.test_dir)
+ self.assertEqual(len(os.listdir(self.test_dir)), 0)
+
+ def test_script_fails_with_invalid_destinations(self):
expansion_service_config = {
"gradle_target": 'sdks:java:io:expansion-service:shadowJar',
'destinations': {
- 'python': f'apache_beam/transforms/{self.test_dir_name}'
+ 'python': 'apache_beam/some_nonexistent_dir'
}
}
+ with self.assertRaises(ValueError):
+ self.create_and_check_transforms_config_exists(expansion_service_config)
+
+ def test_pretty_types(self):
+ types = [
+ typing.Optional[typing.List[str]],
+ numpy.int16,
+ str,
+ typing.Dict[str, numpy.float64],
+ typing.Optional[typing.Dict[str, typing.List[numpy.int64]]],
+ typing.Dict[int, typing.Optional[str]]
+ ]
+
+ expected_type_names = [('List[str]', True), ('numpy.int16', False),
+ ('str', False), ('Dict[str, numpy.float64]', False),
+ ('Dict[str, List[numpy.int64]]', True),
+ ('Dict[int, Union[str, NoneType]]', False)]
+
+ for i in range(len(types)):
+ self.assertEqual(pretty_type(types[i]), expected_type_names[i])
+
+ def create_and_check_transforms_config_exists(self, expansion_service_config):
with open(self.service_config_path, 'w') as f:
yaml.dump([expansion_service_config], f)
- # test that transform config YAML file is created
generate_transforms_config(
self.service_config_path, self.transform_config_path)
self.assertTrue(os.path.exists(self.transform_config_path))
- expected_destination = \
- f'apache_beam/transforms/{self.test_dir_name}/generate_sequence'
- # test that transform config is populated correctly
+
+ def create_and_validate_transforms_config(
+ self, expansion_service_config, expected_name, expected_destination):
+ self.create_and_check_transforms_config_exists(expansion_service_config)
+
with open(self.transform_config_path) as f:
- transforms = yaml.safe_load(f)
+ configs = yaml.safe_load(f)
gen_seq_config = None
- for transform in transforms:
- if transform['identifier'] == self.GEN_SEQ_IDENTIFIER:
- gen_seq_config = transform
+ for config in configs:
+ if config['identifier'] == self.GEN_SEQ_IDENTIFIER:
+ gen_seq_config = config
self.assertIsNotNone(gen_seq_config)
self.assertEqual(
gen_seq_config['default_service'],
expansion_service_config['gradle_target'])
- self.assertEqual(gen_seq_config['name'], 'GenerateSequence')
+ self.assertEqual(gen_seq_config['name'], expected_name)
self.assertEqual(
gen_seq_config['destinations']['python'], expected_destination)
self.assertIn("end", gen_seq_config['fields'])
self.assertIn("start", gen_seq_config['fields'])
self.assertIn("rate", gen_seq_config['fields'])
- # test that the code for GenerateSequence is set to the right destination
+ def get_module(self, dest):
+ module_name = dest.replace('apache_beam/', '').replace('/', '_')
+ module = 'apache_beam.transforms.%s.%s' % (self.test_dir_name, module_name)
+ return import_module(module)
+
+ def write_wrappers_to_destinations_and_validate(
+ self, destinations: typing.List[str]):
+ """
+ Generate wrappers from the config path and validate all destinations are
+ included.
+ Then write wrappers to destinations and validate all destination paths
+ exist.
+
+ :return: Generated wrappers grouped by destination
+ """
grouped_wrappers = get_wrappers_from_transform_configs(
self.transform_config_path)
- self.assertIn(expected_destination, grouped_wrappers)
- # only the GenerateSequence wrapper is set to this destination
- self.assertEqual(len(grouped_wrappers[expected_destination]), 1)
+ for dest in destinations:
+ self.assertIn(dest, grouped_wrappers)
+
+ # write to our test directory to avoid messing with other files
+ write_wrappers_to_destinations(grouped_wrappers, self.test_dir)
+
+ for dest in destinations:
+ self.assertTrue(
+ os.path.exists(
+ os.path.join(
+ self.test_dir,
+ dest.replace('apache_beam/', '').replace('/', '_') + ".py")))
+ return grouped_wrappers
+
+ def test_script_workflow(self):
+ expected_destination = 'apache_beam/transforms'
+ expansion_service_config = {
+ "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+ 'destinations': {
+ 'python': expected_destination
+ }
+ }
+
+ self.create_and_validate_transforms_config(
+ expansion_service_config, 'GenerateSequence', expected_destination)
+ grouped_wrappers = self.write_wrappers_to_destinations_and_validate(
+ [expected_destination])
+ # at least the GenerateSequence wrapper is set to this destination
+ self.assertGreaterEqual(len(grouped_wrappers[expected_destination]), 1)
- # test that the correct destination is created
- write_wrappers_to_destinations(grouped_wrappers)
- self.assertTrue(
- os.path.exists(
- os.path.join(self.test_dir, 'generate_sequence' + PYTHON_SUFFIX)))
# check the wrapper exists in this destination and has correct properties
- generate_sequence_et = import_module(
- expected_destination.replace('/', '.') + PYTHON_SUFFIX.rstrip('.py'))
- self.assertTrue(hasattr(generate_sequence_et, 'GenerateSequence'))
+ output_module = self.get_module(expected_destination)
+ self.assertTrue(hasattr(output_module, 'GenerateSequence'))
+ self.assertTrue(hasattr(output_module, 'KafkaWrite')) # also check that
Review Comment:
> also check that
nit: this is not a very helpful comment; perhaps explain the significance of
testing for the presence of these transforms? Also, aren't we skipping Kafka
transforms?
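(Editorial aside on the `test_pretty_types` hunk quoted above: the nullability unwrapping that test exercises can be sketched with stdlib `typing` introspection. This is not Beam's `pretty_type` implementation, only the primitive such a helper would likely rely on.)

```python
import typing


def unwrap_optional(tp):
    """If tp is Optional[X] (i.e. Union[X, None]), return (X, True);
    otherwise return (tp, False). Nested Optionals, as in
    Dict[int, Optional[str]], are deliberately left untouched here,
    matching the (name, nullable) tuple shape seen in the test."""
    if typing.get_origin(tp) is typing.Union:
        args = typing.get_args(tp)
        non_none = [a for a in args if a is not type(None)]
        if len(args) == 2 and len(non_none) == 1:
            return non_none[0], True
    return tp, False
```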