This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch users/damccorm/gbek-xlang-py-tests in repository https://gitbox.apache.org/repos/asf/beam.git
commit 5064168fb1d491c3aab8265ee865e3a782dd775f Author: Danny Mccormick <[email protected]> AuthorDate: Thu Oct 9 14:59:44 2025 -0400 Add some x-lang gbek tests --- ...am_PostCommit_XVR_PythonUsingJava_Dataflow.json | 4 + .../transforms/validate_runner_xlang_test.py | 93 ++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJava_Dataflow.json b/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJava_Dataflow.json new file mode 100644 index 00000000000..b73af5e61a4 --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJava_Dataflow.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run.", + "modification": 1 +} diff --git a/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py b/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py index 8e8e7964825..40f7478fd5d 100644 --- a/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py +++ b/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py @@ -52,17 +52,27 @@ https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA import logging import os +import random +import string import typing import unittest import pytest import apache_beam as beam +from apache_beam.options.pipeline_options import SetupOptions from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.transforms.util import GcpSecret +from apache_beam.transforms.util import Secret from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder +try: + from google.cloud import secretmanager +except ImportError: + secretmanager = None # type: ignore[assignment] + TEST_PREFIX_URN = "beam:transforms:xlang:test:prefix" TEST_MULTI_URN = "beam:transforms:xlang:test:multi" TEST_GBK_URN = "beam:transforms:xlang:test:gbk" @@ -140,6 +150,24 @@ class 
CrossLanguageTestPipelines(object): | beam.Map(lambda x: "{}:{}".format(x[0], ','.join(sorted(x[1]))))) assert_that(res, equal_to(['0:1,2', '1:3'])) + def run_group_by_key_no_assert(self, pipeline): + """ + Target transform - GroupByKey, with no assertion for checking errors + (https://beam.apache.org/documentation/programming-guide/#groupbykey) + Test scenario - Grouping a collection of KV<K,V> to a collection of + KV<K, Iterable<V>> by key + Boundary conditions checked - + - PCollection<KV<?, ?>> to external transforms + - PCollection<KV<?, Iterable<?>>> from external transforms + """ + with pipeline as p: + ( + p + | beam.Create([(0, "1"), (0, "2"), + (1, "3")], reshuffle=False).with_output_types( + typing.Tuple[int, str]) + | beam.ExternalTransform(TEST_GBK_URN, None, self.expansion_service)) + def run_cogroup_by_key(self, pipeline): """ Target transform - CoGroupByKey @@ -298,6 +326,71 @@ class ValidateRunnerXlangTest(unittest.TestCase): test_pipeline or self.create_pipeline()) +@unittest.skipUnless( + os.environ.get('EXPANSION_PORT'), + "EXPANSION_PORT environment var is not provided.") +@unittest.skipIf(secretmanager is None, 'secretmanager not installed') +class ValidateRunnerGBEKTest(unittest.TestCase): + def setUp(self): + if secretmanager is not None: + self.project_id = 'apache-beam-testing' + secret_postfix = ''.join(random.choice(string.digits) for _ in range(6)) + self.secret_id = 'gbek_secret_tests_' + secret_postfix + self.client = secretmanager.SecretManagerServiceClient() + self.project_path = f'projects/{self.project_id}' + self.secret_path = f'{self.project_path}/secrets/{self.secret_id}' + try: + self.client.get_secret(request={'name': self.secret_path}) + except Exception: + self.client.create_secret( + request={ + 'parent': self.project_path, + 'secret_id': self.secret_id, + 'secret': { + 'replication': { + 'automatic': {} + } + } + }) + self.client.add_secret_version( + request={ + 'parent': self.secret_path, + 'payload': { + 'data': 
Secret.generate_secret_bytes() + } + }) + version_name = f'{self.secret_path}/versions/latest' + self.gcp_secret = GcpSecret(version_name) + self.secret_option = f'type:GcpSecret;version_name:{version_name}' + + def tearDown(self): + if secretmanager is not None: + self.client.delete_secret(request={'name': self.secret_path}) + + def create_pipeline(self): + test_pipeline = TestPipeline() + test_pipeline.not_use_test_runner_api = True + return test_pipeline + + # This test and test_group_by_key_gbek_bad_secret validate that the gbek + # pipeline option is correctly passed through + @pytest.mark.uses_java_expansion_service + @pytest.mark.uses_python_expansion_service + def test_group_by_key_gbek(self, test_pipeline=None): + test_pipeline = test_pipeline or self.create_pipeline() + good_secret = self.secret_option + test_pipeline.options.view_as(SetupOptions).gbek = good_secret + CrossLanguageTestPipelines().run_group_by_key(test_pipeline) + + # Verify actually using secret manager + test_pipeline = self.create_pipeline() + nonexistent_secret = 'version_name:nonexistent_secret' + test_pipeline.options.view_as(SetupOptions).gbek = nonexistent_secret + with self.assertRaisesRegex( + Exception, 'Secret string must contain a valid type parameter'): + CrossLanguageTestPipelines().run_group_by_key_no_assert(test_pipeline) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main()
