This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch users/damccorm/gbek-xlang-tests in repository https://gitbox.apache.org/repos/asf/beam.git
commit b4523d86ef354a976b089f6198e5372f32a5b7f7 Author: Danny Mccormick <[email protected]> AuthorDate: Tue Oct 7 07:33:06 2025 -0400 x-lang gbek tests --- ...am_PostCommit_XVR_PythonUsingJava_Dataflow.json | 4 + .../transforms/validate_runner_xlang_test.py | 85 ++++++++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJava_Dataflow.json b/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJava_Dataflow.json new file mode 100644 index 00000000000..6a55e29ae15 --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJava_Dataflow.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run.", + "modification": 1 +} \ No newline at end of file diff --git a/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py b/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py index 8e8e7964825..764167e8bc0 100644 --- a/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py +++ b/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py @@ -52,17 +52,27 @@ https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA import logging import os +import random +import string import typing import unittest import pytest import apache_beam as beam +from apache_beam.options.pipeline_options import SetupOptions from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.transforms.util import GcpSecret +from apache_beam.transforms.util import Secret from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder +try: + from google.cloud import secretmanager +except ImportError: + secretmanager = None # type: ignore[assignment] + TEST_PREFIX_URN = "beam:transforms:xlang:test:prefix" TEST_MULTI_URN = "beam:transforms:xlang:test:multi" TEST_GBK_URN = "beam:transforms:xlang:test:gbk" @@ -140,6 +150,24 @@ class CrossLanguageTestPipelines(object): | beam.Map(lambda x: "{}:{}".format(x[0], ','.join(sorted(x[1]))))) assert_that(res, equal_to(['0:1,2', '1:3'])) + def run_group_by_key_no_assert(self, pipeline): + """ + Target transform - GroupByKey, with no assertion for checking errors + (https://beam.apache.org/documentation/programming-guide/#groupbykey) + Test scenario - Grouping a collection of KV<K,V> to a collection of + KV<K, Iterable<V>> by key + Boundary conditions checked - + - PCollection<KV<?, ?>> to external transforms + - PCollection<KV<?, Iterable<?>>> from external transforms + """ + with pipeline as p: + res = ( + p + | beam.Create([(0, "1"), (0, "2"), + (1, "3")], reshuffle=False).with_output_types( + typing.Tuple[int, str]) + | beam.ExternalTransform(TEST_GBK_URN, None, self.expansion_service)) + def run_cogroup_by_key(self, pipeline): """ Target transform - CoGroupByKey @@ -243,6 +271,42 @@ class CrossLanguageTestPipelines(object): class ValidateRunnerXlangTest(unittest.TestCase): _multiprocess_can_split_ = True + def setUp(self): + if secretmanager is not None: + self.project_id = 'apache-beam-testing' + secret_postfix = ''.join(random.choice(string.digits) for _ in range(6)) + self.secret_id = 'gbek_secret_tests_' + secret_postfix + self.client = secretmanager.SecretManagerServiceClient() + self.project_path = f'projects/{self.project_id}' + self.secret_path = f'{self.project_path}/secrets/{self.secret_id}' + try: + self.client.get_secret(request={'name': self.secret_path}) + except Exception: + self.client.create_secret( + request={ + 'parent': self.project_path, + 'secret_id': self.secret_id, + 'secret': { + 'replication': { + 'automatic': {} + } + } + }) + self.client.add_secret_version( + request={ + 'parent': self.secret_path, + 'payload': { + 'data': Secret.generate_secret_bytes() + } + }) + version_name = f'{self.secret_path}/versions/latest' + self.gcp_secret = GcpSecret(version_name) + self.secret_option = f'type:GcpSecret;version_name:{version_name}' + + def tearDown(self): + if secretmanager is not None: + self.client.delete_secret(request={'name': self.secret_path}) + def create_pipeline(self): test_pipeline = TestPipeline() test_pipeline.not_use_test_runner_api = True @@ -266,6 +330,27 @@ class ValidateRunnerXlangTest(unittest.TestCase): CrossLanguageTestPipelines().run_group_by_key( test_pipeline or self.create_pipeline()) + # This test and test_group_by_key_gbek_bad_secret validate that the gbek + # pipeline option is correctly passed through + @pytest.mark.uses_java_expansion_service + @pytest.mark.uses_python_expansion_service + @unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed') + def test_group_by_key_gbek(self, test_pipeline=None): + test_pipeline = test_pipeline or self.create_pipeline() + good_secret = self.secret_option + test_pipeline.options.view_as(SetupOptions).gbek = good_secret + CrossLanguageTestPipelines().run_group_by_key(test_pipeline) + + @pytest.mark.uses_java_expansion_service + @pytest.mark.uses_python_expansion_service + @unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed') + def test_group_by_key_gbek_bad_secret(self, test_pipeline=None): + test_pipeline = test_pipeline or self.create_pipeline() + nonexistent_secret = 'version_name:nonexistent_secret' + test_pipeline.options.view_as(SetupOptions).gbek = nonexistent_secret + with self.assertRaisesRegex(Exception, 'Secret string must contain a valid type parameter'): + CrossLanguageTestPipelines().run_group_by_key_no_assert(test_pipeline) + @pytest.mark.uses_java_expansion_service @pytest.mark.uses_python_expansion_service def test_cogroup_by_key(self, test_pipeline=None):
