svetakvsundhar commented on code in PR #25969:
URL: https://github.com/apache/beam/pull/25969#discussion_r1174127448
##########
examples/notebooks/get-started/learn_beam_transforms_by_doing.ipynb:
##########
@@ -0,0 +1,740 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "QgmD1wbmT4mj"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Learn Beam PTransforms\n",
+ "\n",
+ "After this notebook, you should be able to:\n",
+ "1. Use user-defined functions in your `PTransforms`\n",
+ "2. Learn Beam SDK composite transforms\n",
+ "3. Create you own composite transforms to simplify your `Pipeline`\n",
+ "\n",
+ "For basic Beam `PTransforms`, please check out [this
Notebook](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb).\n",
+ "\n",
+ "Beam Python SDK also provides [a list of built-in
transforms](https://beam.apache.org/documentation/transforms/python/overview/).\n"
+ ],
+ "metadata": {
+ "id": "RuUHlGZjVt6W"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## How To Approach This Tutorial\n",
+ "\n",
+ "This tutorial was designed for someone who likes to learn by doing.
There will be code cells where you can write your own code to test your
understanding.\n",
Review Comment:
```suggestion
"This tutorial is designed for someone who likes to learn by doing.
There will be code cells where you can write your own code to test your
understanding.\n",
```
##########
examples/notebooks/get-started/learn_beam_transforms_by_doing.ipynb:
##########
@@ -0,0 +1,740 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "QgmD1wbmT4mj"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Learn Beam PTransforms\n",
+ "\n",
+ "After this notebook, you should be able to:\n",
+ "1. Use user-defined functions in your `PTransforms`\n",
+ "2. Learn Beam SDK composite transforms\n",
+ "3. Create you own composite transforms to simplify your `Pipeline`\n",
+ "\n",
+ "For basic Beam `PTransforms`, please check out [this
Notebook](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb).\n",
+ "\n",
+ "Beam Python SDK also provides [a list of built-in
transforms](https://beam.apache.org/documentation/transforms/python/overview/).\n"
+ ],
+ "metadata": {
+ "id": "RuUHlGZjVt6W"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## How To Approach This Tutorial\n",
+ "\n",
+ "This tutorial was designed for someone who likes to learn by doing.
There will be code cells where you can write your own code to test your
understanding.\n",
+ "\n",
+ "As such, to get the most out of this tutorial, we strongly recommend
typing code by hand as you’re working through the tutorial and not using
copy/paste. This will help you develop muscle memory and a stronger
understanding."
+ ],
+ "metadata": {
+ "id": "Ldx0Z7nWGopE"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "To begin, run the cell below to install and import Apache Beam."
+ ],
+ "metadata": {
+ "id": "jy1zaj4NDE0T"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "pNure-fW8hl3"
+ },
+ "outputs": [],
+ "source": [
+ "# Run a shell command and import beam\n",
+ "!pip install --quiet apache-beam\n",
+ "import apache_beam as beam\n",
+ "beam.__version__"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Set the logging level to reduce verbose information\n",
+ "import logging\n",
+ "\n",
+ "logging.root.setLevel(logging.ERROR)"
+ ],
+ "metadata": {
+ "id": "vyksB2VMtv3m"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "\n",
+ "\n",
+ "---\n",
+ "\n",
+ "\n",
+ "\n",
+ "---\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "M1ku4nX_Gutb"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 1. Simple User-Defined Function (UDF)\n",
+ "\n",
+ "Some `PTransforms` allow you to run your own functions and
user-defined code to specify how your transform is applied. For example, the
below `CombineGlobally` transform,"
+ ],
+ "metadata": {
+ "id": "0qDeT34SS1_8"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "pc = [1, 10, 100, 1000]\n",
+ "\n",
+ "# User-defined function\n",
+ "def bounded_sum(values, bound=500):\n",
+ " return min(sum(values), bound)\n",
+ "\n",
+ "small_sum = pc | beam.CombineGlobally(bounded_sum) # [500]\n",
+ "large_sum = pc | beam.CombineGlobally(bounded_sum, bound=5000) #
[1111]\n",
+ "\n",
+ "print(small_sum, large_sum)"
+ ],
+ "metadata": {
+ "id": "UZTWBGZ0TQWF"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 2. Transforms: ParDo and Combine\n",
+ "\n",
+ "A `ParDo` transform considers each element in the input
`PCollection`, performs your user code to process each element, and emits zero,
one, or multiple elements to an output `PCollection`. `Combine` is another Beam
transform for combining collections of elements or values in your data.\n",
+ "Both allow flexible UDF to define how you process the data."
+ ],
+ "metadata": {
+ "id": "UBFRcPO06xiV"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 2.1 DoFn\n",
+ "\n",
+ "DoFn - a Beam Python class that defines a distributed processing
function (used in
[ParDo](https://beam.apache.org/documentation/programming-guide/#pardo))"
+ ],
+ "metadata": {
+ "id": "P4W-1HIiV-HP"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "data = [1, 2, 3, 4]\n",
+ "\n",
+ "# create a DoFn to multiply each element by five\n",
+ "# you can define the procesing code under `process`\n",
Review Comment:
```suggestion
"# you can define the processing code under `process`\n",
```
##########
examples/notebooks/get-started/learn_beam_transforms_by_doing.ipynb:
##########
@@ -0,0 +1,740 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "QgmD1wbmT4mj"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Learn Beam PTransforms\n",
+ "\n",
+ "After this notebook, you should be able to:\n",
+ "1. Use user-defined functions in your `PTransforms`\n",
+ "2. Learn Beam SDK composite transforms\n",
+ "3. Create you own composite transforms to simplify your `Pipeline`\n",
+ "\n",
+ "For basic Beam `PTransforms`, please check out [this
Notebook](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb).\n",
+ "\n",
+ "Beam Python SDK also provides [a list of built-in
transforms](https://beam.apache.org/documentation/transforms/python/overview/).\n"
+ ],
+ "metadata": {
+ "id": "RuUHlGZjVt6W"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## How To Approach This Tutorial\n",
+ "\n",
+ "This tutorial was designed for someone who likes to learn by doing.
There will be code cells where you can write your own code to test your
understanding.\n",
+ "\n",
+ "As such, to get the most out of this tutorial, we strongly recommend
typing code by hand as you’re working through the tutorial and not using
copy/paste. This will help you develop muscle memory and a stronger
understanding."
+ ],
+ "metadata": {
+ "id": "Ldx0Z7nWGopE"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "To begin, run the cell below to install and import Apache Beam."
+ ],
+ "metadata": {
+ "id": "jy1zaj4NDE0T"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "pNure-fW8hl3"
+ },
+ "outputs": [],
+ "source": [
+ "# Run a shell command and import beam\n",
+ "!pip install --quiet apache-beam\n",
+ "import apache_beam as beam\n",
+ "beam.__version__"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Set the logging level to reduce verbose information\n",
+ "import logging\n",
+ "\n",
+ "logging.root.setLevel(logging.ERROR)"
+ ],
+ "metadata": {
+ "id": "vyksB2VMtv3m"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "\n",
+ "\n",
+ "---\n",
+ "\n",
+ "\n",
+ "\n",
+ "---\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "M1ku4nX_Gutb"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 1. Simple User-Defined Function (UDF)\n",
+ "\n",
+ "Some `PTransforms` allow you to run your own functions and
user-defined code to specify how your transform is applied. For example, the
below `CombineGlobally` transform,"
+ ],
+ "metadata": {
+ "id": "0qDeT34SS1_8"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "pc = [1, 10, 100, 1000]\n",
+ "\n",
+ "# User-defined function\n",
+ "def bounded_sum(values, bound=500):\n",
+ " return min(sum(values), bound)\n",
+ "\n",
+ "small_sum = pc | beam.CombineGlobally(bounded_sum) # [500]\n",
+ "large_sum = pc | beam.CombineGlobally(bounded_sum, bound=5000) #
[1111]\n",
+ "\n",
+ "print(small_sum, large_sum)"
+ ],
+ "metadata": {
+ "id": "UZTWBGZ0TQWF"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 2. Transforms: ParDo and Combine\n",
+ "\n",
+ "A `ParDo` transform considers each element in the input
`PCollection`, performs your user code to process each element, and emits zero,
one, or multiple elements to an output `PCollection`. `Combine` is another Beam
transform for combining collections of elements or values in your data.\n",
+ "Both allow flexible UDF to define how you process the data."
+ ],
+ "metadata": {
+ "id": "UBFRcPO06xiV"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 2.1 DoFn\n",
+ "\n",
+ "DoFn - a Beam Python class that defines a distributed processing
function (used in
[ParDo](https://beam.apache.org/documentation/programming-guide/#pardo))"
+ ],
+ "metadata": {
+ "id": "P4W-1HIiV-HP"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "data = [1, 2, 3, 4]\n",
+ "\n",
+ "# create a DoFn to multiply each element by five\n",
+ "# you can define the procesing code under `process`\n",
Review Comment:
We should make clear that having a `process` method is a requirement for a `DoFn`.
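For instance, the comment in this cell could be expanded into something like the sketch below (reusing the `MultiplyByFive` example from this cell), making it explicit that Beam calls `process` once per input element and expects it to return or yield the output elements:

```python
import apache_beam as beam

data = [1, 2, 3, 4]

# Every DoFn must define a `process` method; beam.ParDo calls it once per
# input element and expects it to return (or yield) an iterable of outputs.
class MultiplyByFive(beam.DoFn):
    def process(self, element):
        return [element * 5]

with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Create values' >> beam.Create(data)
        | 'Multiply by 5' >> beam.ParDo(MultiplyByFive())
        | beam.Map(print)
    )
```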
##########
examples/notebooks/get-started/learn_beam_transforms_by_doing.ipynb:
##########
@@ -0,0 +1,740 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "QgmD1wbmT4mj"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Learn Beam PTransforms\n",
+ "\n",
+ "After this notebook, you should be able to:\n",
+ "1. Use user-defined functions in your `PTransforms`\n",
+ "2. Learn Beam SDK composite transforms\n",
+ "3. Create you own composite transforms to simplify your `Pipeline`\n",
+ "\n",
+ "For basic Beam `PTransforms`, please check out [this
Notebook](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb).\n",
+ "\n",
+ "Beam Python SDK also provides [a list of built-in
transforms](https://beam.apache.org/documentation/transforms/python/overview/).\n"
+ ],
+ "metadata": {
+ "id": "RuUHlGZjVt6W"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## How To Approach This Tutorial\n",
+ "\n",
+ "This tutorial was designed for someone who likes to learn by doing.
There will be code cells where you can write your own code to test your
understanding.\n",
+ "\n",
+ "As such, to get the most out of this tutorial, we strongly recommend
typing code by hand as you’re working through the tutorial and not using
copy/paste. This will help you develop muscle memory and a stronger
understanding."
+ ],
+ "metadata": {
+ "id": "Ldx0Z7nWGopE"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "To begin, run the cell below to install and import Apache Beam."
+ ],
+ "metadata": {
+ "id": "jy1zaj4NDE0T"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "pNure-fW8hl3"
+ },
+ "outputs": [],
+ "source": [
+ "# Run a shell command and import beam\n",
+ "!pip install --quiet apache-beam\n",
+ "import apache_beam as beam\n",
+ "beam.__version__"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Set the logging level to reduce verbose information\n",
+ "import logging\n",
+ "\n",
+ "logging.root.setLevel(logging.ERROR)"
+ ],
+ "metadata": {
+ "id": "vyksB2VMtv3m"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "\n",
+ "\n",
+ "---\n",
+ "\n",
+ "\n",
+ "\n",
+ "---\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "M1ku4nX_Gutb"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 1. Simple User-Defined Function (UDF)\n",
+ "\n",
+ "Some `PTransforms` allow you to run your own functions and
user-defined code to specify how your transform is applied. For example, the
below `CombineGlobally` transform,"
+ ],
+ "metadata": {
+ "id": "0qDeT34SS1_8"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "pc = [1, 10, 100, 1000]\n",
+ "\n",
+ "# User-defined function\n",
+ "def bounded_sum(values, bound=500):\n",
+ " return min(sum(values), bound)\n",
+ "\n",
+ "small_sum = pc | beam.CombineGlobally(bounded_sum) # [500]\n",
+ "large_sum = pc | beam.CombineGlobally(bounded_sum, bound=5000) #
[1111]\n",
+ "\n",
+ "print(small_sum, large_sum)"
+ ],
+ "metadata": {
+ "id": "UZTWBGZ0TQWF"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 2. Transforms: ParDo and Combine\n",
+ "\n",
+ "A `ParDo` transform considers each element in the input
`PCollection`, performs your user code to process each element, and emits zero,
one, or multiple elements to an output `PCollection`. `Combine` is another Beam
transform for combining collections of elements or values in your data.\n",
+ "Both allow flexible UDF to define how you process the data."
+ ],
+ "metadata": {
+ "id": "UBFRcPO06xiV"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 2.1 DoFn\n",
+ "\n",
+ "DoFn - a Beam Python class that defines a distributed processing
function (used in
[ParDo](https://beam.apache.org/documentation/programming-guide/#pardo))"
+ ],
+ "metadata": {
+ "id": "P4W-1HIiV-HP"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "data = [1, 2, 3, 4]\n",
+ "\n",
+ "# create a DoFn to multiply each element by five\n",
+ "# you can define the procesing code under `process`\n",
+ "class MultiplyByFive(beam.DoFn):\n",
+ " def process(self, element):\n",
+ " return [element*5]\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " outputs = (\n",
+ " pipeline\n",
+ " | 'Create values' >> beam.Create(data)\n",
+ " | 'Multiply by 5' >> beam.ParDo(MultiplyByFive())\n",
+ " )\n",
+ "\n",
+ " outputs | beam.Map(print)"
+ ],
+ "metadata": {
+ "id": "TjOzWnQd-dan"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 2.2 CombineFn\n",
+ "\n",
+ "CombineFn - define associative and commutative aggregations (used in
[Combine](https://beam.apache.org/documentation/programming-guide/#combine))"
+ ],
+ "metadata": {
+ "id": "1qL2crwNXQXe"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "data = [1, 2, 3, 4]\n",
+ "\n",
+ "# create a CombineFn to get the product of each element\n",
+ "# you need to provide four opeations\n",
+ "class ProductFn(beam.CombineFn):\n",
+ " def create_accumulator(self):\n",
+ " # creates a new accumulator to store the initial value\n",
+ " return 1\n",
+ "\n",
+ " def add_input(self, current_prod, input):\n",
+ " # adds an input element to an accumulator\n",
+ " return current_prod*input\n",
+ "\n",
+ " def merge_accumulators(self, accumulators):\n",
+ " # merge several accumulators into a single accumulator\n",
+ " prod = 1\n",
+ " for accu in accumulators:\n",
+ " prod *= accu\n",
+ " return prod\n",
+ "\n",
+ " def extract_output(self, prod):\n",
+ " # performs the final computation\n",
+ " return prod\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " outputs = (\n",
+ " pipeline\n",
+ " | 'Create values' >> beam.Create(data)\n",
+ " | 'Multiply by 2' >> beam.CombineGlobally(ProductFn())\n",
+ " )\n",
+ " outputs | beam.LogElements()\n"
+ ],
+ "metadata": {
+ "id": "zxLatbOa9FyA"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Note: The above `DoFn` and `CombineFn` examples are for demonstration
purposes. You could easily achieve the same functionality by using the simple
function illustrated in section 1."
+ ],
+ "metadata": {
+ "id": "r1Vw1d5vJoIE"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "\n",
+ "\n",
+ "---\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "bTer_URwS0wb"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 3. Composite Transforms\n",
+ "\n",
+ "Now that you've learned the basic `PTransforms`, Beam allows you to
simplify the process of processing and transforming your data through
[Composite
Transforms](https://beam.apache.org/documentation/programming-guide/#composite-transforms).\n",
+ "\n",
+ "Composite transforms can nest multiple transforms into a single
composite transform, making your code easier to understand."
+ ],
+ "metadata": {
+ "id": "qPnSfU5wLTN5"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "To see an example of this, let's take a look at how we can improve
the `Pipeline` we built to count each word in Shakespeare's *King Lear*.\n",
+ "\n",
+ "Below is that `Pipeline` we built in [WordCount
tutorial](https://colab.research.google.com/drive/1_EncqFT_SmwXp7wlRqEf39m9efyrmm9p?usp=sharing):"
+ ],
+ "metadata": {
+ "id": "4tBsLkeatNUU"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "!mkdir -p data\n",
+ "!gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/"
+ ],
+ "metadata": {
+ "id": "Vokbrhhyto7H"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import re\n",
+ "\n",
+ "# Function used to run and display the result\n",
+ "def run(cmd):\n",
+ " print('>> {}'.format(cmd))\n",
+ " !{cmd}\n",
+ " print('')\n",
+ "\n",
+ "inputs_pattern = 'data/*'\n",
+ "outputs_prefix = 'outputs/part'\n",
+ "\n",
+ "# Running locally in the DirectRunner.\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " word_count = (\n",
+ " pipeline\n",
+ " | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Find words' >> beam.FlatMap(lambda line:
re.findall(r\"[a-zA-Z']+\", line))\n",
+ " | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
+ " | 'Group and sum' >> beam.CombinePerKey(sum)\n",
+ " | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
+ " )\n",
+ "\n",
+ "# Sample the first 20 results, remember there are no ordering
guarantees.\n",
+ "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
+ ],
+ "metadata": {
+ "id": "R-4uyn0Tttr2"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Although the code above is a viable way to design your `Pipeline`,
you can see that we use multiple transforms to perform one process:\n",
+ "1. `FlatMap` is used to find words in each line\n",
+ "2. `Map` is used to create key-value pairs with each word where the
value is 1\n",
+ "3. `CombinePerKey` is used so that we can then group by each word and
count up the sums\n",
+ "\n",
+ "All of these `PTransforms` in essence is meant to count each word in
*King Lear*. You can simplify the process and combine these three transforms
into one by using composite transforms.\n",
+ "\n",
+ "There's two ways you can follow:\n",
+ "1. Using Beam SDK's built-in composite transforms\n",
+ "2. Creating your own composite transforms"
+ ],
+ "metadata": {
+ "id": "wl8wLwZZtbnX"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 3.1 Beam SDK Composite Transforms\n",
+ "Beam makes it easy for you with its Beam SDK which comes with a
package of many useful composite transforms. We will only cover one in this
tutorial but to see a list of transforms you can use, see the following API
reference page: [Pre-written Beam Transforms for
Python](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.html).\n"
+ ],
+ "metadata": {
+ "id": "nPG11AEkNMKK"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "By using a Beam SDK composite transform, you're able to easily
combine multiple transforms into one line.\n",
+ "\n",
+ "For this tutorial, we will use the SDK-provided [`Count`
transform](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.combiners.html#apache_beam.transforms.combiners.Count),
which counts each element in the `PCollection`.\n",
+ "\n",
+ "\n",
+ "```\n",
+ "beam.combiners.Count.PerElement()\n",
+ "```\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "CG3lYxgqsXNk"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This `Count` transform performs the work that both the `Map` and
`CombinePerKey` transforms from our Word Count `Pipeline` but do it in one
line.\n",
+ "\n",
+ "Edit the Word Count `Pipeline` below to use a composite transform by
implementing Beam's `Count` transform (see above). Applying a composite
transform is just like applying a `PTransform` to your `PCollection`.\n",
+ "\n",
+ "Below the code cell you will edit is a hidden answer code cell to
check your work. If you're stuck, try opening the hint first!"
+ ],
+ "metadata": {
+ "id": "JQvwxTOHjOnr"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Open code to show the hint\n",
+ "\n",
+ "#Hint: Replace the `Map` and `CombinePerKey` transforms with Beam's
`Count` transform (see above)*italicized text*"
+ ],
+ "metadata": {
+ "id": "lHmq9azUmldL",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title EDIT THIS CODE CELL TO USE beam.combiners.Count.PerElement\n",
+ "# EDIT THIS CODE CELL\n",
+ "\n",
+ "inputs_pattern = 'data/*'\n",
+ "outputs_prefix = 'outputs/userans'\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " word_count = (\n",
+ " pipeline\n",
+ " | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Find words' >>\n",
+ " beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\",
line))\n",
+ " | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
+ " | 'Group and sum' >> beam.CombinePerKey(sum)\n",
+ " | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
+ " )\n",
+ "\n",
+ "# After you're done, check to see if your code outputs\n",
+ "# the same PCollection by uncommenting the code below\n",
+ "'''\n",
+ "# Sample the first 20 results, remember there are no ordering
guarantees.\n",
+ "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))\n",
+ "'''"
+ ],
+ "metadata": {
+ "id": "QEE8nhc_lhPB"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Below is our answer to check your work. It is the Word Count example
from above, but they now combine `Map` and `CombinePerKey` into one line using
the `Count` composite transform."
+ ],
+ "metadata": {
+ "id": "TW4rLroMIMPe"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Answer\n",
+ "inputs_pattern = 'data/*'\n",
+ "outputs_prefix = 'outputs/part2'\n",
+ "\n",
+ "# Running locally in the DirectRunner.\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " word_count = (\n",
+ " pipeline\n",
+ " | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Find words' >>\n",
+ " beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\",
line))\n",
+ " # Count composite transform from Beam SDK\n",
+ " | 'Count words' >> beam.combiners.Count.PerElement()\n",
+ " | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
+ " )\n",
+ "\n",
+ "# Sample the first 20 results, remember there are no ordering
guarantees.\n",
+ "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
+ ],
+ "metadata": {
+ "id": "T-utf0YLIvEe",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "> Summary: Applying a composite transform is just like applying a
`PTransform` to your `PCollection`, but it simplifies the process by combining
multiple `PTransforms` in one line."
+ ],
+ "metadata": {
+ "id": "FlcAgn0HlP3v"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 3.2 Creating Your Own Composite Transform\n",
+ "\n",
+ "We simplified the original code using a Beam SDK composite transform,
but we can simplify it further by creating our own composite transform
function."
+ ],
+ "metadata": {
+ "id": "IoENeJ1kMndv"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Below is an example of a composite transform you can create that the
Beam SDK does not cover. The function combines the `Count` composite transform
you implemented above, as well as the `FlatMap` transform that converts lines
of texts into individual words.\n",
+ "\n",
+ "Note that because `Count` is itself a composite transform,
`CountWords` is also a nested composite transform."
+ ],
+ "metadata": {
+ "id": "e0Og38YMQ0hV"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# The CountWords Composite Transform inside the WordCount
pipeline.\n",
+ "@beam.ptransform_fn\n",
+ "def CountWords(pcoll):\n",
+ " return (\n",
+ " pcoll\n",
+ " # Convert lines of text into individual words.\n",
+ " | 'ExtractWords' >> beam.FlatMap(lambda x:
re.findall(r'[A-Za-z\\']+', x))\n",
+ " # Count the number of times each word occurs.\n",
+ " | beam.combiners.Count.PerElement()\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "cvkGb-QcQyD3"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "You can then use this `CountWords` composite transform in your
`Pipeline`, making your pipeline more visually easy to parse through.\n",
+ "\n",
+ "Try editing the Word Count `Pipeline` below to incoporate this
transform into the pipeline.\n",
+ "\n",
+ "Below the code cell you will edit is a hidden answer code cell to
check your work. If you're stuck, try opening the hint first!"
+ ],
+ "metadata": {
+ "id": "OJCZKYxGNAfU"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Open code to show the hint\n",
+ "\n",
+ "#Hint: The newly defined transform combines the Count and FlatMap
transform.\n",
+ "#Replace the `FlatMap` and `Count` transforms with CountWords() (see
above)*italicized text*"
+ ],
+ "metadata": {
+ "id": "9XO-uwhL8w27",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title EDIT THIS CODE CELL TO USE YOUR `CountWords`\n",
+ "# EDIT THIS CODE CELL\n",
+ "\n",
+ "inputs_pattern = 'data/*'\n",
+ "outputs_prefix = 'outputs/part3'\n",
+ "\n",
+ "# Running locally in the DirectRunner.\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " word_count = (\n",
+ " pipeline\n",
+ " | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Find words' >> beam.FlatMap(lambda line:
re.findall(r\"[a-zA-Z']+\", line))\n",
+ " | 'Count words' >> beam.combiners.Count.PerElement()\n",
+ " | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
+ " )\n",
+ "\n",
+ "pipeline.run()\n",
+ "# Sample the first 20 results, remember there are no ordering
guarantees.\n",
+ "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
+ ],
+ "metadata": {
+ "id": "FyW5ug6S3-1B"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Answer\n",
+ "inputs_pattern = 'data/*'\n",
+ "outputs_prefix = 'outputs/part3'\n",
+ "\n",
+ "# Running locally in the DirectRunner.\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " word_count = (\n",
+ " pipeline\n",
+ " | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " # The composite transform function you created above\n",
+ " | 'Count Words' >> CountWords()\n",
+ " | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
+ " )\n",
+ "\n",
+ "# Sample the first 20 results, remember there are no ordering
guarantees.\n",
+ "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
+ ],
+ "metadata": {
+ "id": "6Cn4pk3j8K4p",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 3.3 Creating Your Own Composite Transform With `PTransform`
Directly"
Review Comment:
This section is wordy, IMO. It might make sense to link to the [composite transform creation documentation](https://beam.apache.org/documentation/programming-guide/#composite-transform-creation) here.
##########
examples/notebooks/get-started/learn_beam_transforms_by_doing.ipynb:
##########
@@ -0,0 +1,740 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "QgmD1wbmT4mj"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Learn Beam PTransforms\n",
+ "\n",
+ "After this notebook, you should be able to:\n",
+ "1. Use user-defined functions in your `PTransforms`\n",
+ "2. Learn Beam SDK composite transforms\n",
+ "3. Create you own composite transforms to simplify your `Pipeline`\n",
+ "\n",
+ "For basic Beam `PTransforms`, please check out [this
Notebook](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb).\n",
+ "\n",
+ "Beam Python SDK also provides [a list of built-in
transforms](https://beam.apache.org/documentation/transforms/python/overview/).\n"
+ ],
+ "metadata": {
+ "id": "RuUHlGZjVt6W"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## How To Approach This Tutorial\n",
+ "\n",
+ "This tutorial was designed for someone who likes to learn by doing.
There will be code cells where you can write your own code to test your
understanding.\n",
+ "\n",
+ "As such, to get the most out of this tutorial, we strongly recommend
typing code by hand as you’re working through the tutorial and not using
copy/paste. This will help you develop muscle memory and a stronger
understanding."
+ ],
+ "metadata": {
+ "id": "Ldx0Z7nWGopE"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "To begin, run the cell below to install and import Apache Beam."
+ ],
+ "metadata": {
+ "id": "jy1zaj4NDE0T"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "pNure-fW8hl3"
+ },
+ "outputs": [],
+ "source": [
+ "# Run a shell command and import beam\n",
+ "!pip install --quiet apache-beam\n",
+ "import apache_beam as beam\n",
+ "beam.__version__"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Set the logging level to reduce verbose information\n",
+ "import logging\n",
+ "\n",
+ "logging.root.setLevel(logging.ERROR)"
+ ],
+ "metadata": {
+ "id": "vyksB2VMtv3m"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "\n",
+ "\n",
+ "---\n",
+ "\n",
+ "\n",
+ "\n",
+ "---\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "M1ku4nX_Gutb"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 1. Simple User-Defined Function (UDF)\n",
+ "\n",
+ "Some `PTransforms` allow you to run your own functions and
user-defined code to specify how your transform is applied. For example, the
below `CombineGlobally` transform,"
+ ],
+ "metadata": {
+ "id": "0qDeT34SS1_8"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "pc = [1, 10, 100, 1000]\n",
+ "\n",
+ "# User-defined function\n",
+ "def bounded_sum(values, bound=500):\n",
+ " return min(sum(values), bound)\n",
+ "\n",
+ "small_sum = pc | beam.CombineGlobally(bounded_sum) # [500]\n",
+ "large_sum = pc | beam.CombineGlobally(bounded_sum, bound=5000) #
[1111]\n",
+ "\n",
+ "print(small_sum, large_sum)"
+ ],
+ "metadata": {
+ "id": "UZTWBGZ0TQWF"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 2. Transforms: ParDo and Combine\n",
+ "\n",
+ "A `ParDo` transform considers each element in the input
`PCollection`, performs your user code to process each element, and emits zero,
one, or multiple elements to an output `PCollection`. `Combine` is another Beam
transform for combining collections of elements or values in your data.\n",
+ "Both allow flexible UDF to define how you process the data."
Review Comment:
```suggestion
"Both allow flexible UDF's to define how you process the data."
```
##########
examples/notebooks/get-started/learn_beam_transforms_by_doing.ipynb:
##########
@@ -0,0 +1,740 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "QgmD1wbmT4mj"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Learn Beam PTransforms\n",
+ "\n",
+ "After this notebook, you should be able to:\n",
+ "1. Use user-defined functions in your `PTransforms`\n",
+ "2. Learn Beam SDK composite transforms\n",
+ "3. Create you own composite transforms to simplify your `Pipeline`\n",
+ "\n",
+ "For basic Beam `PTransforms`, please check out [this
Notebook](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb).\n",
+ "\n",
+ "Beam Python SDK also provides [a list of built-in
transforms](https://beam.apache.org/documentation/transforms/python/overview/).\n"
+ ],
+ "metadata": {
+ "id": "RuUHlGZjVt6W"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## How To Approach This Tutorial\n",
+ "\n",
+ "This tutorial was designed for someone who likes to learn by doing.
There will be code cells where you can write your own code to test your
understanding.\n",
+ "\n",
+ "As such, to get the most out of this tutorial, we strongly recommend
typing code by hand as you’re working through the tutorial and not using
copy/paste. This will help you develop muscle memory and a stronger
understanding."
+ ],
+ "metadata": {
+ "id": "Ldx0Z7nWGopE"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "To begin, run the cell below to install and import Apache Beam."
+ ],
+ "metadata": {
+ "id": "jy1zaj4NDE0T"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "pNure-fW8hl3"
+ },
+ "outputs": [],
+ "source": [
+ "# Run a shell command and import beam\n",
+ "!pip install --quiet apache-beam\n",
+ "import apache_beam as beam\n",
+ "beam.__version__"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Set the logging level to reduce verbose information\n",
+ "import logging\n",
+ "\n",
+ "logging.root.setLevel(logging.ERROR)"
+ ],
+ "metadata": {
+ "id": "vyksB2VMtv3m"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "\n",
+ "\n",
+ "---\n",
+ "\n",
+ "\n",
+ "\n",
+ "---\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "M1ku4nX_Gutb"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 1. Simple User-Defined Function (UDF)\n",
+ "\n",
+ "Some `PTransforms` allow you to run your own functions and
user-defined code to specify how your transform is applied. For example, the
below `CombineGlobally` transform,"
+ ],
+ "metadata": {
+ "id": "0qDeT34SS1_8"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "pc = [1, 10, 100, 1000]\n",
+ "\n",
+ "# User-defined function\n",
+ "def bounded_sum(values, bound=500):\n",
+ " return min(sum(values), bound)\n",
+ "\n",
+ "small_sum = pc | beam.CombineGlobally(bounded_sum) # [500]\n",
+ "large_sum = pc | beam.CombineGlobally(bounded_sum, bound=5000) #
[1111]\n",
+ "\n",
+ "print(small_sum, large_sum)"
+ ],
+ "metadata": {
+ "id": "UZTWBGZ0TQWF"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 2. Transforms: ParDo and Combine\n",
+ "\n",
+ "A `ParDo` transform considers each element in the input
`PCollection`, performs your user code to process each element, and emits zero,
one, or multiple elements to an output `PCollection`. `Combine` is another Beam
transform for combining collections of elements or values in your data.\n",
+ "Both allow flexible UDF to define how you process the data."
+ ],
+ "metadata": {
+ "id": "UBFRcPO06xiV"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 2.1 DoFn\n",
+ "\n",
+ "DoFn - a Beam Python class that defines a distributed processing
function (used in
[ParDo](https://beam.apache.org/documentation/programming-guide/#pardo))"
+ ],
+ "metadata": {
+ "id": "P4W-1HIiV-HP"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "data = [1, 2, 3, 4]\n",
+ "\n",
+ "# create a DoFn to multiply each element by five\n",
+ "# you can define the procesing code under `process`\n",
+ "class MultiplyByFive(beam.DoFn):\n",
+ " def process(self, element):\n",
+ " return [element*5]\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " outputs = (\n",
+ " pipeline\n",
+ " | 'Create values' >> beam.Create(data)\n",
+ " | 'Multiply by 5' >> beam.ParDo(MultiplyByFive())\n",
+ " )\n",
+ "\n",
+ " outputs | beam.Map(print)"
+ ],
+ "metadata": {
+ "id": "TjOzWnQd-dan"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 2.2 CombineFn\n",
+ "\n",
+ "CombineFn - define associative and commutative aggregations (used in
[Combine](https://beam.apache.org/documentation/programming-guide/#combine))"
+ ],
+ "metadata": {
+ "id": "1qL2crwNXQXe"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "data = [1, 2, 3, 4]\n",
+ "\n",
+ "# create a CombineFn to get the product of each element\n",
+ "# you need to provide four opeations\n",
+ "class ProductFn(beam.CombineFn):\n",
+ " def create_accumulator(self):\n",
+ " # creates a new accumulator to store the initial value\n",
+ " return 1\n",
+ "\n",
+ " def add_input(self, current_prod, input):\n",
+ " # adds an input element to an accumulator\n",
+ " return current_prod*input\n",
+ "\n",
+ " def merge_accumulators(self, accumulators):\n",
+ " # merge several accumulators into a single accumulator\n",
+ " prod = 1\n",
+ " for accu in accumulators:\n",
+ " prod *= accu\n",
+ " return prod\n",
+ "\n",
+ " def extract_output(self, prod):\n",
+ " # performs the final computation\n",
+ " return prod\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " outputs = (\n",
+ " pipeline\n",
+ " | 'Create values' >> beam.Create(data)\n",
+ " | 'Multiply by 2' >> beam.CombineGlobally(ProductFn())\n",
+ " )\n",
+ " outputs | beam.LogElements()\n"
+ ],
+ "metadata": {
+ "id": "zxLatbOa9FyA"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Note: The above `DoFn` and `CombineFn` examples are for demonstration
purposes. You could easily achieve the same functionality by using the simple
function illustrated in section 1."
+ ],
+ "metadata": {
+ "id": "r1Vw1d5vJoIE"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "\n",
+ "\n",
+ "---\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "bTer_URwS0wb"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 3. Composite Transforms\n",
+ "\n",
+ "Now that you've learned the basic `PTransforms`, Beam allows you to
simplify the process of processing and transforming your data through
[Composite
Transforms](https://beam.apache.org/documentation/programming-guide/#composite-transforms).\n",
+ "\n",
+ "Composite transforms can nest multiple transforms into a single
composite transform, making your code easier to understand."
+ ],
+ "metadata": {
+ "id": "qPnSfU5wLTN5"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "To see an example of this, let's take a look at how we can improve
the `Pipeline` we built to count each word in Shakespeare's *King Lear*.\n",
+ "\n",
+ "Below is that `Pipeline` we built in [WordCount
tutorial](https://colab.research.google.com/drive/1_EncqFT_SmwXp7wlRqEf39m9efyrmm9p?usp=sharing):"
+ ],
+ "metadata": {
+ "id": "4tBsLkeatNUU"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "!mkdir -p data\n",
+ "!gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/"
+ ],
+ "metadata": {
+ "id": "Vokbrhhyto7H"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import re\n",
+ "\n",
+ "# Function used to run and display the result\n",
+ "def run(cmd):\n",
+ " print('>> {}'.format(cmd))\n",
+ " !{cmd}\n",
+ " print('')\n",
+ "\n",
+ "inputs_pattern = 'data/*'\n",
+ "outputs_prefix = 'outputs/part'\n",
+ "\n",
+ "# Running locally in the DirectRunner.\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " word_count = (\n",
+ " pipeline\n",
+ " | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Find words' >> beam.FlatMap(lambda line:
re.findall(r\"[a-zA-Z']+\", line))\n",
+ " | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
+ " | 'Group and sum' >> beam.CombinePerKey(sum)\n",
+ " | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
+ " )\n",
+ "\n",
+ "# Sample the first 20 results, remember there are no ordering
guarantees.\n",
+ "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
+ ],
+ "metadata": {
+ "id": "R-4uyn0Tttr2"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Although the code above is a viable way to design your `Pipeline`,
you can see that we use multiple transforms to perform one process:\n",
+ "1. `FlatMap` is used to find words in each line\n",
+ "2. `Map` is used to create key-value pairs with each word where the
value is 1\n",
+ "3. `CombinePerKey` is used so that we can then group by each word and
count up the sums\n",
+ "\n",
+ "All of these `PTransforms` in essence is meant to count each word in
*King Lear*. You can simplify the process and combine these three transforms
into one by using composite transforms.\n",
+ "\n",
+ "There's two ways you can follow:\n",
+ "1. Using Beam SDK's built-in composite transforms\n",
+ "2. Creating your own composite transforms"
+ ],
+ "metadata": {
+ "id": "wl8wLwZZtbnX"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 3.1 Beam SDK Composite Transforms\n",
+ "Beam makes it easy for you with its Beam SDK which comes with a
package of many useful composite transforms. We will only cover one in this
tutorial but to see a list of transforms you can use, see the following API
reference page: [Pre-written Beam Transforms for
Python](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.html).\n"
+ ],
+ "metadata": {
+ "id": "nPG11AEkNMKK"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "By using a Beam SDK composite transform, you're able to easily
combine multiple transforms into one line.\n",
+ "\n",
+ "For this tutorial, we will use the SDK-provided [`Count`
transform](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.combiners.html#apache_beam.transforms.combiners.Count),
which counts each element in the `PCollection`.\n",
+ "\n",
+ "\n",
+ "```\n",
+ "beam.combiners.Count.PerElement()\n",
+ "```\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "CG3lYxgqsXNk"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This `Count` transform performs the work that both the `Map` and
`CombinePerKey` transforms from our Word Count `Pipeline` but do it in one
line.\n",
+ "\n",
+ "Edit the Word Count `Pipeline` below to use a composite transform by
implementing Beam's `Count` transform (see above). Applying a composite
transform is just like applying a `PTransform` to your `PCollection`.\n",
+ "\n",
+ "Below the code cell you will edit is a hidden answer code cell to
check your work. If you're stuck, try opening the hint first!"
+ ],
+ "metadata": {
+ "id": "JQvwxTOHjOnr"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Open code to show the hint\n",
+ "\n",
+ "#Hint: Replace the `Map` and `CombinePerKey` transforms with Beam's
`Count` transform (see above)*italicized text*"
+ ],
+ "metadata": {
+ "id": "lHmq9azUmldL",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title EDIT THIS CODE CELL TO USE beam.combiners.Count.PerElement\n",
+ "# EDIT THIS CODE CELL\n",
+ "\n",
+ "inputs_pattern = 'data/*'\n",
+ "outputs_prefix = 'outputs/userans'\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " word_count = (\n",
+ " pipeline\n",
+ " | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Find words' >>\n",
+ " beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\",
line))\n",
+ " | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
+ " | 'Group and sum' >> beam.CombinePerKey(sum)\n",
+ " | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
+ " )\n",
+ "\n",
+ "# After you're done, check to see if your code outputs\n",
+ "# the same PCollection by uncommenting the code below\n",
+ "'''\n",
+ "# Sample the first 20 results, remember there are no ordering
guarantees.\n",
+ "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))\n",
+ "'''"
+ ],
+ "metadata": {
+ "id": "QEE8nhc_lhPB"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Below is our answer to check your work. It is the Word Count example
from above, but they now combine `Map` and `CombinePerKey` into one line using
the `Count` composite transform."
+ ],
+ "metadata": {
+ "id": "TW4rLroMIMPe"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Answer\n",
+ "inputs_pattern = 'data/*'\n",
+ "outputs_prefix = 'outputs/part2'\n",
+ "\n",
+ "# Running locally in the DirectRunner.\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " word_count = (\n",
+ " pipeline\n",
+ " | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Find words' >>\n",
+ " beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\",
line))\n",
+ " # Count composite transform from Beam SDK\n",
+ " | 'Count words' >> beam.combiners.Count.PerElement()\n",
+ " | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
+ " )\n",
+ "\n",
+ "# Sample the first 20 results, remember there are no ordering
guarantees.\n",
+ "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
+ ],
+ "metadata": {
+ "id": "T-utf0YLIvEe",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "> Summary: Applying a composite transform is just like applying a
`PTransform` to your `PCollection`, but it simplifies the process by combining
multiple `PTransforms` in one line."
+ ],
+ "metadata": {
+ "id": "FlcAgn0HlP3v"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 3.2 Creating Your Own Composite Transform\n",
+ "\n",
+ "We simplified the original code using a Beam SDK composite transform,
but we can simplify it further by creating our own composite transform
function."
+ ],
+ "metadata": {
+ "id": "IoENeJ1kMndv"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Below is an example of a composite transform you can create that the
Beam SDK does not cover. The function combines the `Count` composite transform
you implemented above, as well as the `FlatMap` transform that converts lines
of texts into individual words.\n",
+ "\n",
+ "Note that because `Count` is itself a composite transform,
`CountWords` is also a nested composite transform."
+ ],
+ "metadata": {
+ "id": "e0Og38YMQ0hV"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# The CountWords Composite Transform inside the WordCount
pipeline.\n",
+ "@beam.ptransform_fn\n",
+ "def CountWords(pcoll):\n",
+ " return (\n",
+ " pcoll\n",
+ " # Convert lines of text into individual words.\n",
+ " | 'ExtractWords' >> beam.FlatMap(lambda x:
re.findall(r'[A-Za-z\\']+', x))\n",
+ " # Count the number of times each word occurs.\n",
+ " | beam.combiners.Count.PerElement()\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "cvkGb-QcQyD3"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "You can then use this `CountWords` composite transform in your
`Pipeline`, making your pipeline more visually easy to parse through.\n",
+ "\n",
+ "Try editing the Word Count `Pipeline` below to incoporate this
transform into the pipeline.\n",
+ "\n",
+ "Below the code cell you will edit is a hidden answer code cell to
check your work. If you're stuck, try opening the hint first!"
+ ],
+ "metadata": {
+ "id": "OJCZKYxGNAfU"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Open code to show the hint\n",
+ "\n",
+ "#Hint: The newly defined transform combines the Count and FlatMap
transform.\n",
+ "#Replace the `FlatMap` and `Count` transforms with CountWords() (see
above)*italicized text*"
+ ],
+ "metadata": {
+ "id": "9XO-uwhL8w27",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title EDIT THIS CODE CELL TO USE YOUR `CountWords`\n",
+ "# EDIT THIS CODE CELL\n",
+ "\n",
+ "inputs_pattern = 'data/*'\n",
+ "outputs_prefix = 'outputs/part3'\n",
+ "\n",
+ "# Running locally in the DirectRunner.\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " word_count = (\n",
+ " pipeline\n",
+ " | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Find words' >> beam.FlatMap(lambda line:
re.findall(r\"[a-zA-Z']+\", line))\n",
+ " | 'Count words' >> beam.combiners.Count.PerElement()\n",
+ " | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
+ " )\n",
+ "\n",
+ "pipeline.run()\n",
+ "# Sample the first 20 results, remember there are no ordering
guarantees.\n",
+ "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
+ ],
+ "metadata": {
+ "id": "FyW5ug6S3-1B"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Answer\n",
+ "inputs_pattern = 'data/*'\n",
+ "outputs_prefix = 'outputs/part3'\n",
+ "\n",
+ "# Running locally in the DirectRunner.\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " word_count = (\n",
+ " pipeline\n",
+ " | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " # The composite transform function you created above\n",
+ " | 'Count Words' >> CountWords()\n",
+ " | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
+ " )\n",
+ "\n",
+ "# Sample the first 20 results, remember there are no ordering
guarantees.\n",
+ "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
+ ],
+ "metadata": {
+ "id": "6Cn4pk3j8K4p",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 3.3 Creating Your Own Composite Transform With `PTransform`
Directly"
+ ],
+ "metadata": {
+ "id": "w73aU2IsDAS1"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "To create your own composite transform, create a subclass of the
`PTransform` class and override the `expand` method to specify the actual
processing logic\n",
+ "([more
details](https://beam.apache.org/documentation/programming-guide/#composite-transform-creation))."
+ ],
+ "metadata": {
+ "id": "uLEAICZKDz6e"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "For example, if we wanted to create our own composite transform that
counted the length of each word.\n",
+ "\n",
+ "The following code sample shows how to declare a `PTransform` that
accepts a `PCollection` of Strings for input, and outputs a `PCollection` of
integers\n",
+ "to show the string lengths.\n",
+ "\n",
+ "Within your `PTransform` subclass, you’ll need to override the
`expand` method. The `expand` method is where you add the processing logic for
the `PTransform`. Your override of `expand` must accept the appropriate type of
input `PCollection` as a parameter, and specify the output `PCollection` as the
return value.\n",
+ "\n",
+ "As long as you override the `expand` method in your `PTransform`
subclass to accept the appropriate input `PCollection`(s) and return the
corresponding output `PCollection`(s), you can include as many transforms as
you want. These transforms can include core transforms, composite transforms,
or the transforms included in the Beam SDK libraries.\n",
Review Comment:
```suggestion
"As long as you override the `expand` method in your `PTransform`
subclass to accept the appropriate input `PCollection`(s) and return the
corresponding output `PCollection`(s), you can include as many transforms as
you want. These transforms can include core transforms (ParDo), composite
transforms, or the transforms included in the Beam SDK libraries.\n",
```
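   For readers following along, a minimal sketch of the `expand`-override pattern described above could look like this (the `ComputeWordLengths` name and the sample words are illustrative, not from the notebook):
   ```python
   import apache_beam as beam

   # Illustrative sketch: a composite transform that subclasses PTransform
   # and overrides expand() to turn a PCollection of strings into a
   # PCollection of their lengths.
   class ComputeWordLengths(beam.PTransform):
     def expand(self, pcoll):
       # The processing logic lives here: map each word to its length.
       return pcoll | 'Get lengths' >> beam.Map(len)

   with beam.Pipeline() as pipeline:
     _ = (
         pipeline
         | 'Create words' >> beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])
         | 'Word lengths' >> ComputeWordLengths()
         | 'Print' >> beam.Map(print)  # e.g. 2, 2, 2, 3, ... (no ordering guarantees)
     )
   ```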
##########
examples/notebooks/get-started/learn_beam_transforms_by_doing.ipynb:
##########
@@ -0,0 +1,740 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "QgmD1wbmT4mj"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Learn Beam PTransforms\n",
+ "\n",
+ "After this notebook, you should be able to:\n",
+ "1. Use user-defined functions in your `PTransforms`\n",
+ "2. Learn Beam SDK composite transforms\n",
+ "3. Create you own composite transforms to simplify your `Pipeline`\n",
+ "\n",
+ "For basic Beam `PTransforms`, please check out [this
Notebook](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb).\n",
+ "\n",
+ "Beam Python SDK also provides [a list of built-in
transforms](https://beam.apache.org/documentation/transforms/python/overview/).\n"
+ ],
+ "metadata": {
+ "id": "RuUHlGZjVt6W"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## How To Approach This Tutorial\n",
+ "\n",
+ "This tutorial was designed for someone who likes to learn by doing.
There will be code cells where you can write your own code to test your
understanding.\n",
+ "\n",
+ "As such, to get the most out of this tutorial, we strongly recommend
typing code by hand as you’re working through the tutorial and not using
copy/paste. This will help you develop muscle memory and a stronger
understanding."
+ ],
+ "metadata": {
+ "id": "Ldx0Z7nWGopE"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "To begin, run the cell below to install and import Apache Beam."
+ ],
+ "metadata": {
+ "id": "jy1zaj4NDE0T"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "pNure-fW8hl3"
+ },
+ "outputs": [],
+ "source": [
+ "# Run a shell command and import beam\n",
+ "!pip install --quiet apache-beam\n",
+ "import apache_beam as beam\n",
+ "beam.__version__"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Set the logging level to reduce verbose information\n",
+ "import logging\n",
+ "\n",
+ "logging.root.setLevel(logging.ERROR)"
+ ],
+ "metadata": {
+ "id": "vyksB2VMtv3m"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "\n",
+ "\n",
+ "---\n",
+ "\n",
+ "\n",
+ "\n",
+ "---\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "M1ku4nX_Gutb"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 1. Simple User-Defined Function (UDF)\n",
+ "\n",
+ "Some `PTransforms` allow you to run your own functions and
user-defined code to specify how your transform is applied. For example, the
below `CombineGlobally` transform,"
+ ],
+ "metadata": {
+ "id": "0qDeT34SS1_8"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "pc = [1, 10, 100, 1000]\n",
+ "\n",
+ "# User-defined function\n",
+ "def bounded_sum(values, bound=500):\n",
+ " return min(sum(values), bound)\n",
+ "\n",
+ "small_sum = pc | beam.CombineGlobally(bounded_sum) # [500]\n",
+ "large_sum = pc | beam.CombineGlobally(bounded_sum, bound=5000) #
[1111]\n",
+ "\n",
+ "print(small_sum, large_sum)"
+ ],
+ "metadata": {
+ "id": "UZTWBGZ0TQWF"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 2. Transforms: ParDo and Combine\n",
+ "\n",
+ "A `ParDo` transform considers each element in the input
`PCollection`, performs your user code to process each element, and emits zero,
one, or multiple elements to an output `PCollection`. `Combine` is another Beam
transform for combining collections of elements or values in your data.\n",
+ "Both allow flexible UDF to define how you process the data."
+ ],
+ "metadata": {
+ "id": "UBFRcPO06xiV"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 2.1 DoFn\n",
+ "\n",
+ "DoFn - a Beam Python class that defines a distributed processing
function (used in
[ParDo](https://beam.apache.org/documentation/programming-guide/#pardo))"
+ ],
+ "metadata": {
+ "id": "P4W-1HIiV-HP"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "data = [1, 2, 3, 4]\n",
+ "\n",
+ "# create a DoFn to multiply each element by five\n",
+ "# you can define the procesing code under `process`\n",
+ "class MultiplyByFive(beam.DoFn):\n",
+ " def process(self, element):\n",
+ " return [element*5]\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " outputs = (\n",
+ " pipeline\n",
+ " | 'Create values' >> beam.Create(data)\n",
+ " | 'Multiply by 5' >> beam.ParDo(MultiplyByFive())\n",
+ " )\n",
+ "\n",
+ " outputs | beam.Map(print)"
+ ],
+ "metadata": {
+ "id": "TjOzWnQd-dan"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### 2.2 CombineFn\n",
+ "\n",
+ "CombineFn - define associative and commutative aggregations (used in
[Combine](https://beam.apache.org/documentation/programming-guide/#combine))"
+ ],
+ "metadata": {
+ "id": "1qL2crwNXQXe"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "data = [1, 2, 3, 4]\n",
+ "\n",
+ "# create a CombineFn to get the product of each element\n",
+ "# you need to provide four opeations\n",
+ "class ProductFn(beam.CombineFn):\n",
+ " def create_accumulator(self):\n",
+ " # creates a new accumulator to store the initial value\n",
+ " return 1\n",
+ "\n",
+ " def add_input(self, current_prod, input):\n",
+ " # adds an input element to an accumulator\n",
+ " return current_prod*input\n",
+ "\n",
+ " def merge_accumulators(self, accumulators):\n",
+ " # merge several accumulators into a single accumulator\n",
+ " prod = 1\n",
+ " for accu in accumulators:\n",
+ " prod *= accu\n",
+ " return prod\n",
+ "\n",
+ " def extract_output(self, prod):\n",
+ " # performs the final computation\n",
+ " return prod\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " outputs = (\n",
+ " pipeline\n",
+ " | 'Create values' >> beam.Create(data)\n",
+ " | 'Multiply by 2' >> beam.CombineGlobally(ProductFn())\n",
+ " )\n",
+ " outputs | beam.LogElements()\n"
+ ],
+ "metadata": {
+ "id": "zxLatbOa9FyA"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Note: The above `DoFn` and `CombineFn` examples are for demonstration
purposes. You could easily achieve the same functionality by using the simple
function illustrated in section 1."
+ ],
+ "metadata": {
+ "id": "r1Vw1d5vJoIE"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "\n",
+ "\n",
+ "---\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "bTer_URwS0wb"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 3. Composite Transforms\n",
+ "\n",
+ "Now that you've learned the basic `PTransforms`, Beam allows you to
simplify the process of processing and transforming your data through
[Composite
Transforms](https://beam.apache.org/documentation/programming-guide/#composite-transforms).\n",
+ "\n",
+ "Composite transforms can nest multiple transforms into a single
composite transform, making your code easier to understand."
+ ],
+ "metadata": {
+ "id": "qPnSfU5wLTN5"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "To see an example of this, let's take a look at how we can improve
the `Pipeline` we built to count each word in Shakespeare's *King Lear*.\n",
+ "\n",
+ "Below is that `Pipeline` we built in [WordCount
tutorial](https://colab.research.google.com/drive/1_EncqFT_SmwXp7wlRqEf39m9efyrmm9p?usp=sharing):"
+ ],
+ "metadata": {
+ "id": "4tBsLkeatNUU"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "!mkdir -p data\n",
+ "!gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/"
+ ],
+ "metadata": {
+ "id": "Vokbrhhyto7H"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import re\n",
+ "\n",
+ "# Function used to run and display the result\n",
+ "def run(cmd):\n",
+ " print('>> {}'.format(cmd))\n",
+ " !{cmd}\n",
+ " print('')\n",
+ "\n",
+ "inputs_pattern = 'data/*'\n",
+ "outputs_prefix = 'outputs/part'\n",
+ "\n",
+ "# Running locally in the DirectRunner.\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " word_count = (\n",
+ " pipeline\n",
+ " | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Find words' >> beam.FlatMap(lambda line:
re.findall(r\"[a-zA-Z']+\", line))\n",
+ " | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
+ " | 'Group and sum' >> beam.CombinePerKey(sum)\n",
+ " | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n",
+ " )\n",
+ "\n",
+ "# Sample the first 20 results, remember there are no ordering
guarantees.\n",
+ "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
+ ],
+ "metadata": {
+ "id": "R-4uyn0Tttr2"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Although the code above is a viable way to design your `Pipeline`,
you can see that we use multiple transforms to perform one process:\n",
+ "1. `FlatMap` is used to find words in each line\n",
+ "2. `Map` is used to create key-value pairs with each word where the
value is 1\n",
+ "3. `CombinePerKey` is used so that we can then group by each word and
count up the sums\n",
+ "\n",
+ "All of these `PTransforms` in essence is meant to count each word in
*King Lear*. You can simplify the process and combine these three transforms
into one by using composite transforms.\n",
Review Comment:
```suggestion
"All of these `PTransforms`, in combination, are meant to count each
word in *King Lear*. You can simplify the process and combine these three
transforms into one by using composite transforms.\n",
```
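   For context, the simplification this cell leads into might look like the following sketch, where `Count.PerElement()` replaces the explicit 'Pair words with 1' and 'Group and sum' steps (the output prefix here is a placeholder):
   ```python
   import re
   import apache_beam as beam

   inputs_pattern = 'data/*'
   outputs_prefix = 'outputs/simplified'  # placeholder prefix

   # WordCount with the pairing and summing folded into one composite transform.
   with beam.Pipeline() as pipeline:
     _ = (
         pipeline
         | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
         | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
         | 'Count words' >> beam.combiners.Count.PerElement()
         | 'Write results' >> beam.io.WriteToText(outputs_prefix)
     )
   ```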
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]