This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 19f9eb06fb5 [yaml] - add multi-line windowing json support (#37174)
19f9eb06fb5 is described below
commit 19f9eb06fb53271b5cd814e34141e50c55f913ce
Author: Derrick Williams <[email protected]>
AuthorDate: Mon Jan 5 15:35:19 2026 -0500
[yaml] - add multi-line windowing json support (#37174)
* add windowing json support
* add a few tests
* remove code change and just have tests
* remove extra line
* add multiline windowing config support back
* remove extra line
---
sdks/python/apache_beam/yaml/yaml_transform.py | 15 ++++++
.../python/apache_beam/yaml/yaml_transform_test.py | 55 ++++++++++++++++++++++
2 files changed, 70 insertions(+)
diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index bd1fc8da901..ef065d8a3c4 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -1033,6 +1033,21 @@ def preprocess_windowing(spec):
if 'windowing' in spec:
spec['config'] = spec.get('config', {})
spec['config']['windowing'] = spec.pop('windowing')
+
+ if spec.get('config', {}).get('windowing'):
+ windowing_config = spec['config']['windowing']
+ if isinstance(windowing_config, str):
+ try:
+ # PyYAML can load a JSON string - one-line and multi-line.
+ # Without this code, multi-line is not supported.
+ parsed_config = yaml.safe_load(windowing_config)
+ if not isinstance(parsed_config, dict):
+ raise TypeError('Windowing config string must be a YAML/JSON map.')
+ spec['config']['windowing'] = parsed_config
+ except Exception as e:
+ raise ValueError(
+ f'Error parsing windowing config string at \
+ {identify_object(spec)}: {e}') from e
return spec
elif 'windowing' not in spec:
# Nothing to do.
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index d5950fb9efa..89e4dc8b951 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -993,6 +993,61 @@ class YamlWindowingTest(unittest.TestCase):
providers=TEST_PROVIDERS)
assert_that(result, equal_to([6, 9]))
+ def test_explicit_window_into_with_json_string_config_one_line(self):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ result = p | YamlTransform(
+ '''
+ type: chain
+ transforms:
+ - type: CreateTimestamped
+ config:
+ elements: [0, 1, 2, 3, 4, 5]
+ - type: WindowInto
+ config:
+ windowing: {"type": "fixed", "size": "4s"}
+ - type: SumGlobally
+ ''',
+ providers=TEST_PROVIDERS)
+ assert_that(result, equal_to([6, 9]))
+
+ def test_explicit_window_into_with_json_string_config_multi_line(self):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ result = p | YamlTransform(
+ '''
+ type: chain
+ transforms:
+ - type: CreateTimestamped
+ config:
+ elements: [0, 1, 2, 3, 4, 5]
+ - type: WindowInto
+ config:
+ windowing: |
+ {"type": "fixed", "size": "4s"}
+ - type: SumGlobally
+ ''',
+ providers=TEST_PROVIDERS)
+ assert_that(result, equal_to([6, 9]))
+
+ def test_explicit_window_into_with_string_config_fails(self):
+ with self.assertRaisesRegex(ValueError, 'Error parsing windowing config'):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ _ = p | YamlTransform(
+ '''
+ type: chain
+ transforms:
+ - type: CreateTimestamped
+ config:
+ elements: [0, 1, 2, 3, 4, 5]
+ - type: WindowInto
+ config:
+ windowing: |
+ 'not a valid yaml'
+ ''',
+ providers=TEST_PROVIDERS)
+
def test_windowing_on_input(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p: