This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ac008074fa5 [YAML]: add import jinja pipeline example (#35945)
ac008074fa5 is described below

commit ac008074fa54981b47bc47888c864d09e26ea2f7
Author: Derrick Williams <derric...@google.com>
AuthorDate: Wed Aug 27 16:20:40 2025 -0400

    [YAML]: add import jinja pipeline example (#35945)
    
    * add import jinja pipeline example
    
    * revert name change
    
    * update overall examples readme
    
    * fix lint issue
    
    * fix gemini small issue
    
    * Update 
sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md
    
    ---------
    
    Co-authored-by: tvalentyn <tvalen...@users.noreply.github.com>
---
 sdks/python/apache_beam/yaml/examples/README.md    |  4 ++
 .../yaml/examples/testing/examples_test.py         | 15 ++---
 .../yaml/examples/testing/input_data.py            | 36 ++++++-----
 .../examples/transforms/jinja/import/README.md     | 63 ++++++++++++++++++++
 .../jinja/import/macros/wordCountMacros.yaml       | 64 ++++++++++++++++++++
 .../transforms/jinja/import/wordCountImport.yaml   | 69 ++++++++++++++++++++++
 6 files changed, 230 insertions(+), 21 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/examples/README.md 
b/sdks/python/apache_beam/yaml/examples/README.md
index 75098d212cd..b053e3e6236 100644
--- a/sdks/python/apache_beam/yaml/examples/README.md
+++ b/sdks/python/apache_beam/yaml/examples/README.md
@@ -245,6 +245,10 @@ by leveraging Jinja templating engine for dynamic pipeline 
generation based on
 inputs from the user through `% include`, `% import`, and inheritance
 directives.
 
+Jinja `% import` directive:
+- 
[wordCountImport.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/wordCountImport.yaml)
+- 
[Instructions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md)
 on how to run the pipeline.
+
 Jinja `% include` directive:
 - 
[wordCountInclude.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml)
 - 
[Instructions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md)
 on how to run the pipeline.
diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py 
b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
index 0bfcb3f6161..80e82945523 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
@@ -353,7 +353,8 @@ def create_test_method(
     ]
     if jinja_preprocessor:
       jinja_preprocessor = jinja_preprocessor[0]
-      raw_spec_string = jinja_preprocessor(raw_spec_string)
+      raw_spec_string = jinja_preprocessor(
+          raw_spec_string, self._testMethodName)
       custom_preprocessors.remove(jinja_preprocessor)
 
     pipeline_spec = yaml.load(
@@ -563,7 +564,7 @@ def _wordcount_minimal_test_preprocessor(
 
 
 @YamlExamplesTestSuite.register_test_preprocessor(
-    ['test_wordCountInclude_yaml'])
+    ['test_wordCountInclude_yaml', 'test_wordCountImport_yaml'])
 def _wordcount_jinja_test_preprocessor(
     test_spec: dict, expected: List[str], env: TestEnvironment):
   """
@@ -676,7 +677,8 @@ def _kafka_test_preprocessor(
     'test_iceberg_migration_yaml',
     'test_ml_preprocessing_yaml',
     'test_anomaly_scoring_yaml',
-    'test_wordCountInclude_yaml'
+    'test_wordCountInclude_yaml',
+    'test_wordCountImport_yaml'
 ])
 def _io_write_test_preprocessor(
     test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -1253,8 +1255,8 @@ def _batch_log_analysis_test_preprocessor(
 
 
 @YamlExamplesTestSuite.register_test_preprocessor(
-    ['test_wordCountInclude_yaml'])
-def _jinja_preprocessor(raw_spec_string: str):
+    ['test_wordCountInclude_yaml', 'test_wordCountImport_yaml'])
+def _jinja_preprocessor(raw_spec_string: str, test_name: str):
   """
   Preprocessor for Jinja-based YAML tests.
 
@@ -1274,12 +1276,11 @@ def _jinja_preprocessor(raw_spec_string: str):
   Returns:
     A string containing the fully rendered YAML pipeline specification.
   """
-
   jinja_variables = json.loads(input_data.word_count_jinja_parameter_data())
   test_file_dir = os.path.dirname(__file__)
   sdk_root = os.path.abspath(os.path.join(test_file_dir, '../../../..'))
 
-  include_files = input_data.word_count_jinja_template_data()
+  include_files = input_data.word_count_jinja_template_data(test_name)
   mock_templates = {'main_template': raw_spec_string}
   for file_path in include_files:
     full_path = os.path.join(sdk_root, file_path)
diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py 
b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
index 50d40224f82..fb468567355 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
@@ -65,20 +65,28 @@ def word_count_jinja_parameter_data():
   return json.dumps(params)
 
 
-def word_count_jinja_template_data():
-  return \
-[('apache_beam/yaml/examples/transforms/jinja/'
-    'include/submodules/readFromTextTransform.yaml'),
-   ('apache_beam/yaml/examples/transforms/jinja/'
-   'include/submodules/mapToFieldsSplitConfig.yaml'),
-   ('apache_beam/yaml/examples/transforms/jinja/'
-   'include/submodules/explodeTransform.yaml'),
-   ('apache_beam/yaml/examples/transforms/jinja/'
-   'include/submodules/combineTransform.yaml'),
-   ('apache_beam/yaml/examples/transforms/jinja/'
-   'include/submodules/mapToFieldsCountConfig.yaml'),
-   ('apache_beam/yaml/examples/transforms/jinja/'
-   'include/submodules/writeToTextTransform.yaml')]
+def word_count_jinja_template_data(test_name: str) -> list[str]:
+  if test_name == 'test_wordCountInclude_yaml':
+    return [
+        'apache_beam/yaml/examples/transforms/jinja/'
+        'include/submodules/readFromTextTransform.yaml',
+        'apache_beam/yaml/examples/transforms/jinja/'
+        'include/submodules/mapToFieldsSplitConfig.yaml',
+        'apache_beam/yaml/examples/transforms/jinja/'
+        'include/submodules/explodeTransform.yaml',
+        'apache_beam/yaml/examples/transforms/jinja/'
+        'include/submodules/combineTransform.yaml',
+        'apache_beam/yaml/examples/transforms/jinja/'
+        'include/submodules/mapToFieldsCountConfig.yaml',
+        'apache_beam/yaml/examples/transforms/jinja/'
+        'include/submodules/writeToTextTransform.yaml'
+    ]
+  elif test_name == 'test_wordCountImport_yaml':
+    return [
+        'apache_beam/yaml/examples/transforms/jinja/'
+        'import/macros/wordCountMacros.yaml'
+    ]
+  return []
 
 
 def iceberg_dynamic_destinations_users_data():
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md 
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md
new file mode 100644
index 00000000000..14052cd3a6c
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md
@@ -0,0 +1,63 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+## Jinja % import Pipeline
+
+This example leverages the `% import` Jinja directive by having one main
+pipeline and then one macros file containing all the transforms and configs
+used.
+
+General setup:
+```sh
+export 
PIPELINE_FILE=apache_beam/yaml/examples/transforms/jinja/import/wordCountImport.yaml
+export KINGLEAR="gs://dataflow-samples/shakespeare/kinglear.txt"
+export TEMP_LOCATION="gs://MY-BUCKET/wordCounts/"
+
+cd <PATH_TO_BEAM_REPO>/beam/sdks/python
+```
+
+Multiline Run Example:
+```sh
+python -m apache_beam.yaml.main \
+  --yaml_pipeline_file="${PIPELINE_FILE}" \
+  --jinja_variables='{
+    "readFromTextTransform": {"path": "'"${KINGLEAR}"'"},
+    "mapToFieldsSplitConfig": {
+      "language": "python",
+      "fields": {
+        "value": "1"
+      }
+    },
+    "explodeTransform": {"fields": "word"},
+    "combineTransform": {
+      "group_by": "word",
+      "combine": {"value": "sum"}
+    },
+    "mapToFieldsCountConfig": {
+      "language": "python",
+      "fields": {"output": "word + \" - \" + str(value)"}
+    },
+    "writeToTextTransform": {"path": "'"${TEMP_LOCATION}"'"}
+  }'
+```
+
+Single Line Run Example:
+```sh
+python -m apache_beam.yaml.main --yaml_pipeline_file="${PIPELINE_FILE}" 
--jinja_variables='{"readFromTextTransform": {"path": 
"gs://dataflow-samples/shakespeare/kinglear.txt"}, "mapToFieldsSplitConfig": 
{"language": "python", "fields":{"value":"1"}}, 
"explodeTransform":{"fields":"word"}, "combineTransform":{"group_by":"word", 
"combine":{"value":"sum"}}, "mapToFieldsCountConfig":{"language": "python", 
"fields":{"output":"word + \" - \" + str(value)"}}, 
"writeToTextTransform":{"path":"${TE [...]
+```
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/macros/wordCountMacros.yaml
 
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/macros/wordCountMacros.yaml
new file mode 100644
index 00000000000..b3870693ef5
--- /dev/null
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/macros/wordCountMacros.yaml
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{%- macro readFromTextTransform(params) -%}
+
+- name: Read from GCS
+  type: ReadFromText
+  config:
+    path: "{{ params.path }}"
+{%- endmacro -%}
+
+{%- macro mapToFieldsSplitConfig(params) -%}
+language: "{{ params.language }}"
+fields:
+  value: "{{ params.fields.value }}"
+  word:
+    callable: |-
+      import re
+      def my_mapping(row):
+        return re.findall(r'[A-Za-z\']+', row.line.lower())
+{%- endmacro -%}
+
+{%- macro explodeTransform(params) -%}
+- name: Explode word arrays
+  type: Explode
+  config:
+    fields: "{{ params.fields }}"
+{%- endmacro -%}
+
+{%- macro combineTransform(params) -%}
+- name: Count words
+  type: Combine
+  config:
+    group_by: "{{ params.group_by }}"
+    combine:
+      value: "{{ params.combine.value }}"
+{%- endmacro -%}
+
+{%- macro mapToFieldsCountConfig(params) -%}
+language: "{{ params.language }}"
+fields:
+  output: '{{ params.fields.output }}'
+{%- endmacro -%}
+
+{%- macro writeToTextTransform(params) -%}
+- name: Write to GCS
+  type: WriteToText
+  config:
+    path: "{{ params.path }}"
+{%- endmacro -%}
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/wordCountImport.yaml
 
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/wordCountImport.yaml
new file mode 100644
index 00000000000..1058a30b607
--- /dev/null
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/wordCountImport.yaml
@@ -0,0 +1,69 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This example reads from a public file stored on Google Cloud. This
+# requires authenticating with Google Cloud, or setting the file in
+# `ReadFromText` to a local file.
+#
+# To set up Application Default Credentials,
+# see https://cloud.google.com/docs/authentication/external/set-up-adc.
+#
+# This pipeline reads in a text file, counts distinct words found in the text,
+# then writes a row containing each word and its count to a text file.
+
+{% import 
'apache_beam/yaml/examples/transforms/jinja/import/macros/wordCountMacros.yaml' 
as macros %}
+
+pipeline:
+  type: chain
+  transforms:
+
+# Read in text file
+{{ macros.readFromTextTransform(readFromTextTransform) | indent(4, true) }}
+
+# Split words and count occurrences
+    - name: Split words
+      type: MapToFields
+      config:
+{{ macros.mapToFieldsSplitConfig(mapToFieldsSplitConfig) | indent(8, true) }}
+
+# Explode into individual words
+{{ macros.explodeTransform(explodeTransform) | indent(4, true) }}
+
+# Group by word
+{{ macros.combineTransform(combineTransform) | indent(4, true) }}
+
+# Format output to a single string consisting of `word - count`
+    - name: Format output
+      type: MapToFields
+      config:
+{{ macros.mapToFieldsCountConfig(mapToFieldsCountConfig) | indent(8, true) }}
+
+# Write to text file on GCS, locally, etc
+{{ macros.writeToTextTransform(writeToTextTransform) | indent(4, true) }}
+
+# Expected:
+#  Row(output='king - 311')
+#  Row(output='lear - 253')
+#  Row(output='dramatis - 1')
+#  Row(output='personae - 1')
+#  Row(output='of - 483')
+#  Row(output='britain - 2')
+#  Row(output='france - 32')
+#  Row(output='duke - 26')
+#  Row(output='burgundy - 20')
+#  Row(output='cornwall - 75')

Reply via email to