This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 78eb4a2a2d7 fix: Enable `create_test` to correctly parse and apply external providers defined in YAML pipeline specifications. (#37216)
78eb4a2a2d7 is described below

commit 78eb4a2a2d7153e24b86cfea418b9b317ebebe04
Author: liferoad <[email protected]>
AuthorDate: Mon Jan 5 08:17:10 2026 -0500

    fix: Enable `create_test` to correctly parse and apply external providers defined in YAML pipeline specifications. (#37216)
---
 sdks/python/apache_beam/yaml/yaml_testing.py      | 21 +++++++++---
 sdks/python/apache_beam/yaml/yaml_testing_test.py | 39 +++++++++++++++++++++++
 2 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/yaml_testing.py b/sdks/python/apache_beam/yaml/yaml_testing.py
index ad31afa927e..ead3ab9de31 100644
--- a/sdks/python/apache_beam/yaml/yaml_testing.py
+++ b/sdks/python/apache_beam/yaml/yaml_testing.py
@@ -411,6 +411,13 @@ def create_test(
         **yaml_transform.SafeLineLoader.strip_metadata(
             pipeline_spec.get('options', {})))
 
+  providers = yaml_provider.merge_providers(
+      yaml_provider.parse_providers('', pipeline_spec.get('providers', [])),
+      {
+          'AssertEqualAndRecord': yaml_provider.as_provider_list(
+              'AssertEqualAndRecord', AssertEqualAndRecord)
+      })
+
   def get_name(transform):
     if 'name' in transform:
       return str(transform['name'])
@@ -428,7 +435,8 @@ def create_test(
   mock_outputs = [{
       'name': get_name(t),
       'elements': [
-          _try_row_as_dict(row) for row in _first_n(t, options, max_num_inputs)
+          _try_row_as_dict(row)
+          for row in _first_n(t, options, max_num_inputs, providers)
       ],
   } for t in input_transforms]
 
@@ -504,15 +512,18 @@ class RecordElements(beam.PTransform):
     return pcoll | beam.Map(record)
 
 
-def _first_n(transform_spec, options, n):
+def _first_n(transform_spec, options, n, providers=None):
   recorder = RecordElements(n)
+  if providers is None:
+    providers = {
+        'AssertEqualAndRecord': yaml_provider.as_provider_list(
+            'AssertEqualAndRecord', AssertEqualAndRecord)
+    }
   try:
     with beam.Pipeline(options=options) as p:
       _ = (
           p
-          | yaml_transform.YamlTransform(
-              transform_spec,
-              providers={'AssertEqualAndRecord': AssertEqualAndRecord})
+          | yaml_transform.YamlTransform(transform_spec, providers=providers)
           | recorder)
   except _DoneException:
     pass
diff --git a/sdks/python/apache_beam/yaml/yaml_testing_test.py b/sdks/python/apache_beam/yaml/yaml_testing_test.py
index 9bb0e64b6db..70e9246e4d3 100644
--- a/sdks/python/apache_beam/yaml/yaml_testing_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_testing_test.py
@@ -356,6 +356,45 @@ class YamlTestingTest(unittest.TestCase):
             }]
         })
 
+  def test_create_with_external_providers(self):
+    """Test that create_test works with external providers defined in the
+    pipeline spec.
+
+    This test validates the fix for issue #37136 where external providers
+    defined in YAML files were not recognized when running tests.
+    """
+    pipeline = '''
+    pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {a: 1, b: 2}
+              - {a: 2, b: 3}
+              - {a: 3, b: 4}
+              - {a: 4, b: 5}
+              - {a: 5, b: 6}
+        - type: MyCustomTransform
+        - type: LogForTesting
+    providers:
+      - type: yaml
+        transforms:
+          MyCustomTransform:
+            body:
+              type: MapToFields
+              config:
+                language: python
+                fields:
+                  sum_ab: a + b
+    '''
+    test_spec = yaml_testing.create_test(
+        pipeline, max_num_inputs=10, min_num_outputs=3)
+
+    self.assertEqual(len(test_spec['expected_inputs']), 1)
+    self.assertGreaterEqual(len(test_spec['expected_inputs'][0]['elements']), 3)
+    yaml_testing.run_test(pipeline, test_spec)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

Reply via email to