This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d7fa511ee8a [yaml] : add more error handling tests and website example (#37245)
d7fa511ee8a is described below
commit d7fa511ee8a1ebaf5ee020e253d1bfe0bd6e0767
Author: Derrick Williams <[email protected]>
AuthorDate: Thu Jan 8 16:14:39 2026 -0500
[yaml] : add more error handling tests and website example (#37245)
* add more error handling and provider tests
* provide example of provider with error handling
---
.../python/apache_beam/yaml/yaml_transform_test.py | 155 +++++++++++++++++++++
.../content/en/documentation/sdks/yaml-errors.md | 58 ++++++++
2 files changed, 213 insertions(+)
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index 5cf2fa00f15..2afb5e7d8e3 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -19,6 +19,7 @@ import collections
import glob
import logging
import os
+import shutil
import tempfile
import unittest
@@ -911,6 +912,60 @@ class ErrorHandlingTest(unittest.TestCase):
''',
providers=TEST_PROVIDERS)
+ def test_error_handling_log_combined_errors(self):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ result = p | YamlTransform(
+ '''
+ type: composite
+ transforms:
+ - type: Create
+ name: Input1
+ config:
+ elements: [1, 2, 0]
+ - type: Create
+ name: Input2
+ config:
+ elements: [3, 'a', 5]
+ - type: MapToFields
+ name: Inverse
+ input: Input1
+ config:
+ language: python
+ fields:
+ inverse: "1 / element"
+ error_handling:
+ output: errors
+ - type: MapToFields
+ name: Square
+ input: Input2
+ config:
+ language: python
+ fields:
+ square: "element * element"
+ error_handling:
+ output: errors
+ - type: LogForTesting
+ input:
+ - Inverse.errors
+ - Square.errors
+ - type: Flatten
+ name: GoodData
+ input:
+ - Inverse
+ - Square
+ output: GoodData
+ ''',
+ providers=TEST_PROVIDERS)
+ assert_that(
+ result,
+ equal_to([
+ beam.Row(inverse=1.0, square=None),
+ beam.Row(inverse=0.5, square=None),
+ beam.Row(square=9, inverse=None),
+ beam.Row(square=25, inverse=None)
+ ]))
+
def test_mapping_errors(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
@@ -1297,6 +1352,106 @@ class ProviderAffinityTest(unittest.TestCase):
label='StartWith3')
+class TestExternalYamlProvider(unittest.TestCase):
+ def setUp(self):
+ self.temp_dir = tempfile.mkdtemp()
+ self.provider_path = os.path.join(self.temp_dir, 'power_provider.yaml')
+ with open(self.provider_path, 'w') as f:
+ f.write(
+ """
+- type: yaml
+ transforms:
+ RaiseElementToPower:
+ config_schema:
+ properties:
+ n: {type: integer}
+ body:
+ type: MapToFields
+ config:
+ language: python
+ append: true
+ fields:
+ power: "element ** {{n}}"
+ error_handling:
+ output: my_error
+""")
+
+ def tearDown(self):
+ shutil.rmtree(self.temp_dir)
+
+ def test_provider_with_error_handling(self):
+ loaded_providers = yaml_provider.load_providers(self.provider_path)
+ test_providers = yaml_provider.InlineProvider(TEST_PROVIDERS)
+ merged_providers = yaml_provider.merge_providers(
+ loaded_providers, [test_providers])
+
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ results = p | YamlTransform(
+ '''
+ type: composite
+ transforms:
+ - type: Create
+ config:
+ elements: [2, 'bad', 3]
+ - type: RaiseElementToPower
+ input: Create
+ config:
+ n: 2
+ - type: PyMap
+ name: TrimErrors
+ input: RaiseElementToPower.my_error
+ config:
+ fn: "lambda x: x.msg"
+ output:
+ good: RaiseElementToPower.good
+ bad: TrimErrors
+ ''',
+ providers=merged_providers)
+
+ assert_that(
+ results['good'],
+ equal_to([beam.Row(element=2, power=4), beam.Row(element=3,
+ power=9)]),
+ label="CheckGood")
+ assert_that(
+ results['bad'],
+ equal_to([
+ 'TypeError("unsupported operand type(s) for ** or pow(): ' +
+ '\'str\' and \'int\'")'
+ ]),
+ label="CheckBad")
+
+ def test_must_consume_error_output(self):
+ # By adding a dummy error_handling block here, we signal to the static
+ # checker that this transform has an error output that must be consumed.
+ # The framework is able to handle the "nesting" where the provider for
+ # RaiseElementToPower also defines error handling internally.
+ loaded_providers = yaml_provider.load_providers(self.provider_path)
+ test_providers = yaml_provider.InlineProvider(TEST_PROVIDERS)
+ merged_providers = yaml_provider.merge_providers(
+ loaded_providers, [test_providers])
+
+ with self.assertRaisesRegex(Exception, 'Unconsumed error output.*'):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ _ = p | YamlTransform(
+ '''
+ type: composite
+ transforms:
+ - type: Create
+ config:
+ elements: [2, 'bad', 3]
+ - type: RaiseElementToPower
+ input: Create
+ config:
+ n: 2
+ error_handling:
+ output: my_error
+ ''',
+ providers=merged_providers)
+
+
@beam.transforms.ptransform.annotate_yaml
class LinearTransform(beam.PTransform):
"""A transform used for testing annotate_yaml."""
diff --git a/website/www/site/content/en/documentation/sdks/yaml-errors.md b/website/www/site/content/en/documentation/sdks/yaml-errors.md
index 8a836890a73..34a124fc606 100644
--- a/website/www/site/content/en/documentation/sdks/yaml-errors.md
+++ b/website/www/site/content/en/documentation/sdks/yaml-errors.md
@@ -218,4 +218,62 @@ pipeline:
path: /path/to/errors.json
```
+## Error Handling with Custom Providers
+Custom transforms, such as those defined in separate YAML files via a `YamlProvider`, can also expose error outputs from their underlying transforms.
+
+Consider a file `my_transforms.yaml` that defines a `RaiseElementToPower` transform:
+```yaml
+# my_transforms.yaml
+- type: yaml
+ transforms:
+ RaiseElementToPower:
+ config_schema:
+ properties:
+ n: {type: integer}
+ body:
+ type: MapToFields
+ config:
+ language: python
+ append: true
+ fields:
+ power: "element ** {{n}}"
+ # This transform internally defines and exposes an error output.
+ error_handling:
+ output: my_error
+```
+This transform takes a numeric element and raises it to the power of `n`. If the element is not a number, it will produce an error. The error output from the internal `MapToFields` is named `my_error`. This error output is automatically exposed by the `RaiseElementToPower` transform.
+
+When using this transform in a pipeline, you can access this error output and handle it. The main output of the transform will contain only the successfully processed elements.
+
+```yaml
+pipeline:
+ transforms:
+ - type: Create
+ config:
+ elements: [2, 'bad', 3]
+ - type: RaiseElementToPower
+ input: Create
+ config:
+ n: 2
+ - type: WriteToJson
+ name: WriteGood
+ # The main output contains successfully processed elements.
+ input: RaiseElementToPower
+ config:
+ path: /path/to/good
+ - type: WriteToJson
+ name: WriteBad
+ # The error output is accessed by its name.
+ input: RaiseElementToPower.my_error
+ config:
+ path: /path/to/bad
+
+ providers:
+ - include: my_transforms.yaml
+
+```
+In this example, the pipeline separates the good and bad records coming from the custom `RaiseElementToPower` transform. The good records are written to one location, and the error records are written to another.
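
The error records themselves are rows carrying details about the failure; the new `test_provider_with_error_handling` test in this commit reads a `msg` field off them. As a rough sketch (not part of the commit; the exact error-row schema beyond `msg` is an assumption, and `TrimErrors` is a hypothetical name), the bad branch could be reduced to just the error message before writing:

```yaml
# Illustrative only: assumes the error rows expose a `msg` field,
# as the accompanying test does with `lambda x: x.msg`.
- type: MapToFields
  name: TrimErrors
  input: RaiseElementToPower.my_error
  config:
    language: python
    fields:
      msg: msg
- type: WriteToJson
  name: WriteBad
  input: TrimErrors
  config:
    path: /path/to/bad
```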
+
+A pipeline will fail at construction time if an error output is declared (either in a built-in transform or a custom one) but not consumed. This helps ensure that all error paths are considered.
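
For instance, a pipeline along the lines of the following sketch (illustrative only, mirroring the new `test_must_consume_error_output` test rather than taken from this commit) declares the error output but never reads `RaiseElementToPower.my_error`, so construction fails with an "Unconsumed error output" error:

```yaml
pipeline:
  transforms:
    - type: Create
      config:
        elements: [2, 'bad', 3]
    - type: RaiseElementToPower
      input: Create
      config:
        n: 2
        error_handling:
          output: my_error
    # Nothing consumes RaiseElementToPower.my_error, so pipeline
    # construction is rejected.

providers:
  - include: my_transforms.yaml
```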
+
See YAML schema [info](https://beam.apache.org/documentation/sdks/yaml-schema/) for another use of error_handling in a schema context.