This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 19f9eb06fb5 [yaml] - add multi-line windowing json support (#37174)
19f9eb06fb5 is described below

commit 19f9eb06fb53271b5cd814e34141e50c55f913ce
Author: Derrick Williams <[email protected]>
AuthorDate: Mon Jan 5 15:35:19 2026 -0500

    [yaml] - add multi-line windowing json support (#37174)
    
    * add windowing json support
    
    * add a few tests
    
    * remove code change and just have tests
    
    * remove extra line
    
    * add multiline windowing config support back
    
    * remove extra line
---
 sdks/python/apache_beam/yaml/yaml_transform.py     | 15 ++++++
 .../python/apache_beam/yaml/yaml_transform_test.py | 55 ++++++++++++++++++++++
 2 files changed, 70 insertions(+)

diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index bd1fc8da901..ef065d8a3c4 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -1033,6 +1033,21 @@ def preprocess_windowing(spec):
     if 'windowing' in spec:
       spec['config'] = spec.get('config', {})
       spec['config']['windowing'] = spec.pop('windowing')
+
+    if spec.get('config', {}).get('windowing'):
+      windowing_config = spec['config']['windowing']
+      if isinstance(windowing_config, str):
+        try:
+          # PyYAML can load a JSON string - one-line and multi-line.
+          # Without this code, multi-line is not supported.
+          parsed_config = yaml.safe_load(windowing_config)
+          if not isinstance(parsed_config, dict):
+            raise TypeError('Windowing config string must be a YAML/JSON map.')
+          spec['config']['windowing'] = parsed_config
+        except Exception as e:
+          raise ValueError(
+              f'Error parsing windowing config string at \
+                {identify_object(spec)}: {e}') from e
     return spec
   elif 'windowing' not in spec:
     # Nothing to do.
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index d5950fb9efa..89e4dc8b951 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -993,6 +993,61 @@ class YamlWindowingTest(unittest.TestCase):
           providers=TEST_PROVIDERS)
       assert_that(result, equal_to([6, 9]))
 
+  def test_explicit_window_into_with_json_string_config_one_line(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              config:
+                  elements: [0, 1, 2, 3, 4, 5]
+            - type: WindowInto
+              config:
+                windowing: {"type": "fixed", "size": "4s"}
+            - type: SumGlobally
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_explicit_window_into_with_json_string_config_multi_line(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              config:
+                  elements: [0, 1, 2, 3, 4, 5]
+            - type: WindowInto
+              config:
+                windowing: |
+                  {"type": "fixed", "size": "4s"}
+            - type: SumGlobally
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_explicit_window_into_with_string_config_fails(self):
+    with self.assertRaisesRegex(ValueError, 'Error parsing windowing config'):
+      with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+          pickle_library='cloudpickle')) as p:
+        _ = p | YamlTransform(
+            '''
+            type: chain
+            transforms:
+              - type: CreateTimestamped
+                config:
+                    elements: [0, 1, 2, 3, 4, 5]
+              - type: WindowInto
+                config:
+                  windowing: |
+                    'not a valid yaml'
+            ''',
+            providers=TEST_PROVIDERS)
+
   def test_windowing_on_input(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle')) as p:

Reply via email to