This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d7fa511ee8a [yaml] : add more error handling tests and website example 
(#37245)
d7fa511ee8a is described below

commit d7fa511ee8a1ebaf5ee020e253d1bfe0bd6e0767
Author: Derrick Williams <[email protected]>
AuthorDate: Thu Jan 8 16:14:39 2026 -0500

    [yaml] : add more error handling tests and website example (#37245)
    
    * add more error handling and provider tests
    
    * provide example of provider with error handling
---
 .../python/apache_beam/yaml/yaml_transform_test.py | 155 +++++++++++++++++++++
 .../content/en/documentation/sdks/yaml-errors.md   |  58 ++++++++
 2 files changed, 213 insertions(+)

diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py 
b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index 5cf2fa00f15..2afb5e7d8e3 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -19,6 +19,7 @@ import collections
 import glob
 import logging
 import os
+import shutil
 import tempfile
 import unittest
 
@@ -911,6 +912,60 @@ class ErrorHandlingTest(unittest.TestCase):
             ''',
             providers=TEST_PROVIDERS)
 
+  def test_error_handling_log_combined_errors(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: composite
+          transforms:
+            - type: Create
+              name: Input1
+              config:
+                  elements: [1, 2, 0]
+            - type: Create
+              name: Input2
+              config:
+                  elements: [3, 'a', 5]
+            - type: MapToFields
+              name: Inverse
+              input: Input1
+              config:
+                  language: python
+                  fields:
+                    inverse: "1 / element"
+                  error_handling:
+                    output: errors
+            - type: MapToFields
+              name: Square
+              input: Input2
+              config:
+                  language: python
+                  fields:
+                    square: "element * element"
+                  error_handling:
+                    output: errors
+            - type: LogForTesting
+              input:
+                - Inverse.errors
+                - Square.errors
+            - type: Flatten
+              name: GoodData
+              input:
+                - Inverse
+                - Square
+          output: GoodData
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(
+          result,
+          equal_to([
+              beam.Row(inverse=1.0, square=None),
+              beam.Row(inverse=0.5, square=None),
+              beam.Row(square=9, inverse=None),
+              beam.Row(square=25, inverse=None)
+          ]))
+
   def test_mapping_errors(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle')) as p:
@@ -1297,6 +1352,106 @@ class ProviderAffinityTest(unittest.TestCase):
           label='StartWith3')
 
 
+class TestExternalYamlProvider(unittest.TestCase):
+  def setUp(self):
+    self.temp_dir = tempfile.mkdtemp()
+    self.provider_path = os.path.join(self.temp_dir, 'power_provider.yaml')
+    with open(self.provider_path, 'w') as f:
+      f.write(
+          """
+- type: yaml
+  transforms:
+    RaiseElementToPower:
+      config_schema:
+        properties:
+          n: {type: integer}
+      body:
+        type: MapToFields
+        config:
+          language: python
+          append: true
+          fields:
+            power: "element ** {{n}}"
+          error_handling:
+            output: my_error
+""")
+
+  def tearDown(self):
+    shutil.rmtree(self.temp_dir)
+
+  def test_provider_with_error_handling(self):
+    loaded_providers = yaml_provider.load_providers(self.provider_path)
+    test_providers = yaml_provider.InlineProvider(TEST_PROVIDERS)
+    merged_providers = yaml_provider.merge_providers(
+        loaded_providers, [test_providers])
+
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      results = p | YamlTransform(
+          '''
+          type: composite
+          transforms:
+            - type: Create
+              config:
+                elements: [2, 'bad', 3]
+            - type: RaiseElementToPower
+              input: Create
+              config:
+                n: 2
+            - type: PyMap
+              name: TrimErrors
+              input: RaiseElementToPower.my_error
+              config:
+                  fn: "lambda x: x.msg"
+          output:
+            good: RaiseElementToPower.good
+            bad: TrimErrors
+          ''',
+          providers=merged_providers)
+
+      assert_that(
+          results['good'],
+          equal_to([beam.Row(element=2, power=4), beam.Row(element=3,
+                                                           power=9)]),
+          label="CheckGood")
+      assert_that(
+          results['bad'],
+          equal_to([
+              'TypeError("unsupported operand type(s) for ** or pow(): ' +
+              '\'str\' and \'int\'")'
+          ]),
+          label="CheckBad")
+
+  def test_must_consume_error_output(self):
+    # The error_handling block in the RaiseElementToPower config below declares
+    # an error output (my_error) that nothing consumes, so building the
+    # pipeline must fail. The framework handles the "nesting" where the
+    # provider for RaiseElementToPower also defines error handling internally.
+    loaded_providers = yaml_provider.load_providers(self.provider_path)
+    test_providers = yaml_provider.InlineProvider(TEST_PROVIDERS)
+    merged_providers = yaml_provider.merge_providers(
+        loaded_providers, [test_providers])
+
+    with self.assertRaisesRegex(Exception, 'Unconsumed error output.*'):
+      with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+          pickle_library='cloudpickle')) as p:
+        _ = p | YamlTransform(
+            '''
+            type: composite
+            transforms:
+              - type: Create
+                config:
+                  elements: [2, 'bad', 3]
+              - type: RaiseElementToPower
+                input: Create
+                config:
+                  n: 2
+                  error_handling:
+                    output: my_error
+            ''',
+            providers=merged_providers)
+
+
 @beam.transforms.ptransform.annotate_yaml
 class LinearTransform(beam.PTransform):
   """A transform used for testing annotate_yaml."""
diff --git a/website/www/site/content/en/documentation/sdks/yaml-errors.md 
b/website/www/site/content/en/documentation/sdks/yaml-errors.md
index 8a836890a73..34a124fc606 100644
--- a/website/www/site/content/en/documentation/sdks/yaml-errors.md
+++ b/website/www/site/content/en/documentation/sdks/yaml-errors.md
@@ -218,4 +218,62 @@ pipeline:
         path: /path/to/errors.json
 ```
 
+## Error Handling with Custom Providers
+Custom transforms, such as those defined in separate YAML files via a 
`YamlProvider`, can also expose error outputs from their underlying transforms.
+
+Consider a file `my_transforms.yaml` that defines a `RaiseElementToPower` 
transform:
+```yaml
+# my_transforms.yaml
+- type: yaml
+  transforms:
+    RaiseElementToPower:
+      config_schema:
+        properties:
+          n: {type: integer}
+      body:
+        type: MapToFields
+        config:
+          language: python
+          append: true
+          fields:
+            power: "element ** {{n}}"
+          # This transform internally defines and exposes an error output.
+          error_handling:
+            output: my_error
+```
+This transform takes a numeric element and raises it to the power of `n`. If 
an element is not a number, the transform routes it to an error output instead 
of failing the pipeline. The internal `MapToFields` names this error output 
`my_error`, and `RaiseElementToPower` automatically exposes it under that name.
+
+When using this transform in a pipeline, you can access this error output and 
handle it. The main output of the transform will contain only the successfully 
processed elements.
+
+```yaml
+pipeline:
+  transforms:
+    - type: Create
+      config:
+        elements: [2, 'bad', 3]
+    - type: RaiseElementToPower
+      input: Create
+      config:
+        n: 2
+    - type: WriteToJson
+      name: WriteGood
+      # The main output contains successfully processed elements.
+      input: RaiseElementToPower
+      config:
+        path: /path/to/good
+    - type: WriteToJson
+      name: WriteBad
+      # The error output is accessed by its name.
+      input: RaiseElementToPower.my_error
+      config:
+        path: /path/to/bad
+
+  providers:
+    - include: my_transforms.yaml
+
+```
+In this example, the pipeline separates the good and bad records coming from 
the custom `RaiseElementToPower` transform. The good records are written to one 
location, and the error records are written to another.
+
+A pipeline will fail at construction time if an error output is declared 
(either in a built-in transform or a custom one) but not consumed. This helps 
ensure that all error paths are considered.
+
 See YAML schema 
[info](https://beam.apache.org/documentation/sdks/yaml-schema/) for another use 
of error_handling in a schema context.

Reply via email to