This is an automated email from the ASF dual-hosted git repository. tvalentyn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ac008074fa5 [YAML]: add import jinja pipeline example (#35945) ac008074fa5 is described below commit ac008074fa54981b47bc47888c864d09e26ea2f7 Author: Derrick Williams <derric...@google.com> AuthorDate: Wed Aug 27 16:20:40 2025 -0400 [YAML]: add import jinja pipeline example (#35945) * add import jinja pipeline example * revert name change * update overall examples readme * fix lint issue * fix gemini small issue * Update sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md --------- Co-authored-by: tvalentyn <tvalen...@users.noreply.github.com> --- sdks/python/apache_beam/yaml/examples/README.md | 4 ++ .../yaml/examples/testing/examples_test.py | 15 ++--- .../yaml/examples/testing/input_data.py | 36 ++++++----- .../examples/transforms/jinja/import/README.md | 63 ++++++++++++++++++++ .../jinja/import/macros/wordCountMacros.yaml | 64 ++++++++++++++++++++ .../transforms/jinja/import/wordCountImport.yaml | 69 ++++++++++++++++++++++ 6 files changed, 230 insertions(+), 21 deletions(-) diff --git a/sdks/python/apache_beam/yaml/examples/README.md b/sdks/python/apache_beam/yaml/examples/README.md index 75098d212cd..b053e3e6236 100644 --- a/sdks/python/apache_beam/yaml/examples/README.md +++ b/sdks/python/apache_beam/yaml/examples/README.md @@ -245,6 +245,10 @@ by leveraging Jinja templating engine for dynamic pipeline generation based on inputs from the user through `% include`, `% import`, and inheritance directives. +Jinja `% import` directive: +- [wordCountImport.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/wordCountImport.yaml) +- [Instructions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md) on how to run the pipeline. 
+ Jinja `% include` directive: - [wordCountInclude.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml) - [Instructions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md) on how to run the pipeline. diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py index 0bfcb3f6161..80e82945523 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py +++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py @@ -353,7 +353,8 @@ def create_test_method( ] if jinja_preprocessor: jinja_preprocessor = jinja_preprocessor[0] - raw_spec_string = jinja_preprocessor(raw_spec_string) + raw_spec_string = jinja_preprocessor( + raw_spec_string, self._testMethodName) custom_preprocessors.remove(jinja_preprocessor) pipeline_spec = yaml.load( @@ -563,7 +564,7 @@ def _wordcount_minimal_test_preprocessor( @YamlExamplesTestSuite.register_test_preprocessor( - ['test_wordCountInclude_yaml']) + ['test_wordCountInclude_yaml', 'test_wordCountImport_yaml']) def _wordcount_jinja_test_preprocessor( test_spec: dict, expected: List[str], env: TestEnvironment): """ @@ -676,7 +677,8 @@ def _kafka_test_preprocessor( 'test_iceberg_migration_yaml', 'test_ml_preprocessing_yaml', 'test_anomaly_scoring_yaml', - 'test_wordCountInclude_yaml' + 'test_wordCountInclude_yaml', + 'test_wordCountImport_yaml' ]) def _io_write_test_preprocessor( test_spec: dict, expected: List[str], env: TestEnvironment): @@ -1253,8 +1255,8 @@ def _batch_log_analysis_test_preprocessor( @YamlExamplesTestSuite.register_test_preprocessor( - ['test_wordCountInclude_yaml']) -def _jinja_preprocessor(raw_spec_string: str): + ['test_wordCountInclude_yaml', 'test_wordCountImport_yaml']) +def _jinja_preprocessor(raw_spec_string: str, test_name: str): """ Preprocessor for Jinja-based YAML 
tests. @@ -1274,12 +1276,11 @@ def _jinja_preprocessor(raw_spec_string: str): Returns: A string containing the fully rendered YAML pipeline specification. """ - jinja_variables = json.loads(input_data.word_count_jinja_parameter_data()) test_file_dir = os.path.dirname(__file__) sdk_root = os.path.abspath(os.path.join(test_file_dir, '../../../..')) - include_files = input_data.word_count_jinja_template_data() + include_files = input_data.word_count_jinja_template_data(test_name) mock_templates = {'main_template': raw_spec_string} for file_path in include_files: full_path = os.path.join(sdk_root, file_path) diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py b/sdks/python/apache_beam/yaml/examples/testing/input_data.py index 50d40224f82..fb468567355 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py +++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py @@ -65,20 +65,28 @@ def word_count_jinja_parameter_data(): return json.dumps(params) -def word_count_jinja_template_data(): - return \ -[('apache_beam/yaml/examples/transforms/jinja/' - 'include/submodules/readFromTextTransform.yaml'), - ('apache_beam/yaml/examples/transforms/jinja/' - 'include/submodules/mapToFieldsSplitConfig.yaml'), - ('apache_beam/yaml/examples/transforms/jinja/' - 'include/submodules/explodeTransform.yaml'), - ('apache_beam/yaml/examples/transforms/jinja/' - 'include/submodules/combineTransform.yaml'), - ('apache_beam/yaml/examples/transforms/jinja/' - 'include/submodules/mapToFieldsCountConfig.yaml'), - ('apache_beam/yaml/examples/transforms/jinja/' - 'include/submodules/writeToTextTransform.yaml')] +def word_count_jinja_template_data(test_name: str) -> list[str]: + if test_name == 'test_wordCountInclude_yaml': + return [ + 'apache_beam/yaml/examples/transforms/jinja/' + 'include/submodules/readFromTextTransform.yaml', + 'apache_beam/yaml/examples/transforms/jinja/' + 'include/submodules/mapToFieldsSplitConfig.yaml', + 
'apache_beam/yaml/examples/transforms/jinja/' + 'include/submodules/explodeTransform.yaml', + 'apache_beam/yaml/examples/transforms/jinja/' + 'include/submodules/combineTransform.yaml', + 'apache_beam/yaml/examples/transforms/jinja/' + 'include/submodules/mapToFieldsCountConfig.yaml', + 'apache_beam/yaml/examples/transforms/jinja/' + 'include/submodules/writeToTextTransform.yaml' + ] + elif test_name == 'test_wordCountImport_yaml': + return [ + 'apache_beam/yaml/examples/transforms/jinja/' + 'import/macros/wordCountMacros.yaml' + ] + return [] def iceberg_dynamic_destinations_users_data(): diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md b/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md new file mode 100644 index 00000000000..14052cd3a6c --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md @@ -0,0 +1,63 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +## Jinja % import Pipeline + +This example leverages the `% import` Jinja directive by having one main +pipeline and then one macros file containing all the transforms and configs +used. 
+ +General setup: +```sh +export PIPELINE_FILE=apache_beam/yaml/examples/transforms/jinja/import/wordCountImport.yaml +export KINGLEAR="gs://dataflow-samples/shakespeare/kinglear.txt" +export TEMP_LOCATION="gs://MY-BUCKET/wordCounts/" + +cd <PATH_TO_BEAM_REPO>/beam/sdks/python +``` + +Multiline Run Example: +```sh +python -m apache_beam.yaml.main \ + --yaml_pipeline_file="${PIPELINE_FILE}" \ + --jinja_variables='{ + "readFromTextTransform": {"path": "'"${KINGLEAR}"'"}, + "mapToFieldsSplitConfig": { + "language": "python", + "fields": { + "value": "1" + } + }, + "explodeTransform": {"fields": "word"}, + "combineTransform": { + "group_by": "word", + "combine": {"value": "sum"} + }, + "mapToFieldsCountConfig": { + "language": "python", + "fields": {"output": "word + \" - \" + str(value)"} + }, + "writeToTextTransform": {"path": "'"${TEMP_LOCATION}"'"} + }' +``` + +Single Line Run Example: +```sh +python -m apache_beam.yaml.main --yaml_pipeline_file="${PIPELINE_FILE}" --jinja_variables='{"readFromTextTransform": {"path": "gs://dataflow-samples/shakespeare/kinglear.txt"}, "mapToFieldsSplitConfig": {"language": "python", "fields":{"value":"1"}}, "explodeTransform":{"fields":"word"}, "combineTransform":{"group_by":"word", "combine":{"value":"sum"}}, "mapToFieldsCountConfig":{"language": "python", "fields":{"output":"word + \" - \" + str(value)"}}, "writeToTextTransform":{"path":"${TE [...] +``` diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/macros/wordCountMacros.yaml b/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/macros/wordCountMacros.yaml new file mode 100644 index 00000000000..b3870693ef5 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/macros/wordCountMacros.yaml @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +{%- macro readFromTextTransform(params) -%} + +- name: Read from GCS + type: ReadFromText + config: + path: "{{ params.path }}" +{%- endmacro -%} + +{%- macro mapToFieldsSplitConfig(params) -%} +language: "{{ params.language }}" +fields: + value: "{{ params.fields.value }}" + word: + callable: |- + import re + def my_mapping(row): + return re.findall(r'[A-Za-z\']+', row.line.lower()) +{%- endmacro -%} + +{%- macro explodeTransform(params) -%} +- name: Explode word arrays + type: Explode + config: + fields: "{{ params.fields }}" +{%- endmacro -%} + +{%- macro combineTransform(params) -%} +- name: Count words + type: Combine + config: + group_by: "{{ params.group_by }}" + combine: + value: "{{ params.combine.value }}" +{%- endmacro -%} + +{%- macro mapToFieldsCountConfig(params) -%} +language: "{{ params.language }}" +fields: + output: '{{ params.fields.output }}' +{%- endmacro -%} + +{%- macro writeToTextTransform(params) -%} +- name: Write to GCS + type: WriteToText + config: + path: "{{ params.path }}" +{%- endmacro -%} diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/wordCountImport.yaml b/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/wordCountImport.yaml new file mode 100644 index 00000000000..1058a30b607 --- /dev/null +++ 
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/wordCountImport.yaml @@ -0,0 +1,69 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This example reads from a public file stored on Google Cloud. This +# requires authenticating with Google Cloud, or setting the file in +# `ReadFromText` to a local file. +# +# To set up Application Default Credentials, +# see https://cloud.google.com/docs/authentication/external/set-up-adc. +# +# This pipeline reads in a text file, counts distinct words found in the text, +# then logs a row containing each word and its count. 
+ +{% import 'apache_beam/yaml/examples/transforms/jinja/import/macros/wordCountMacros.yaml' as macros %} + +pipeline: + type: chain + transforms: + +# Read in text file +{{ macros.readFromTextTransform(readFromTextTransform) | indent(4, true) }} + +# Split words and count occurrences + - name: Split words + type: MapToFields + config: +{{ macros.mapToFieldsSplitConfig(mapToFieldsSplitConfig) | indent(8, true) }} + +# Explode into individual words +{{ macros.explodeTransform(explodeTransform) | indent(4, true) }} + +# Group by word +{{ macros.combineTransform(combineTransform) | indent(4, true) }} + +# Format output to a single string consisting of `word - count` + - name: Format output + type: MapToFields + config: +{{ macros.mapToFieldsCountConfig(mapToFieldsCountConfig) | indent(8, true) }} + +# Write to text file on GCS, locally, etc +{{ macros.writeToTextTransform(writeToTextTransform) | indent(4, true) }} + +# Expected: +# Row(output='king - 311') +# Row(output='lear - 253') +# Row(output='dramatis - 1') +# Row(output='personae - 1') +# Row(output='of - 483') +# Row(output='britain - 2') +# Row(output='france - 32') +# Row(output='duke - 26') +# Row(output='burgundy - 20') +# Row(output='cornwall - 75')