liferoad commented on code in PR #25719:
URL: https://github.com/apache/beam/pull/25719#discussion_r1136270327
##########
examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb:
##########
@@ -0,0 +1,1068 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "colab_type": "text",
+ "id": "view-in-github"
+ },
+ "source": [
+ "<a
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb\"
target=\"_parent\"><img
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In
Colab\"/></a>"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "V913rQcmLS72"
+ },
+ "source": [
+ "# Welcome to Apache Beam!\n",
+ "\n",
+ "This notebook will be your introductory guide to Beam's main concepts
and its uses. This tutorial **does not** assume any prior Apache Beam
knowledge.\n",
+ "\n",
+ "We'll cover what Beam is, what it does, and a few basic
transforms!\n",
+ "\n",
+ "We aim to give you familiarity with:\n",
+ "- Creating a `Pipeline`\n",
+ "- Creating a `PCollection`\n",
+ "- Performing basic `PTransforms`\n",
+ " - Map\n",
+ " - Filter\n",
+ " - FlatMap\n",
+ " - Combine\n",
+ "- Applications\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "Lduh-9oXt3P_"
+ },
+ "source": [
+ "## How To Approach This Tutorial\n",
+ "\n",
+ "This tutorial was designed for someone who likes to **learn by
doing**. As such, there will be opportunities for you to practice writing your
own code in these notebooks with the answer hidden in a cell below.\n",
+ "\n",
+ "Codes that require editing will be with an `...` and each cell title
will say `Edit This Code`. However, you are free to play around with the other
cells if you would like to add something beyond our tutorial.\n",
+ "\n",
+ "It may be tempting to just copy and paste solutions, but even if you
do look at the Answer cells, try typing out the solutions manually. The muscle
memory will be very helpful.\n",
+ "\n",
+ "> Tip: For those who would like to learn concepts more from the
ground up, check out these
[notebooks](https://beam.apache.org/get-started/tour-of-beam/)!"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "42B-64Lvef3K"
+ },
+ "source": [
+ "## Prerequisites\n",
+ "\n",
+ "We'll assume you have familiarity with Python or Pandas, but you
should be able to follow along even if you’re coming from a different
programming language. We'll also assume you understand programming concepts
like functions, objects, arrays, and dictionaries.\n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "SeDD0nardXyL"
+ },
+ "source": [
+ "## Running CoLab\n",
+ "\n",
+ "To navigate through different sections, use the table of contents.
From View drop-down list, select Table of contents.\n",
+ "\n",
+ "To run a code cell, you can click the Run cell button at the top left
of the cell, or by select it and press `Shift+Enter`. Try modifying a code cell
and re-running it to see what happens."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "UdMPnMDDkGc8"
+ },
+ "source": [
+ "To begin, we have to set up our environment. Let's install and import
Apache Beam by running the cell below."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "5cPHukaOgDDM"
+ },
+ "outputs": [],
+ "source": [
+ "# Remember: You can press shift+enter to run this cell\n",
+ "!pip install --quiet apache-beam\n",
+ "import apache_beam as beam"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "30l8_MD-undP"
+ },
+ "outputs": [],
+ "source": [
+ "# Set the logging level to reduce verbose information\n",
+ "import logging\n",
+ "\n",
+ "logging.root.setLevel(logging.ERROR)\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "gyoo1gLKtZmU"
+ },
+ "source": [
+ "\n",
+ "\n",
+ "---\n",
+ "\n",
+ "\n",
+ "\n",
+ "---\n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "N29KJTfdMCtB"
+ },
+ "source": [
+ "# What is Apache Beam?"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "B2evhzEuMu8a"
+ },
+ "source": [
+ "Apache Beam is a library for data processing. It is often used for
[Extract-Transform-Load
(ETL)](https://en.wikipedia.org/wiki/Extract,_transform,_load) jobs, where
we:\n",
+ "1. *Extract* from a data source\n",
+ "2. *Transform* that data\n",
+ "3. *Load* that data into a data sink (like a database)\n",
+ "\n",
+ "Apache Beam makes these jobs easy with the ability to process
everything at the same time and its unified model and open-source SDKs. There
are many more parts of Beam, but throughout these tutorials, we will break down
each part to show you how they will all fit together.\n",
+ "\n",
+ "For this tutorial, you will use these Beam SDKs to build your own
`Pipeline` to process your data.\n",
+ "\n",
+ "Below, we will run through creating the heart of the `Pipeline`.
There are three main abstractions in Beam:\n",
+ "1. `Pipeline`\n",
+ "2. `PCollection`\n",
+ "3. `PTransform`"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "WecDJbfqpWb1"
+ },
+ "source": [
+ "\n",
+ "\n",
+ "---\n",
+ "\n",
+ "\n",
+ "---\n",
+ "\n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "FditMJAIAp9Q"
+ },
+ "source": [
+ "# 1. Design Your Pipeline"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "_xYj6e3Hvt1W"
+ },
+ "source": [
+ "## 1.1 What is a Pipeline"
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "bVJCh9FLBtU9"
+ },
+ "source": [
+ "A `Pipeline` describes the whole cycle of your data processing task,
starting from the data sources to the processing transforms you will apply to
them until your desired output.\n",
+ "\n",
+ "`Pipeline` is responsible for reading, processing, and saving the
data. \n",
+ "Each `PTransform` is done on or outputs a `PCollection`, and this
process is done in your `Pipeline`.\n",
+ "More glossary details can be found at
[here](https://beam.apache.org/documentation/glossary/).\n",
+ "\n",
+ "A diagram of this process is shown below:\n",
+ "\n",
+ "<img
src='https://beam.apache.org/images/design-your-pipeline-linear.svg'>"
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "cyn8_mB6zN4m"
+ },
+ "source": [
+ "In code, this process will look like this:\n",
+ "\n",
+ "\n",
+ "```\n",
+ "# Each `step` represents a specific transform. After `step3`, it will
save the data reference to `outputs`.\n",
+ "outputs = pipeline | step1 | step2 | step3\n",
+ "```\n",
+ "\n",
+ ">The pipe operator `|` applies the `PTransform` on the right side of
the pipe to the input `PCollection`.\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "gUyk2UWypI7g"
+ },
+ "source": [
+ "Pipelines can quickly grow long, so it's sometimes easier to read if
we surround them with parentheses and break them into multiple lines.\n",
+ "\n",
+ "```\n",
+ "# This is equivalent to the example above.\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | step1\n",
+ " | step2\n",
+ " | step3\n",
+ ")\n",
+ "```\n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "7hmlbjPlvZVY"
+ },
+ "source": [
+ "Sometimes, the transform names aren't very descriptive. Beam allows
each transform, or step, to have a unique label, or description. This makes it
a lot easier to debug, and it's in general a good practice to start.\n",
+ "\n",
+ "> You can use the right shift operator `>>` to add a label to your
transforms, like `'My description' >> MyTransform`.\n",
+ "\n",
+ "```\n",
+ "# Try to give short but descriptive labels.\n",
+ "# These serve both as comments and help debug later on.\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | 'First step' >> step1\n",
+ " | 'Second step' >> step2\n",
+ " | 'Third step' >> step3\n",
+ ")\n",
+ "```"
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "GXQ__Kwxvr3J"
+ },
+ "source": [
+ "## 1.2 Loading Our Data\n",
+ "\n",
+ "Now, you can try to write your own pipeline!\n",
+ "\n",
+ "First, let's load the example data we will be using throughout this
tutorial into our file directory. This
[dataset](https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection) consists
of a **collection of SMS messages in English tagged as either \"spam\" or
\"ham\" (a legitimate SMS\n",
+ ")**.\n",
+ "\n",
+ "For this tutorial, we will create a pipeline to **explore the dataset
using Beam to count words in SMS messages that contain spam or ham**."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "BkZ0wxsBPuvO"
+ },
+ "outputs": [],
+ "source": [
+ "# Creates a data directory with our dataset SMSSpamCollection\n",
+ "!mkdir -p data\n",
+ "!gsutil cp gs://apachebeamdt/SMSSpamCollection data/"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "TiHFDIMnLTbm"
+ },
+ "source": [
+ "**What does the data look like?**\n",
+ "\n",
+ "This dataset is a `txt` file with 5,574 rows and 4 columns recording
the following attributes:\n",
+ "1. `Column 1`: The label (either `ham` or `spam`)\n",
+ "2. `Column 2`: The SMS as raw text (type `string`)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "! head data/SMSSpamCollection"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "r-0WM38KNaTI"
+ },
+ "source": [
+ "## 1.3 Writing Our Own Pipeline\n",
+ "\n",
+ "Now that we understand our dataset, let's go into creating our
pipeline.\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "1Ajz7-WQCVfj"
+ },
+ "source": [
+ "To initialize a `Pipeline`, you first assign your pipeline
`beam.Pipeline()` to a name. Assign your pipeline to the name, `pipeline`, in
the code cell below."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "cXm0n80E9ZwT"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Edit This Code Cell\n",
+ "..."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "cellView": "form",
+ "id": "q0gzEcos9hjp"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Answer\n",
+ "pipeline = beam.Pipeline()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "-K6tpdVK9KzC"
+ },
+ "source": [
+ "This pipeline will be where we create our transformed `PCollection`.
In Beam, your data lives in a `PCollection`, which stands for `Parallel
Collection`.\n",
+ "\n",
+ "A **PCollection** is like a list of elements, but without any order
guarantees. This allows Beam to easily parallelize and distribute the
`PCollection`'s elements.\n",
+ "\n",
+ "Now, let's use one of Beam's `Read` transforms to turn our text file
(our dataset) into a `PCollection`."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "MS0XWqTiAIlE"
+ },
+ "source": [
+ "## 1.4 Reading from Text File\n",
+ "\n",
+ "We can use the\n",
+
"[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+ "transform to read text files into `str` elements.\n",
+ "\n",
+ "It takes a\n",
+ "[_glob
pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+ "as an input, and reads all the files that match that pattern. For
example, in the pattern `data/*.txt`, the `*` is a wildcard that matches
anything. This pattern matches all the files in the `data/` directory with a
`.txt` extension. It then **returns one element for each line** in the file.\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "PXUSzLhhBwdr"
+ },
+ "source": [
+ "Because we only want this pipeline to read the `SMSSpamCollection`
file that's in the `data/` directory, we will specify the input pattern to be
`'data/SMSSpamCollection'`.\n",
+ "\n",
+ "We will then use that input pattern with our transform
`beam.io.ReadFromText()` and apply it onto our pipeline. The
`beam.io.ReadFromText()` transform can take in an input pattern as an input."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "HYs5697QEArB"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Hint\n",
+ "# If you get stuck on the syntax, use the Table of Contents to
navigate to 1.1.1 What is a Pipeline and reread that section."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "p5z6C65tEmtM"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Edit This Code\n",
+ "\n",
+ "inputs_pattern = 'data/SMSSpamCollection'\n",
+ "\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | beam.io.ReadFromText(inputs_pattern)\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "cellView": "form",
+ "id": "zC9blgIXBv5A"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Answer\n",
+ "inputs_pattern = 'data/SMSSpamCollection'\n",
+ "\n",
+ "# Remember: | is the apply function in Beam in Python\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " # Remember to add short descriptions to your transforms for
good practice and easier understanding\n",
+ " | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "ISm7PF9wE7Ts"
+ },
+ "source": [
+ "## 1.5 Writing to Text File\n",
+ "\n",
+ "Now, how do we know if we did it correctly? Let's take a look at the
text file you just read.\n",
+ "\n",
+ "You may have noticed that we can't simply `print` the output
`PCollection` to see the elements. In Beam, you can __NOT__ access the elements
from a `PCollection` directly like a Python list.\n",
+ "\n",
+ "This is because, depending on the runner,\n",
+ "the `PCollection` elements might live in multiple worker machines.\n",
+ "\n",
+ "However, we can see our output `PCollection` by using a
[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText)
transform to turn our `str` elements into a `txt` file (or another file type
of your choosing) and then running a command to show the head of our output
file."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "iz73k9d1IUGw"
+ },
+ "source": [
+ "> `beam.io.WriteToText` takes a _file path prefix_ as an input, and
it writes the all `str` elements into one or more files with filenames starting
with that prefix.\n",
+ "> You can optionally pass a `file_name_suffix` as well, usually used
for the file extension.\n",
+ "> Each element goes into its own line in the output files."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "D80HrbeKFxCv"
+ },
+ "source": [
+ "Now, you can try it. Save the results to a file path prefix
`'output'` and make the file_name_suffix `'.txt'`"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "v6RvVVINKOt9"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Edit This Code\n",
+ "inputs_pattern = 'data/SMSSpamCollection'\n",
+ "\n",
+ "# Remember: | is the apply function in Beam in Python\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Write results' >> beam.io.WriteToText(..., file_name_suffix
= ...)\n",
+ " # To see the results from the previous transform\n",
+ " | 'Print the text file name' >> beam.Map(print) # or
beam.LogElements()\n",
+ ")\n",
+ "\n",
+ "# To run the pipeline\n",
+ "pipeline.run()\n",
+ "\n",
+ "# The command used to view your output txt file.\n",
+ "# If you choose to save the file path prefix to a different location
or change the file type,\n",
+ "# you have to update this command as well.\n",
+ "! head output*.txt"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "cellView": "form",
+ "id": "nRYxclPaJArp"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Answer\n",
+ "inputs_pattern = 'data/SMSSpamCollection'\n",
+ "\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " # ADDED\n",
+ " | 'Write results' >> beam.io.WriteToText(\"ansoutput1\",
file_name_suffix = \".txt\")\n",
+ " | 'Print the text file name' >> beam.Map(print)\n",
+ ")\n",
+ "\n",
+ "pipeline.run()\n",
+ "\n",
+ "# The file this data is saved to is called \"ansoutput1\" as seen in
the WriteToText transform.\n",
+ "# The command below and the transform input should match.\n",
+ "! head ansoutput1*.txt"
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "iie23Id-O33B"
+ },
+ "source": [
+ "# 2. PTransforms\n",
+ "\n",
+ "Now that we have read in our code, we can now process our data to
explore the text messages that could be classified as spam or non-spam
(ham).\n",
+ "\n",
+ "In order to achieve this, we need to use `PTransforms`.\n",
+ "\n",
+ "A **`PTransform`** is any data processing operation that performs a
processing function on one or more `PCollection`, outputting zero or more
`PCollection`.\n",
+ "\n",
+ "Some PTransforms accept user-defined functions that apply custom
logic, which you will learn in the *Advanced Transforms* notebook. The “P”
stands for “parallel.”"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "vPhZG2GgPCW7"
+ },
+ "source": [
+ "## 2.1 Map\n",
+ "\n",
+ "One feature to use for the classifier to distinguish spam SMS from
ham SMS is to compare the distribution of common words between the two
categories. To find the common words for the two categories, we want to perform
a frequency count of each word in the data set."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "cR1YWyvQj1tm"
+ },
+ "source": [
+ "First, because the data set is read line by line, let's clean up the
`PCollection` so that the label and the SMS is separated.\n",
+ "\n",
+ "To do so, we will use the transform `Map`, which takes a **function**
and **maps it** to **each element** of the collection and transforms a single
input `a` to a single output `b`.\n",
+ "\n",
+ "In this case, we will use `beam.Map` which takes in a lambda function
and uses regex to split the line into a two item list: [label, SMS]. The lambda
function we will use is `lambda line: line.split(\"\\t\")`, which splits each
element of the `PCollection` by tab and putting them into a list.\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "Xey-I5BFvCiH"
+ },
+ "source": [
+ "Add a line of code between the `ReadFromText` and `WriteToText`
transform that applies a `beam.Map` transform that takes in the function
described above. Remember to add a short description for your transform!"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "hdrOsitSuj7z"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Edit This Code\n",
+ "import re\n",
+ "\n",
+ "inputs_pattern = 'data/SMSSpamCollection'\n",
+ "\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " ...\n",
+ " | 'Write results' >> beam.io.WriteToText(\"output2\",
file_name_suffix = \".txt\")\n",
+ " | 'Print the text file name' >> beam.Map(print)\n",
+ ")\n",
+ "\n",
+ "pipeline.run()\n",
+ "\n",
+ "! head output2*.txt"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "cellView": "form",
+ "id": "2k4e3j24wTzM"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Answer\n",
+ "import re\n",
+ "\n",
+ "inputs_pattern = 'data/SMSSpamCollection'\n",
+ "\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " # ADDED\n",
+ " | 'Separate to list' >> beam.Map(lambda line:
line.split(\"\\t\"))\n",
+ " | 'Write results' >> beam.io.WriteToText(\"ansoutput2\",
file_name_suffix = \".txt\")\n",
+ " | 'Print the text file name' >> beam.Map(print)\n",
+ ")\n",
+ "\n",
+ "pipeline.run()\n",
+ "\n",
+ "! head ansoutput2*.txt"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "ju8TXxBYwvzX"
+ },
+ "source": [
+ "## 2.2 Filter\n",
+ "\n",
+ "Now that we have a list separating the label and the SMS, let's first
focus on only counting words with the **spam** label. In order to process
certain elements while igorning others, we want to filter out specific elements
in a collection using the transform `Filter`.\n",
+ "\n",
+ "`beam.Filter` takes in a function that checks a single element a, and
returns True to keep the element, or False to discard it.\n",
+ "\n",
+ "In this case, we want `Filter` to return true if the list contains
the label **spam**.\n",
+ "\n",
+ "We will use a lambda function again for this example, but this time,
you will write the lambda function yourself. Add a line of code after your
`beam.Map` transform to only return a `PCollection` that only contains lists
with the label **spam**."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "a72v9SQ0zb5u"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Edit This Code\n",
+ "import re\n",
+ "\n",
+ "inputs_pattern = 'data/SMSSpamCollection'\n",
+ "\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Separate to list' >> beam.Map(lambda line:
line.split(\"\\t\"))\n",
+ " ...\n",
+ " | 'Write results' >> beam.io.WriteToText(\"ansoutput3\",
file_name_suffix = \".txt\")\n",
+ " | 'Print the text file name' >> beam.Map(print)\n",
+ ")\n",
+ "\n",
+ "pipeline.run()\n",
+ "\n",
+ "! head ansoutput3*.txt"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "cellView": "form",
+ "id": "BmvUeOztzCv0"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Answer\n",
+ "import re\n",
+ "\n",
+ "inputs_pattern = 'data/SMSSpamCollection'\n",
+ "\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Separate to list' >> beam.Map(lambda line:
line.split(\"\\t\"))\n",
+ " # ADDED\n",
+ " | 'Keep only spam' >> beam.Filter(lambda line: line[0] ==
\"spam\")\n",
+ " | 'Write results' >> beam.io.WriteToText(\"ansoutput3\",
file_name_suffix = \".txt\")\n",
+ " | 'Print the text file name' >> beam.Map(print)\n",
+ ")\n",
+ "\n",
+ "pipeline.run()\n",
+ "\n",
+ "! head ansoutput3*.txt"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "v4EcJw45zy6Y"
+ },
+ "source": [
+ "## 2.3 FlatMap\n",
+ "\n",
+ "Now, that we know we only have SMS labelled spam, we now need to
change the element such that instead of each element being a list containing
the label and the SMS, each element is a word in the SMS.\n",
+ "\n",
+ "We can't use `Map`, since `Map` allows us to transform each
individual element, but we can't change the number of elements with it.\n",
+ "\n",
+ "Instead, we want to map a function to each element of a collection.
That function returns a list of output elements, so we would get a list of
lists of elements. Then we want to flatten the list of lists into a single
list.\n",
+ "\n",
+ "To do this, we will use `FlatMap`, which takes a **function** that
transforms a single input a into an **iterable of outputs** b. But we get a
**single collection** containing the outputs of all the elements. In this case,
all these elements will be the words found in the SMS."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "3V3w2cz11S_8"
+ },
+ "source": [
+ "Add a `FlatMap` transform that takes in the function `lambda line:
re.findall(r\"[a-zA-Z']+\", line[1])` to your code below. The lambda function
finds words by finding all elements in the SMS that match the specifications of
the regex."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "USHHIPO91xDn"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Edit This Code\n",
+ "import re\n",
+ "\n",
+ "inputs_pattern = 'data/SMSSpamCollection'\n",
+ "\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Separate to list' >> beam.Map(lambda line:
line.split(\"\\t\"))\n",
+ " | 'Keep only spam' >> beam.Filter(lambda line: line[0] ==
\"spam\")\n",
+ " ...\n",
+ " | 'Write results' >> beam.io.WriteToText(\"ansoutput3\",
file_name_suffix = \".txt\")\n",
+ " | 'Print the text file name' >> beam.Map(print)\n",
+ ")\n",
+ "\n",
+ "pipeline.run()\n",
+ "\n",
+ "! head ansoutput3*.txt"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "cellView": "form",
+ "id": "VFuRMIsN0Edn"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Answer\n",
+ "import re\n",
+ "\n",
+ "inputs_pattern = 'data/SMSSpamCollection'\n",
+ "\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Separate to list' >> beam.Map(lambda line:
line.split(\"\\t\"))\n",
+ " | 'Keep only spam' >> beam.Filter(lambda line: line[0] ==
\"spam\")\n",
+ " # ADDED\n",
+ " | 'Find words' >> beam.FlatMap(lambda line:
re.findall(r\"[a-zA-Z']+\", line[1]))\n",
+ " | 'Write results' >> beam.io.WriteToText(\"ansoutput3\",
file_name_suffix = \".txt\")\n",
+ " | 'Print the text file name' >> beam.Map(print)\n",
+ ")\n",
+ "\n",
+ "pipeline.run()\n",
+ "\n",
+ "! head ansoutput3*.txt"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "BFJMyIyJ17lE"
+ },
+ "source": [
+ "## 2.4 Combine\n",
+ "\n",
+ "Now that each word is one element, we have to count up the elements.
To do that we can use
[aggregation](https://beam.apache.org/documentation/transforms/python/overview/)
transforms, specifically `CombinePerKey` in this instance which transforms an
iterable of inputs a, and returns a single output a based on their key.\n",
+ "\n",
+ "Before using `CombinePerKey` however, we have to associate each word
with a numerical value to then combine them.\n",
+ "\n",
+ "To do this, we add `| 'Pair words with 1' >> beam.Map(lambda word:
(word, 1))` to the `Pipeline`, which associates each word with the numerical
value 1.\n",
+ "\n",
+ "With each word assigned to a numerical value, we can now combine
these numerical values to sum up all the counts of each word. Like the past
transforms, `CombinePerKey` takes in a function and applies it to each element
of the `PCollection`.\n",
+ "\n",
+ "However, instead of writing our own lambda function, we can use pass
one of Beam's built-in function `sum` into `CombinePerKey`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "Yw_3dfnzI2xA"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Edit This Code\n",
+ "import re\n",
+ "\n",
+ "inputs_pattern = 'data/SMSSpamCollection'\n",
+ "\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Separate to list' >> beam.Map(lambda line:
line.split(\"\\t\"))\n",
+ " | 'Keep only spam' >> beam.Filter(lambda line: line[0] ==
\"spam\")\n",
+ " | 'Find words' >> beam.FlatMap(lambda line:
re.findall(r\"[a-zA-Z']+\", line[1]))\n",
+ " | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
+ " ...\n",
+ " | 'Write results' >> beam.io.WriteToText(\"ansoutput4\",
file_name_suffix = \".txt\")\n",
+ " | 'Print the text file name' >> beam.Map(print)\n",
+ ")\n",
+ "\n",
+ "pipeline.run()\n",
+ "\n",
+ "! head ansoutput4*.txt"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "cellView": "form",
+ "id": "qDyh9_faIkCo"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Answer\n",
+ "import re\n",
+ "\n",
+ "inputs_pattern = 'data/SMSSpamCollection'\n",
+ "\n",
+ "outputs = (\n",
+ " pipeline\n",
+ " | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
+ " | 'Separate to list' >> beam.Map(lambda line:
line.split(\"\\t\"))\n",
+ " | 'Keep only spam' >> beam.Filter(lambda line: line[0] ==
\"spam\")\n",
+ " | 'Find words' >> beam.FlatMap(lambda line:
re.findall(r\"[a-zA-Z']+\", line[1]))\n",
+ " | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
+ " # ADDED\n",
+ " | 'Group and sum' >> beam.CombinePerKey(sum)\n",
+ " | 'Write results' >> beam.io.WriteToText(\"ansoutput4\",
file_name_suffix = \".txt\")\n",
+ " | 'Print the text file name' >> beam.Map(print)\n",
+ ")\n",
+ "\n",
+ "pipeline.run()\n",
+ "\n",
+ "! head ansoutput4*.txt"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "L9zNI4LMJHur"
+ },
+ "source": [
+ "And we finished! Now that we have a count of all the words in our
data set, it makes it much easier for us to use this feature to create a
logistic regression model to detect spam texts."
Review Comment:
I also changed the words to avoid feature engineering.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]