This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/gbek-xlang-py-tests
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5064168fb1d491c3aab8265ee865e3a782dd775f
Author: Danny Mccormick <[email protected]>
AuthorDate: Thu Oct 9 14:59:44 2025 -0400

    Add some x-lang gbek tests
---
 ...am_PostCommit_XVR_PythonUsingJava_Dataflow.json |  4 +
 .../transforms/validate_runner_xlang_test.py       | 93 ++++++++++++++++++++++
 2 files changed, 97 insertions(+)

diff --git 
a/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJava_Dataflow.json 
b/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJava_Dataflow.json
new file mode 100644
index 00000000000..b73af5e61a4
--- /dev/null
+++ b/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJava_Dataflow.json
@@ -0,0 +1,4 @@
+{
+    "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
+    "modification": 1
+}
diff --git a/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py 
b/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py
index 8e8e7964825..40f7478fd5d 100644
--- a/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py
+++ b/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py
@@ -52,17 +52,27 @@ 
https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA
 
 import logging
 import os
+import random
+import string
 import typing
 import unittest
 
 import pytest
 
 import apache_beam as beam
+from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.transforms.util import GcpSecret
+from apache_beam.transforms.util import Secret
 from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
 
+try:
+  from google.cloud import secretmanager
+except ImportError:
+  secretmanager = None  # type: ignore[assignment]
+
 TEST_PREFIX_URN = "beam:transforms:xlang:test:prefix"
 TEST_MULTI_URN = "beam:transforms:xlang:test:multi"
 TEST_GBK_URN = "beam:transforms:xlang:test:gbk"
@@ -140,6 +150,24 @@ class CrossLanguageTestPipelines(object):
           | beam.Map(lambda x: "{}:{}".format(x[0], ','.join(sorted(x[1])))))
       assert_that(res, equal_to(['0:1,2', '1:3']))
 
+  def run_group_by_key_no_assert(self, pipeline):
+    """
+    Target transform - GroupByKey, with no assertion for checking errors
+    (https://beam.apache.org/documentation/programming-guide/#groupbykey)
+    Test scenario - Grouping a collection of KV<K,V> to a collection of
+    KV<K, Iterable<V>> by key
+    Boundary conditions checked -
+     - PCollection<KV<?, ?>> to external transforms
+     - PCollection<KV<?, Iterable<?>>> from external transforms
+    """
+    with pipeline as p:
+      (
+          p
+          | beam.Create([(0, "1"), (0, "2"),
+                         (1, "3")], reshuffle=False).with_output_types(
+                             typing.Tuple[int, str])
+          | beam.ExternalTransform(TEST_GBK_URN, None, self.expansion_service))
+
   def run_cogroup_by_key(self, pipeline):
     """
     Target transform - CoGroupByKey
@@ -298,6 +326,71 @@ class ValidateRunnerXlangTest(unittest.TestCase):
         test_pipeline or self.create_pipeline())
 
 
[email protected](
+    os.environ.get('EXPANSION_PORT'),
+    "EXPANSION_PORT environment var is not provided.")
[email protected](secretmanager is None, 'secretmanager not installed')
+class ValidateRunnerGBEKTest(unittest.TestCase):
+  def setUp(self):
+    if secretmanager is not None:
+      self.project_id = 'apache-beam-testing'
+      secret_postfix = ''.join(random.choice(string.digits) for _ in range(6))
+      self.secret_id = 'gbek_secret_tests_' + secret_postfix
+      self.client = secretmanager.SecretManagerServiceClient()
+      self.project_path = f'projects/{self.project_id}'
+      self.secret_path = f'{self.project_path}/secrets/{self.secret_id}'
+      try:
+        self.client.get_secret(request={'name': self.secret_path})
+      except Exception:
+        self.client.create_secret(
+            request={
+                'parent': self.project_path,
+                'secret_id': self.secret_id,
+                'secret': {
+                    'replication': {
+                        'automatic': {}
+                    }
+                }
+            })
+        self.client.add_secret_version(
+            request={
+                'parent': self.secret_path,
+                'payload': {
+                    'data': Secret.generate_secret_bytes()
+                }
+            })
+      version_name = f'{self.secret_path}/versions/latest'
+      self.gcp_secret = GcpSecret(version_name)
+      self.secret_option = f'type:GcpSecret;version_name:{version_name}'
+
+  def tearDown(self):
+    if secretmanager is not None:
+      self.client.delete_secret(request={'name': self.secret_path})
+
+  def create_pipeline(self):
+    test_pipeline = TestPipeline()
+    test_pipeline.not_use_test_runner_api = True
+    return test_pipeline
+
+  # This test and test_group_by_key_gbek_bad_secret validate that the gbek
+  # pipeline option is correctly passed through
+  @pytest.mark.uses_java_expansion_service
+  @pytest.mark.uses_python_expansion_service
+  def test_group_by_key_gbek(self, test_pipeline=None):
+    test_pipeline = test_pipeline or self.create_pipeline()
+    good_secret = self.secret_option
+    test_pipeline.options.view_as(SetupOptions).gbek = good_secret
+    CrossLanguageTestPipelines().run_group_by_key(test_pipeline)
+
+    # Verify actually using secret manager
+    test_pipeline = self.create_pipeline()
+    nonexistent_secret = 'version_name:nonexistent_secret'
+    test_pipeline.options.view_as(SetupOptions).gbek = nonexistent_secret
+    with self.assertRaisesRegex(
+        Exception, 'Secret string must contain a valid type parameter'):
+      CrossLanguageTestPipelines().run_group_by_key_no_assert(test_pipeline)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

Reply via email to