tvalentyn commented on code in PR #25719:
URL: https://github.com/apache/beam/pull/25719#discussion_r1140883071


##########
examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb:
##########
@@ -0,0 +1,1095 @@
+{
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "view-in-github"
+      },
+      "source": [
+        "<a 
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb\"
 target=\"_parent\"><img 
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In 
Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "V913rQcmLS72"
+      },
+      "source": [
+        "# Welcome to Apache Beam!\n",
+        "\n",
+        "This notebook will be your introductory guide to Beam's main concepts 
and its uses. This tutorial **does not** assume any prior Apache Beam 
knowledge.\n",
+        "\n",
+        "We'll cover what Beam is, what it does, and a few basic 
transforms!\n",
+        "\n",
+        "We aim to give you familiarity with:\n",
+        "- Creating a `Pipeline`\n",
+        "- Creating a `PCollection`\n",
+        "- Performing basic `PTransforms`\n",
+        "  - Map\n",
+        "  - Filter\n",
+        "  - FlatMap\n",
+        "  - Combine\n",
+        "- Applications\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "Lduh-9oXt3P_"
+      },
+      "source": [
+        "## How To Approach This Tutorial\n",
+        "\n",
+        "This tutorial was designed for someone who likes to **learn by 
doing**. As such, there will be opportunities for you to practice writing your 
own code in these notebooks with the answer hidden in a cell below.\n",
+        "\n",
+        "Cells that you need to edit contain an `...` placeholder, and each such cell is titled `Edit This Code`. However, you are free to play around with the other cells if you would like to add something beyond our tutorial.\n",
+        "\n",
+        "It may be tempting to just copy and paste solutions, but even if you 
do look at the Answer cells, try typing out the solutions manually. The muscle 
memory will be very helpful.\n",
+        "\n",
+        "> Tip: For those who would like to learn concepts more from the 
ground up, check out these 
[notebooks](https://beam.apache.org/get-started/tour-of-beam/)!"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "42B-64Lvef3K"
+      },
+      "source": [
+        "## Prerequisites\n",
+        "\n",
+        "We'll assume you have familiarity with Python or Pandas, but you 
should be able to follow along even if you’re coming from a different 
programming language. We'll also assume you understand programming concepts 
like functions, objects, arrays, and dictionaries.\n",
+        "\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "SeDD0nardXyL"
+      },
+      "source": [
+        "## Running Colab\n",
+        "\n",
+        "To navigate through different sections, use the table of contents. From the **View** drop-down list, select **Table of contents**.\n",
+        "\n",
+        "To run a code cell, click the **Run cell** button at the top left of the cell, or select the cell and press `Shift+Enter`. Try modifying a code cell and re-running it to see what happens."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "UdMPnMDDkGc8"
+      },
+      "source": [
+        "To begin, we have to set up our environment. Let's install and import 
Apache Beam by running the cell below."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "5cPHukaOgDDM"
+      },
+      "outputs": [],
+      "source": [
+        "# Remember: You can press shift+enter to run this cell\n",
+        "!pip install --quiet apache-beam\n",
+        "import apache_beam as beam"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "30l8_MD-undP"
+      },
+      "outputs": [],
+      "source": [
+        "# Set the logging level to reduce verbose information\n",
+        "import logging\n",
+        "\n",
+        "logging.root.setLevel(logging.ERROR)\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "gyoo1gLKtZmU"
+      },
+      "source": [
+        "\n",
+        "\n",
+        "---\n",
+        "\n",
+        "\n",
+        "\n",
+        "---\n",
+        "\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "N29KJTfdMCtB"
+      },
+      "source": [
+        "# What is Apache Beam?"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "B2evhzEuMu8a"
+      },
+      "source": [
+        "Apache Beam is a library for data processing. It is often used for 
[Extract-Transform-Load 
(ETL)](https://en.wikipedia.org/wiki/Extract,_transform,_load) jobs, where 
we:\n",
+        "1. *Extract* from a data source\n",
+        "2. *Transform* that data\n",
+        "3. *Load* that data into a data sink (like a database)\n",
+        "\n",
+        "Apache Beam makes these jobs easier with its unified model for batch and streaming data, its open-source SDKs, and its ability to process data in parallel. Beam has many more parts, and throughout these tutorials we will break down each one to show you how they fit together.\n",
+        "\n",
+        "For this tutorial, you will use these Beam SDKs to build your own 
`Pipeline` to process your data.\n",
+        "\n",
+        "Below, we will run through creating the heart of the `Pipeline`. 
There are three main abstractions in Beam:\n",
+        "1. `Pipeline`\n",
+        "2. `PCollection`\n",
+        "3. `PTransform`"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "WecDJbfqpWb1"
+      },
+      "source": [
+        "\n",
+        "\n",
+        "---\n",
+        "\n",
+        "\n",
+        "---\n",
+        "\n",
+        "\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "FditMJAIAp9Q"
+      },
+      "source": [
+        "# 1. Design Your Pipeline"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_xYj6e3Hvt1W"
+      },
+      "source": [
+        "## 1.1 What is a Pipeline"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "bVJCh9FLBtU9"
+      },
+      "source": [
+        "A `Pipeline` describes your entire data processing task, from the data sources, through the processing transforms you apply to them, to your desired output.\n",
+        "\n",
+        "A `Pipeline` is responsible for reading, processing, and saving the data. \n",
+        "Each `PTransform` takes a `PCollection` as input, produces a `PCollection` as output, or both, and all of this happens inside your `Pipeline`.\n",
+        "For more definitions, see the [Beam glossary](https://beam.apache.org/documentation/glossary/).\n",
+        "\n",
+        "A diagram of this process is shown below:\n",
+        "\n",
+        "<img 
src='https://beam.apache.org/images/design-your-pipeline-linear.svg'>"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "cyn8_mB6zN4m"
+      },
+      "source": [
+        "In code, this process will look like this:\n",
+        "\n",
+        "\n",
+        "```\n",
+        "# Each `step` represents a specific transform. `outputs` then refers to the result of `step3`.\n",
+        "outputs = pipeline | step1 | step2 | step3\n",
+        "```\n",
+        "\n",
+        ">The pipe operator `|`  applies the `PTransform` on the right side of 
the pipe to the input `PCollection`.\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "gUyk2UWypI7g"
+      },
+      "source": [
+        "Pipelines can quickly grow long, so it's sometimes easier to read if 
we surround them with parentheses and break them into multiple lines.\n",
+        "\n",
+        "```\n",
+        "# This is equivalent to the example above.\n",
+        "outputs = (\n",
+        "  pipeline\n",
+        "  | step1\n",
+        "  | step2\n",
+        "  | step3\n",
+        ")\n",
+        "```\n",
+        "\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "7hmlbjPlvZVY"
+      },
+      "source": [
+        "Sometimes, the transform names aren't very descriptive. Beam allows each transform, or step, to have a unique label, or description. Labels make a pipeline much easier to debug, and adding them is a good habit to build early.\n",
+        "\n",
+        "> You can use the right shift operator `>>` to add a label to your 
transforms, like `'My description' >> MyTransform`.\n",
+        "\n",
+        "```\n",
+        "# Try to give short but descriptive labels.\n",
+        "# These serve both as comments and help debug later on.\n",
+        "outputs = (\n",
+        "  pipeline\n",
+        "  | 'First step' >> step1\n",
+        "  | 'Second step' >> step2\n",
+        "  | 'Third step' >> step3\n",
+        ")\n",
+        "```"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "GXQ__Kwxvr3J"
+      },
+      "source": [
+        "## 1.2 Loading Our Data\n",
+        "\n",
+        "Now, you can try to write your own pipeline!\n",
+        "\n",
+        "First, let's copy the example data we will be using throughout this tutorial into our file directory. This [dataset](https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection) consists of a **collection of SMS messages in English tagged as either \"spam\" or \"ham\" (a legitimate SMS)**.\n",
+        "\n",
+        "For this tutorial, we will create a pipeline to **explore the dataset by using Beam to count the words in SMS messages labeled spam or ham**."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "BkZ0wxsBPuvO"
+      },
+      "outputs": [],
+      "source": [
+        "# Creates a data directory with our dataset SMSSpamCollection\n",
+        "!mkdir -p data\n",
+        "!gsutil cp gs://apachebeamdt/SMSSpamCollection data/"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "TiHFDIMnLTbm"
+      },
+      "source": [
+        "**What does the data look like?**\n",
+        "\n",
+        "This dataset is a tab-separated text file with 5,574 rows and 2 columns recording the following attributes:\n",
+        "1. `Column 1`: The label (either `ham` or `spam`)\n",
+        "2. `Column 2`: The SMS as raw text (type `string`)"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {},
+      "outputs": [],
+      "source": [
+        "! head data/SMSSpamCollection"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "r-0WM38KNaTI"
+      },
+      "source": [
+        "## 1.3 Writing Our Own Pipeline\n",
+        "\n",
+        "Now that we understand our dataset, let's go into creating our 
pipeline.\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "1Ajz7-WQCVfj"
+      },
+      "source": [
+        "To initialize a `Pipeline`, create one with `beam.Pipeline()` and assign it to a name. In the code cell below, assign your pipeline to the name `pipeline`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "cXm0n80E9ZwT"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Edit This Code Cell\n",
+        "..."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "cellView": "form",
+        "id": "q0gzEcos9hjp"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Answer\n",
+        "pipeline = beam.Pipeline()"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "-K6tpdVK9KzC"
+      },
+      "source": [
+        "This pipeline will be where we create our transformed `PCollection`. 
In Beam, your data lives in a `PCollection`, which stands for `Parallel 
Collection`.\n",
+        "\n",
+        "A **PCollection** is like a list of elements, but without any order 
guarantees. This allows Beam to easily parallelize and distribute the 
`PCollection`'s elements.\n",
+        "\n",
+        "Now, let's use one of Beam's `Read` transforms to turn our text file 
(our dataset) into a `PCollection`. "
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "MS0XWqTiAIlE"
+      },
+      "source": [
+        "## 1.4 Reading from Text File\n",
+        "\n",
+        "We can use the\n",
+        
"[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob 
pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern. For 
example, in the pattern `data/*.txt`, the `*` is a wildcard that matches 
anything. This pattern matches all the files in the `data/` directory with a 
`.txt` extension. It then **returns one element for each line** in the file.\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "PXUSzLhhBwdr"
+      },
+      "source": [
+        "Because we only want this pipeline to read the `SMSSpamCollection` 
file that's in the `data/` directory, we will specify the input pattern to be 
`'data/SMSSpamCollection'`.\n",
+        "\n",
+        "We will then pass that input pattern to the `beam.io.ReadFromText()` transform and apply the transform to our pipeline."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "HYs5697QEArB"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Hint\n",
+        "# If you get stuck on the syntax, use the Table of Contents to 
navigate to 1.1\n",
+        "# What is a Pipeline and reread that section."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "p5z6C65tEmtM"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Edit This Code\n",
+        "\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "outputs = (\n",
+        "  pipeline\n",
+        "  | beam.io.ReadFromText(inputs_pattern)\n",
+        ")"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "cellView": "form",
+        "id": "zC9blgIXBv5A"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Answer\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "# Remember: | is the apply operator in Beam's Python SDK\n",
+        "outputs = (\n",
+        "    pipeline\n",
+        "      # Remember to add short descriptions to your transforms for 
good practice and easier understanding\n",
+        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        ")"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "ISm7PF9wE7Ts"
+      },
+      "source": [
+        "## 1.5 Writing to Text File\n",
+        "\n",
+        "Now, how do we know if we did it correctly? Let's take a look at the 
text file you just read.\n",
+        "\n",
+        "You may have noticed that we can't simply `print` the output `PCollection` to see the elements. In Beam, you __cannot__ access the elements of a `PCollection` directly, the way you would with a Python list.\n",
+        "\n",
+        "This is because, depending on the runner,\n",
+        "the `PCollection` elements might live in multiple worker machines.\n",
+        "\n",
+        "However, we can see our output `PCollection` by using a 
[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText)
 transform to turn our `str` elements into a `txt` file (or another file type 
of your choosing) and then running a command to show the head of our output 
file."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "iz73k9d1IUGw"
+      },
+      "source": [
+        "> `beam.io.WriteToText` takes a _file path prefix_ as an input, and it writes all the `str` elements into one or more files with filenames starting with that prefix.\n",
+        "> You can optionally pass a `file_name_suffix` as well, usually used 
for the file extension.\n",
+        "> Each element goes into its own line in the output files."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "D80HrbeKFxCv"
+      },
+      "source": [
+        "Now, you can try it. Save the results to the file path prefix `'output'` and set `file_name_suffix` to `'.txt'`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "v6RvVVINKOt9"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Edit This Code\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "# Remember: | is the apply operator in Beam's Python SDK\n",
+        "outputs = (\n",
+        "    pipeline\n",
+        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        "      | 'Write results' >> beam.io.WriteToText(..., file_name_suffix 
= ...)\n",
+        "      # To see the results from the previous transform\n",
+        "      | 'Print the text file name' >> beam.Map(print) # or 
beam.LogElements()\n",
+        ")\n",
+        "\n",
+        "# To run the pipeline\n",
+        "pipeline.run()\n",
+        "\n",
+        "# The command used to view your output txt file.\n",
+        "# If you choose to save the file path prefix to a different location 
or change the file type,\n",
+        "# you have to update this command as well.\n",
+        "! head output*.txt"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "cellView": "form",
+        "id": "nRYxclPaJArp"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Answer\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "outputs = (\n",
+        "    pipeline\n",
+        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        "      # ADDED\n",
+        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput1\", 
file_name_suffix = \".txt\")\n",
+        "      | 'Print the text file name' >> beam.Map(print)\n",
+        ")\n",
+        "\n",
+        "pipeline.run()\n",
+        "\n",
+        "# The file this data is saved to is called \"ansoutput1\" as seen in 
the WriteToText transform.\n",
+        "# The command below and the transform input should match.\n",
+        "! head ansoutput1*.txt"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "iie23Id-O33B"
+      },
+      "source": [
+        "# 2. PTransforms\n",
+        "\n",
+        "Now that we have read in our data, we can process it to explore the text messages that could be classified as spam or non-spam (ham).\n",
+        "\n",
+        "In order to achieve this, we need to use `PTransforms`.\n",
+        "\n",
+        "A **`PTransform`** is any data processing operation that performs a processing function on one or more `PCollection`s and outputs zero or more `PCollection`s.\n",
+        "\n",
+        "Some PTransforms accept user-defined functions that apply custom logic, which you will learn about in the *Advanced Transforms* notebook. The “P” stands for “parallel.”"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "vPhZG2GgPCW7"
+      },
+      "source": [
+        "## 2.1 Map\n",
+        "\n",
+        "One way a classifier can distinguish spam SMS from ham SMS is by comparing how often common words appear in each of the two categories. To find the common words for the two categories, we want to count the frequency of each word in the data set."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "cR1YWyvQj1tm"
+      },
+      "source": [
+        "First, because the data set is read line by line, let's clean up the `PCollection` so that the label and the SMS are separated.\n",
+        "\n",
+        "To do so, we will use the transform `Map`, which takes a **function** and **maps it** to **each element** of the collection, transforming a single input `a` into a single output `b`.\n",
+        "\n",
+        "In this case, we will pass `beam.Map` a lambda function that uses Python's `str.split` to split the line into a two-item list: [label, SMS]. The lambda function we will use is `lambda line: line.split(\"\\t\")`, which splits each element of the `PCollection` at the tab character and puts the parts into a list.\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "Xey-I5BFvCiH"
+      },
+      "source": [
+        "Add a line of code between the `ReadFromText` and `WriteToText` transforms that applies a `beam.Map` transform with the function described above. Remember to add a short description for your transform!"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "hdrOsitSuj7z"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Edit This Code\n",
+        "import re\n",
+        "\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "outputs = (\n",
+        "    pipeline\n",
+        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        "      ...\n",
+        "      | 'Write results' >> beam.io.WriteToText(\"output2\", 
file_name_suffix = \".txt\")\n",
+        "      | 'Print the text file name' >> beam.Map(print)\n",
+        ")\n",
+        "\n",
+        "pipeline.run()\n",
+        "\n",
+        "! head output2*.txt"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "cellView": "form",
+        "id": "2k4e3j24wTzM"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Answer\n",
+        "import re\n",
+        "\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "outputs = (\n",
+        "    pipeline\n",
+        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        "      # ADDED\n",
+        "      | 'Separate to list' >> beam.Map(lambda line: 
line.split(\"\\t\"))\n",
+        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput2\", 
file_name_suffix = \".txt\")\n",
+        "      | 'Print the text file name' >> beam.Map(print)\n",
+        ")\n",
+        "\n",
+        "pipeline.run()\n",
+        "\n",
+        "! head ansoutput2*.txt"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "ju8TXxBYwvzX"
+      },
+      "source": [
+        "## 2.2 Filter\n",
+        "\n",
+        "Now that we have a list separating the label and the SMS, let's first focus on only counting words with the **spam** label. In order to process certain elements while ignoring others, we want to filter out specific elements in a collection using the transform `Filter`.\n",
+        "\n",
+        "`beam.Filter` takes in a function that checks a single element `a` and returns `True` to keep the element or `False` to discard it.\n",
+        "\n",
+        "In this case, we want the function passed to `Filter` to return `True` when the first item of the list is the label **spam**.\n",
+        "\n",
+        "We will use a lambda function again for this example, but this time, you will write the lambda function yourself. Add a line of code after your `beam.Map` transform so that the resulting `PCollection` contains only lists with the label **spam**."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "a72v9SQ0zb5u"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Edit This Code\n",
+        "import re\n",
+        "\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "outputs = (\n",
+        "    pipeline\n",
+        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        "      | 'Separate to list' >> beam.Map(lambda line: 
line.split(\"\\t\"))\n",
+        "      ...\n",
+        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput3\", 
file_name_suffix = \".txt\")\n",
+        "      | 'Print the text file name' >> beam.Map(print)\n",
+        ")\n",
+        "\n",
+        "pipeline.run()\n",
+        "\n",
+        "! head ansoutput3*.txt"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "cellView": "form",
+        "id": "BmvUeOztzCv0"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Answer\n",
+        "import re\n",
+        "\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "outputs = (\n",
+        "    pipeline\n",
+        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        "      | 'Separate to list' >> beam.Map(lambda line: 
line.split(\"\\t\"))\n",
+        "      # ADDED\n",
+        "      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == 
\"spam\")\n",
+        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput3\", 
file_name_suffix = \".txt\")\n",
+        "      | 'Print the text file name' >> beam.Map(print)\n",
+        ")\n",
+        "\n",
+        "pipeline.run()\n",
+        "\n",
+        "! head ansoutput3*.txt"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "v4EcJw45zy6Y"
+      },
+      "source": [
+        "## 2.3 FlatMap\n",
+        "\n",
+        "Now that only SMS messages labelled spam remain, we need to change the elements so that, instead of each element being a list containing the label and the SMS, each element is a word in the SMS.\n",
+        "\n",
+        "We can't use `Map` here: `Map` transforms each individual element into exactly one output, so it can't change the number of elements.\n",
+        "\n",
+        "Instead, we want to map a function to each element of a collection. That function returns a list of output elements, so we would get a list of lists of elements. Then we want to flatten the list of lists into a single list.\n",
+        "\n",
+        "To do this, we will use `FlatMap`, which takes a **function** that transforms a single input `a` into an **iterable of outputs** `b`. The result is a **single collection** containing the outputs from all the elements. In this case, all these elements will be the words found in the SMS."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "3V3w2cz11S_8"
+      },
+      "source": [
+        "Add a `FlatMap` transform that takes in the function `lambda line: re.findall(r\"[a-zA-Z']+\", line[1])` to your code below. The lambda function finds words by returning every substring of the SMS that matches the regex, that is, every run of letters and apostrophes."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "USHHIPO91xDn"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Edit This Code\n",
+        "import re\n",
+        "\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "outputs = (\n",
+        "    pipeline\n",
+        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        "      | 'Separate to list' >> beam.Map(lambda line: 
line.split(\"\\t\"))\n",
+        "      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == 
\"spam\")\n",
+        "      ...\n",
+        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput3\", 
file_name_suffix = \".txt\")\n",
+        "      | 'Print the text file name' >> beam.Map(print)\n",
+        ")\n",
+        "\n",
+        "pipeline.run()\n",
+        "\n",
+        "! head ansoutput3*.txt"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "cellView": "form",
+        "id": "VFuRMIsN0Edn"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Answer\n",
+        "import re\n",
+        "\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "outputs = (\n",
+        "    pipeline\n",
+        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        "      | 'Separate to list' >> beam.Map(lambda line: 
line.split(\"\\t\"))\n",
+        "      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == 
\"spam\")\n",
+        "      # ADDED\n",
+        "      | 'Find words' >> beam.FlatMap(lambda line: 
re.findall(r\"[a-zA-Z']+\", line[1]))\n",
+        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput3\", 
file_name_suffix = \".txt\")\n",
+        "      | 'Print the text file name' >> beam.Map(print)\n",
+        ")\n",
+        "\n",
+        "pipeline.run()\n",
+        "\n",
+        "! head ansoutput3*.txt"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "BFJMyIyJ17lE"
+      },
+      "source": [
+        "## 2.4 Combine\n",
+        "\n",
+        "Now that each word is one element, we have to count how many times each word appears. To do that, we can use an [aggregation](https://beam.apache.org/documentation/transforms/python/overview/) transform, specifically `CombinePerKey`, which takes a `PCollection` of key-value pairs and combines all the values that share a key into a single output value.\n",
+        "\n",
+        "Before using `CombinePerKey`, however, we have to pair each word with a numerical value that can be combined.\n",
+        "\n",
+        "To do this, we add `| 'Pair words with 1' >> beam.Map(lambda word: (word, 1))` to the `Pipeline`, which pairs each word with the numerical value 1.\n",
+        "\n",
+        "With each word paired with a numerical value, we can now sum these values to get the count of each word. Like the previous transforms, `CombinePerKey` takes in a function, but instead of applying it to each element individually, it applies the function to all the values grouped under each key.\n",
+        "\n",
+        "However, instead of writing our own lambda function, we can pass Python's built-in function `sum` into `CombinePerKey`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "Yw_3dfnzI2xA"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Edit This Code\n",
+        "import re\n",
+        "\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "outputs = (\n",
+        "    pipeline\n",
+        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        "      | 'Separate to list' >> beam.Map(lambda line: 
line.split(\"\\t\"))\n",
+        "      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == 
\"spam\")\n",
+        "      | 'Find words' >> beam.FlatMap(lambda line: 
re.findall(r\"[a-zA-Z']+\", line[1]))\n",
+        "      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
+        "      ...\n",
+        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput4\", 
file_name_suffix = \".txt\")\n",
+        "      | 'Print the text file name' >> beam.Map(print)\n",
+        ")\n",
+        "\n",
+        "pipeline.run()\n",
+        "\n",
+        "! head ansoutput4*.txt"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "cellView": "form",
+        "id": "qDyh9_faIkCo"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Answer\n",
+        "import re\n",
+        "\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "outputs = (\n",
+        "    pipeline\n",
+        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        "      | 'Separate to list' >> beam.Map(lambda line: 
line.split(\"\\t\"))\n",
+        "      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == 
\"spam\")\n",
+        "      | 'Find words' >> beam.FlatMap(lambda line: 
re.findall(r\"[a-zA-Z']+\", line[1]))\n",
+        "      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
+        "      # ADDED\n",
+        "      | 'Group and sum' >> beam.CombinePerKey(sum)\n",
+        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput4\", 
file_name_suffix = \".txt\")\n",
+        "      | 'Print the text file name' >> beam.Map(print)\n",
+        ")\n",
+        "\n",
+        "pipeline.run()\n",
+        "\n",
+        "! head ansoutput4*.txt"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "L9zNI4LMJHur"
+      },
+      "source": [
+        "And we're finished! We now have a count of all the words, which 
gives us a better understanding of our dataset."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "BozCqbzUPItB"
+      },
+      "source": [
+        "\n",
+        "\n",
+        "---\n",
+        "\n",
+        "\n",
+        "\n",
+        "---\n",
+        "\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "Cv9508k4MeS4"
+      },
+      "source": [
+        "# Full Spam Ham Apache Beam Example\n",
+        "\n",
+        "Below is a summary of all the code we wrote, for your convenience. 
Note that when you use the `with` statement, you do not need to call `run` 
explicitly."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "8iWWy3Y7Msxm"
+      },
+      "outputs": [],
+      "source": [
+        "!pip install --quiet apache-beam\n",
+        "!mkdir -p data\n",
+        "!gsutil cp gs://apachebeamdt/SMSSpamCollection data/"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "43oijI4BCAW1"
+      },
+      "outputs": [],
+      "source": [
+        "import apache_beam as beam\n",
+        "import re\n",
+        "\n",
+        "inputs_pattern = 'data/SMSSpamCollection'\n",
+        "outputs_prefix_ham = 'outputs/fullcodeham'\n",
+        "outputs_prefix_spam = 'outputs/fullcodespam'\n",
+        "\n",
+        "# Ham Word Count\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  ham = (\n",
+        "    pipeline\n",
+        "    | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        "    | 'Separate to list' >> beam.Map(lambda line: 
line.split(\"\\t\"))\n",
+        "    | 'Keep only ham' >> beam.Filter(lambda line: line[0] == 
\"ham\")\n",
+        "    | 'Find words' >> beam.FlatMap(lambda line: 
re.findall(r\"[a-zA-Z']+\", line[1]))\n",
+        "    | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
+        "    | 'Group and sum' >> beam.CombinePerKey(sum)\n",
+        "    | 'Format results' >> beam.Map(lambda word_c: str(word_c))\n",
+        "    | 'Write results' >> beam.io.WriteToText(outputs_prefix_ham, 
file_name_suffix = \".txt\")\n",
+        "    )\n",
+        "\n",
+        "# Spam Word Count\n",
+        "with beam.Pipeline() as pipeline1:\n",
+        "  spam = (\n",
+        "    pipeline1\n",
+        "    | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+        "    | 'Separate to list' >> beam.Map(lambda line: 
line.split(\"\\t\"))\n",
+        "    | 'Filter out only spam' >> beam.Filter(lambda line: line[0] == 
\"spam\")\n",
+        "    | 'Find words' >> beam.FlatMap(lambda line: 
re.findall(r\"[a-zA-Z']+\", line[1]))\n",
+        "    | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
+        "    | 'Group and sum' >> beam.CombinePerKey(sum)\n",
+        "    | 'Format results' >> beam.Map(lambda word_c: str(word_c))\n",
+        "    | 'Write results' >> beam.io.WriteToText(outputs_prefix_spam, 
file_name_suffix = \".txt\")\n",
+        "    )\n",
+        "\n",
+        "print('Ham Word Count Head')\n",
+        "! head outputs/fullcodeham*.txt\n",
+        "\n",
+        "print('Spam Word Count Head')\n",
+        "! head outputs/fullcodespam*.txt"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "FNg53x4fyXOO"
+      },
+      "source": [
+        "**One more thing: you can also visualize the Beam pipelines!**\n",
+        "\n",
+        "Check [this 
example](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/examples/Interactive%20Beam%20Example.ipynb)
 to learn more about the interatvive Beam."

Review Comment:
   ```suggestion
           "Check [this 
example](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/examples/Interactive%20Beam%20Example.ipynb)
 to learn more about the interactive Beam."
   ```
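
For readers following section 2.4 of the notebook, a plain-Python sketch of what the `Pair words with 1` and `CombinePerKey(sum)` steps compute may help. This is not Beam code and is not part of the PR; the helper name `count_words` and the sample words are made up for illustration.

```python
from collections import defaultdict


def count_words(words):
    """Mimic Map(lambda word: (word, 1)) followed by CombinePerKey(sum).

    Hypothetical helper for illustration only; it runs in-process and
    does not use Apache Beam.
    """
    # Pair each word with 1, as the 'Pair words with 1' step does.
    pairs = [(word, 1) for word in words]

    # Collect the values that share a key, then reduce each group with sum.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: sum(values) for key, values in grouped.items()}


# Made-up sample input standing in for the words found in spam messages.
counts = count_words(["free", "call", "free", "now", "call", "free"])
print(counts)
```

In Beam the same combination may be computed in pieces on different workers, so the function passed to `CombinePerKey` should be associative and commutative; `sum` satisfies both.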


