This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 90f8c50c612 [YAML] - Jinja % include example (#35914)
90f8c50c612 is described below
commit 90f8c50c612ff20c8bda5778eecf888c3be3266c
Author: Derrick Williams <[email protected]>
AuthorDate: Fri Aug 22 11:34:07 2025 -0400
[YAML] - Jinja % include example (#35914)
* update readme for jinja
* fix conflicts
* add jinja data
* add jinja submodules
* fix lint, whitespace, and save file issues
* fix rebase conflict and exception etc
* update readme
* move around files
* add new readme
* move more files around
* fix whitespace
* fix additional errors after shifting folders
* address comments
* fix jinja processor
* fix readme per new names for submodules
* add todo ids
* add comments
* minor gemini fixes
---
sdks/python/apache_beam/yaml/examples/README.md | 22 +++-
.../yaml/examples/testing/examples_test.py | 134 ++++++++++++++++++++-
.../yaml/examples/testing/input_data.py | 48 ++++++++
.../examples/transforms/jinja/include/README.md | 63 ++++++++++
.../jinja/include/submodules/combineTransform.yaml | 27 +++++
.../jinja/include/submodules/explodeTransform.yaml | 26 ++++
.../include/submodules/mapToFieldsCountConfig.yaml | 24 ++++
.../include/submodules/mapToFieldsSplitConfig.yaml | 33 +++++
.../include/submodules/readFromTextTransform.yaml | 26 ++++
.../include/submodules/writeToTextTransform.yaml | 27 +++++
.../transforms/jinja/include/wordCountInclude.yaml | 66 ++++++++++
11 files changed, 490 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/examples/README.md
b/sdks/python/apache_beam/yaml/examples/README.md
index c788b6d3e60..4cba973dbea 100644
--- a/sdks/python/apache_beam/yaml/examples/README.md
+++ b/sdks/python/apache_beam/yaml/examples/README.md
@@ -37,7 +37,7 @@
Build this jar for running with the run command in the next stage:
```
-cd <path_to_beam_repo>/beam; ./gradlew
sdks:java:io:google-cloud-platform:expansion-service:shadowJar
+cd <PATH_TO_BEAM_REPO>/beam; ./gradlew
sdks:java:io:google-cloud-platform:expansion-service:shadowJar
```
## Example Run
@@ -70,6 +70,10 @@ pytest -v testing/
or
+pytest -v testing/examples_test.py::JinjaTest
+
+or
+
python -m unittest -v testing/examples_test.py
```
@@ -229,6 +233,22 @@ gcloud dataflow yaml run $JOB_NAME \
--region $REGION
```
+### Jinja
+
+Jinja
[templatization](https://beam.apache.org/documentation/sdks/yaml/#jinja-templatization)
+can be used to build off of different contexts and/or with different
+configurations.
+
+Several examples will be created based on the existing word count example
+by leveraging the Jinja templating engine for dynamic pipeline generation,
+driven by user inputs through the `% include`, `% import`, and inheritance
+directives.
+
+Jinja `% include` directive:
+-
[wordCountInclude.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml)
+-
[Instructions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md)
on how to run the pipeline.
+
+
### ML
Examples that include the built-in `Enrichment` transform for performing
diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
index d75cdd99431..0bfcb3f6161 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
@@ -21,6 +21,7 @@ import json
import logging
import os
import random
+import re
import sys
import unittest
from typing import Any
@@ -33,6 +34,9 @@ from unittest import mock
import pytest
import yaml
+from jinja2 import DictLoader
+from jinja2 import Environment
+from jinja2 import StrictUndefined
import apache_beam as beam
from apache_beam import PCollection
@@ -339,8 +343,21 @@ def create_test_method(
for i, line in enumerate(expected):
expected[i] = line.replace('# ', '').replace('\n', '')
expected = [line for line in expected if line]
+
+ raw_spec_string = ''.join(lines)
+ # Filter for any jinja preprocessor - this has to be done before other
+ # preprocessors.
+ jinja_preprocessor = [
+ preprocessor for preprocessor in custom_preprocessors
+ if 'jinja_preprocessor' in preprocessor.__name__
+ ]
+ if jinja_preprocessor:
+ jinja_preprocessor = jinja_preprocessor[0]
+ raw_spec_string = jinja_preprocessor(raw_spec_string)
+ custom_preprocessors.remove(jinja_preprocessor)
+
pipeline_spec = yaml.load(
- ''.join(lines), Loader=yaml_transform.SafeLineLoader)
+ raw_spec_string, Loader=yaml_transform.SafeLineLoader)
with TestEnvironment() as env:
for fn in custom_preprocessors:
@@ -513,8 +530,9 @@ class YamlExamplesTestSuite:
return apply
[email protected]_test_preprocessor('test_wordcount_minimal_yaml')
-def _wordcount_test_preprocessor(
[email protected]_test_preprocessor(
+ ['test_wordcount_minimal_yaml'])
+def _wordcount_minimal_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
"""
Preprocessor for the wordcount_minimal.yaml test.
@@ -523,6 +541,8 @@ def _wordcount_test_preprocessor(
of the wordcount example. This allows the test to verify the pipeline's
correctness without relying on a fixed input file.
+ Based on this expected output: # Row(word='king', count=311)
+
Args:
test_spec: The dictionary representation of the YAML pipeline
specification.
expected: A list of strings representing the expected output of the
@@ -538,8 +558,64 @@ def _wordcount_test_preprocessor(
word = element.split('=')[1].split(',')[0].replace("'", '')
count = int(element.split('=')[2].replace(')', ''))
all_words += [word] * count
- random.shuffle(all_words)
+ return _wordcount_random_shuffler(test_spec, all_words, env)
+
+
[email protected]_test_preprocessor(
+ ['test_wordCountInclude_yaml'])
+def _wordcount_jinja_test_preprocessor(
+ test_spec: dict, expected: List[str], env: TestEnvironment):
+ """
+ Preprocessor for the wordcount Jinja tests.
+
+ This preprocessor generates a random input file based on the expected output
+ of the wordcount example. This allows the test to verify the pipeline's
+ correctness without relying on a fixed input file.
+
+ Based on this expected output: # Row(output='king - 311')
+
+ Args:
+ test_spec: The dictionary representation of the YAML pipeline
specification.
+ expected: A list of strings representing the expected output of the
+ pipeline.
+ env: The TestEnvironment object providing utilities for creating temporary
+ files.
+
+ Returns:
+ The modified test_spec dictionary with the input file path replaced.
+ """
+ all_words = []
+ for element in expected:
+ match = re.search(r"output='(.*) - (\d+)'", element)
+ if match:
+ word, count_str = match.groups()
+ all_words += [word] * int(count_str)
+ return _wordcount_random_shuffler(test_spec, all_words, env)
+
+
+def _wordcount_random_shuffler(
+ test_spec: dict, all_words: List[str], env: TestEnvironment):
+ """
+ Helper function to create a randomized input file for wordcount-style tests.
+
+ This function takes a list of words, shuffles them, and arranges them into
+ randomly sized lines. It then creates a temporary input file with this
+ content and updates the provided test specification to use this file as
+ the input for a 'ReadFromText' transform.
+
+ Args:
+ test_spec: The dictionary representation of the YAML pipeline
specification.
+ all_words: A list of strings, where each string is a word to be included
+ in the generated input file.
+ env: The TestEnvironment object providing utilities for creating temporary
+ files.
+
+ Returns:
+ The modified test_spec dictionary with the input file path for
+ 'ReadFromText' replaced with the path to the newly generated file.
+ """
+ random.shuffle(all_words)
lines = []
while all_words:
line_length = random.randint(1, min(10, len(all_words)))
@@ -599,7 +675,8 @@ def _kafka_test_preprocessor(
'test_streaming_sentiment_analysis_yaml',
'test_iceberg_migration_yaml',
'test_ml_preprocessing_yaml',
- 'test_anomaly_scoring_yaml'
+ 'test_anomaly_scoring_yaml',
+ 'test_wordCountInclude_yaml'
])
def _io_write_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -1175,6 +1252,50 @@ def _batch_log_analysis_test_preprocessor(
return test_spec
[email protected]_test_preprocessor(
+ ['test_wordCountInclude_yaml'])
+def _jinja_preprocessor(raw_spec_string: str):
+ """
+ Preprocessor for Jinja-based YAML tests.
+
+ This function takes a raw YAML string, which is treated as a Jinja2
+ template, and renders it to produce the final pipeline specification.
+ It specifically handles templates that use the `{% include ... %}`
+ directive by manually loading the content of the included files from the
+ filesystem.
+
+ The Jinja variables required for rendering are loaded from a predefined
+ data source.
+
+ Args:
+ raw_spec_string: A string containing the raw YAML content, which is a
+ Jinja2 template.
+
+ Returns:
+ A string containing the fully rendered YAML pipeline specification.
+ """
+
+ jinja_variables = json.loads(input_data.word_count_jinja_parameter_data())
+ test_file_dir = os.path.dirname(__file__)
+ sdk_root = os.path.abspath(os.path.join(test_file_dir, '../../../..'))
+
+ include_files = input_data.word_count_jinja_template_data()
+ mock_templates = {'main_template': raw_spec_string}
+ for file_path in include_files:
+ full_path = os.path.join(sdk_root, file_path)
+ with open(full_path, 'r', encoding='utf-8') as f:
+ mock_templates[file_path] = f.read()
+
+ # Can't use the standard expand_jinja method because it does not support
+ # the `% include` Jinja directive.
+ # TODO(#35936): Maybe update expand_jinja to handle this case.
+ jinja_env = Environment(
+ loader=DictLoader(mock_templates), undefined=StrictUndefined)
+ template = jinja_env.get_template('main_template')
+ rendered_yaml_string = template.render(jinja_variables)
+ return rendered_yaml_string
+
+
INPUT_FILES = {
'products.csv': input_data.products_csv(),
'kinglear.txt': input_data.text_data(),
@@ -1216,6 +1337,9 @@ ElementWiseTest = YamlExamplesTestSuite(
os.path.join(YAML_DOCS_DIR, '../transforms/elementwise/*.yaml')).run()
ExamplesTest = YamlExamplesTestSuite(
'ExamplesTest', os.path.join(YAML_DOCS_DIR, '../*.yaml')).run()
+JinjaTest = YamlExamplesTestSuite(
+ 'JinjaExamplesTest',
+ os.path.join(YAML_DOCS_DIR, '../transforms/jinja/**/*.yaml')).run()
IOTest = YamlExamplesTestSuite(
'IOExamplesTest', os.path.join(YAML_DOCS_DIR,
'../transforms/io/*.yaml')).run()
diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py
b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
index 27f210b4856..50d40224f82 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
@@ -16,6 +16,7 @@
# limitations under the License.
#
+import json
import typing
from apache_beam.io.gcp.pubsub import PubsubMessage
@@ -33,6 +34,53 @@ def text_data():
])
+def word_count_jinja_parameter_data():
+ params = {
+ "readFromTextTransform": {
+ "path": "gs://dataflow-samples/shakespeare/kinglear.txt"
+ },
+ "mapToFieldsSplitConfig": {
+ "language": "python", "fields": {
+ "value": "1"
+ }
+ },
+ "explodeTransform": {
+ "fields": "word"
+ },
+ "combineTransform": {
+ "group_by": "word", "combine": {
+ "value": "sum"
+ }
+ },
+ "mapToFieldsCountConfig": {
+ "language": "python",
+ "fields": {
+ "output": "word + \" - \" + str(value)"
+ }
+ },
+ "writeToTextTransform": {
+ "path": "gs://apache-beam-testing-derrickaw/wordCounts/"
+ }
+ }
+ return json.dumps(params)
+
+
+def word_count_jinja_template_data():
+ return \
+[('apache_beam/yaml/examples/transforms/jinja/'
+ 'include/submodules/readFromTextTransform.yaml'),
+ ('apache_beam/yaml/examples/transforms/jinja/'
+ 'include/submodules/mapToFieldsSplitConfig.yaml'),
+ ('apache_beam/yaml/examples/transforms/jinja/'
+ 'include/submodules/explodeTransform.yaml'),
+ ('apache_beam/yaml/examples/transforms/jinja/'
+ 'include/submodules/combineTransform.yaml'),
+ ('apache_beam/yaml/examples/transforms/jinja/'
+ 'include/submodules/mapToFieldsCountConfig.yaml'),
+ ('apache_beam/yaml/examples/transforms/jinja/'
+ 'include/submodules/writeToTextTransform.yaml')]
+
+
def iceberg_dynamic_destinations_users_data():
return [{
'id': 3, 'name': 'Smith', 'email': '[email protected]', 'zip': 'NY'
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md
new file mode 100644
index 00000000000..9b056e9906d
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md
@@ -0,0 +1,63 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+## Jinja % include Pipeline
+
+This example leverages the `% include` Jinja directive by having one main
+pipeline and then submodules for each transform used.
+
+General setup:
+```sh
+export
PIPELINE_FILE=apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml
+export KINGLEAR="gs://dataflow-samples/shakespeare/kinglear.txt"
+export TEMP_LOCATION="gs://MY-BUCKET/wordCounts/"
+
+cd <PATH_TO_BEAM_REPO>/beam/sdks/python
+```
+
+Multiline Run Example:
+```sh
+python -m apache_beam.yaml.main \
+ --yaml_pipeline_file="${PIPELINE_FILE}" \
+ --jinja_variables='{
+ "readFromTextTransform": {"path": "'"${KINGLEAR}"'"},
+ "mapToFieldsSplitConfig": {
+ "language": "python",
+ "fields": {
+ "value": "1"
+ }
+ },
+ "explodeTransform": {"fields": "word"},
+ "combineTransform": {
+ "group_by": "word",
+ "combine": {"value": "sum"}
+ },
+ "mapToFieldsCountConfig": {
+ "language": "python",
+ "fields": {"output": "word + \" - \" + str(value)"}
+ },
+ "writeToTextTransform": {"path": "'"${TEMP_LOCATION}"'"}
+ }'
+```
+
+Single Line Run Example:
+```sh
+python -m apache_beam.yaml.main --yaml_pipeline_file="${PIPELINE_FILE}"
--jinja_variables='{"readFromTextTransform": {"path":
"gs://dataflow-samples/shakespeare/kinglear.txt"}, "mapToFieldsSplitConfig":
{"language": "python", "fields":{"value":"1"}},
"explodeTransform":{"fields":"word"}, "combineTransform":{"group_by":"word",
"combine":{"value":"sum"}}, "mapToFieldsCountConfig":{"language": "python",
"fields":{"output":"word + \" - \" + str(value)"}},
"writeToTextTransform":{"path":"${TE [...]
+```
+
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/combineTransform.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/combineTransform.yaml
new file mode 100644
index 00000000000..bbf81355818
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/combineTransform.yaml
@@ -0,0 +1,27 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# The Combine transform groups by each word and counts the occurrences.
+
+ - name: Count words
+ type: Combine
+ config:
+ group_by:
+ - {{combineTransform.group_by}}
+ combine:
+ value: {{combineTransform.combine.value}}
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/explodeTransform.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/explodeTransform.yaml
new file mode 100644
index 00000000000..d56649c5b9d
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/explodeTransform.yaml
@@ -0,0 +1,26 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The Explode transform to take arrays of words and emit each word as a
+# separate element.
+
+ - name: Explode word arrays
+ type: Explode
+ config:
+ fields:
+ - {{explodeTransform.fields}}
\ No newline at end of file
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsCountConfig.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsCountConfig.yaml
new file mode 100644
index 00000000000..f1423895c41
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsCountConfig.yaml
@@ -0,0 +1,24 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A generic MapToFields transform to format the word and count into a single
+# output string.
+
+ language: {{mapToFieldsCountConfig.language}}
+ fields:
+ output: {{mapToFieldsCountConfig.fields.output}}
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsSplitConfig.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsSplitConfig.yaml
new file mode 100644
index 00000000000..16c41110b92
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsSplitConfig.yaml
@@ -0,0 +1,33 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# A MapToFields transform to map words to a word and a count of 1.
+
+ language: {{mapToFieldsSplitConfig.language}}
+ fields:
+ word:
+ callable: |-
+ # TODO(#35936): Including another file here works fine, but if
+ # the file has a license header or other irrelevant comments, it
+ # will break the pipeline. Need to investigate more on Jinja
+ # filtering in the expand_jinja method or some other way.
+ import re
+ def my_mapping(row):
+ return re.findall(r'[A-Za-z\']+', row.line.lower())
+ value: {{mapToFieldsSplitConfig.fields.value}}
\ No newline at end of file
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/readFromTextTransform.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/readFromTextTransform.yaml
new file mode 100644
index 00000000000..96029c38068
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/readFromTextTransform.yaml
@@ -0,0 +1,26 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This example reads from a public file stored on Google Cloud. This
+# requires authenticating with Google Cloud, or setting the file in
+# `ReadFromText` to a local file.
+
+ - name: Read from GCS
+ type: ReadFromText
+ config:
+ path: {{readFromTextTransform.path}}
\ No newline at end of file
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/writeToTextTransform.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/writeToTextTransform.yaml
new file mode 100644
index 00000000000..6cce90e7a48
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/writeToTextTransform.yaml
@@ -0,0 +1,27 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# This example writes to a public file stored on Google Cloud. This
+# requires authenticating with Google Cloud, or setting the file in
+# `WriteToText` to a local file.
+
+ - name: Write to GCS
+ type: WriteToText
+ config:
+ path: {{writeToTextTransform.path}}
\ No newline at end of file
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml
new file mode 100644
index 00000000000..a28ba688b2f
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml
@@ -0,0 +1,66 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This example reads from a public file stored on Google Cloud. This
+# requires authenticating with Google Cloud, or setting the file in
+# `ReadFromText` to a local file.
+#
+# To set up Application Default Credentials,
+# see https://cloud.google.com/docs/authentication/external/set-up-adc.
+#
+# This pipeline reads in a text file, counts distinct words found in the text,
+# then writes a row containing each word and its count to a text file.
+
+pipeline:
+ type: chain
+ transforms:
+# Read in text file
+{% include
'apache_beam/yaml/examples/transforms/jinja/include/submodules/readFromTextTransform.yaml'
%}
+
+# Split words and count occurrences
+ - name: Split words
+ type: MapToFields
+ config:
+{% include
'apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsSplitConfig.yaml'
%}
+
+# Explode into individual words
+{% include
'apache_beam/yaml/examples/transforms/jinja/include/submodules/explodeTransform.yaml'
%}
+
+# Group by word
+{% include
'apache_beam/yaml/examples/transforms/jinja/include/submodules/combineTransform.yaml'
%}
+
+# Format output to a single string consisting of `word - count`
+ - name: Format output
+ type: MapToFields
+ config:
+{% include
'apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsCountConfig.yaml'
%}
+
+# Write to text file on GCS, locally, etc
+{% include
'apache_beam/yaml/examples/transforms/jinja/include/submodules/writeToTextTransform.yaml'
%}
+
+# Expected:
+# Row(output='king - 311')
+# Row(output='lear - 253')
+# Row(output='dramatis - 1')
+# Row(output='personae - 1')
+# Row(output='of - 483')
+# Row(output='britain - 2')
+# Row(output='france - 32')
+# Row(output='duke - 26')
+# Row(output='burgundy - 20')
+# Row(output='cornwall - 75')