liferoad commented on code in PR #26185:
URL: https://github.com/apache/beam/pull/26185#discussion_r1181804168


##########
examples/notebooks/get-started/learn_beam_windowing_by_doing.ipynb:
##########
@@ -0,0 +1,1836 @@
+{
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "view-in-github"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_windowing_by_doing.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "cellView": "form",
+        "id": "L7ZbRufePd2g"
+      },
+      "outputs": [],
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "83TJhNxLD7-W"
+      },
+      "source": [
+        " # **Introduction to Windowing for Batch Processing in Apache 
Beam**\n",
+        "\n",
+        "In this notebook, we will learn the fundamentals of **batch 
processing** as we walk through a few introductory examples in Beam. The 
pipelines in these examples process real-world data for air quality levels in 
India between 2017 and 2022.\n",
+        "\n",
+        "After this tutorial you should have a basic understanding of the 
following:\n",
+        "\n",
+        "*   What is **batch vs. stream** data processing?\n",
+        "*   How can I use Beam to run a **simple batch analysis job**?\n",
+        "*   How can I use Beam's **windowing features** to process only 
certain intervals of data at a time?"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "Dj3ftRRqfumW"
+      },
+      "source": [
+        "To begin, run the following cell to set up Apache Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 1,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "zmJ0pCmSvD0-",
+        "outputId": "9041f637-12a0-4f78-f60b-ebd3c3a1c186"
+      },
+      "outputs": [
+        {
+          "name": "stdout",
+          "output_type": "stream",
+          "text": [
+            "\u001b[2K     
\u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m14.5/14.5 
MB\u001b[0m \u001b[31m53.0 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n",
+            "\u001b[2K     
\u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m 
\u001b[32m144.1/144.1 kB\u001b[0m \u001b[31m1.1 MB/s\u001b[0m eta 
\u001b[36m0:00:00\u001b[0m\n",
+            "\u001b[2K     
\u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m89.7/89.7 
kB\u001b[0m \u001b[31m7.1 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n",
+            "\u001b[?25h  Preparing metadata (setup.py) ... 
\u001b[?25l\u001b[?25hdone\n",
+            "\u001b[2K     
\u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m515.5/515.5 
kB\u001b[0m \u001b[31m12.3 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n",
+            "\u001b[2K     
\u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m2.6/2.6 
MB\u001b[0m \u001b[31m22.6 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n",
+            "\u001b[2K     
\u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m152.0/152.0 
kB\u001b[0m \u001b[31m10.1 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n",
+            "\u001b[?25h  Preparing metadata (setup.py) ... 
\u001b[?25l\u001b[?25hdone\n",
+            "\u001b[2K     
\u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m2.7/2.7 
MB\u001b[0m \u001b[31m15.4 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n",
+            "\u001b[?25h  Preparing metadata (setup.py) ... 
\u001b[?25l\u001b[?25hdone\n",
+            "  Building wheel for crcmod (setup.py) ... 
\u001b[?25l\u001b[?25hdone\n",
+            "  Building wheel for dill (setup.py) ... 
\u001b[?25l\u001b[?25hdone\n",
+            "  Building wheel for docopt (setup.py) ... 
\u001b[?25l\u001b[?25hdone\n"
+          ]
+        }
+      ],
+      "source": [
+        "# Install apache-beam.\n",
+        "!pip install --quiet apache-beam"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 2,
+      "metadata": {
+        "id": "7sBoLahzPlJ1"
+      },
+      "outputs": [],
+      "source": [
+        "# Set the logging level to reduce verbose information\n",
+        "import logging\n",
+        "\n",
+        "logging.root.setLevel(logging.ERROR)"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "BB6FAwYj1dAi"
+      },
+      "source": [
+        "<hr style=\"border: 5px solid #003262;\" />\n",
+        "<hr style=\"border: 1px solid #fdb515;\" />\n",
+        "\n",
+        "## Batch vs. Stream Data Processing\n",
+        "\n",
+        "What's the difference?\n",
+        "\n",
+        "**Batch processing** is data processing and analysis performed on a set of data that has already been collected and stored over a period of time. \n",
+        "In other words, the input is a finite, bounded data set. Examples 
include payroll and billing systems, which have to be processed weekly or 
monthly.\n",
+        "\n",
+        "**Stream processing** happens *as* data flows through a system. This 
results in analysis and reporting of events \n",
+        "within a short period of time or near real-time on an infinite, 
unbounded data set. \n",
+        "Examples include fraud detection and intrusion detection, which require the continuous processing of transaction data.\n",
+        "\n",
+        "> This tutorial will focus on **batch processing** examples. \n",
+        "To learn more about stream processing in Beam, see the [Python streaming pipelines documentation](https://beam.apache.org/documentation/sdks/python-streaming/)."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "W_63UtsoBRql"
+      },
+      "source": [
+        "<hr style=\"border: 5px solid #003262;\" />\n",
+        "\n",
+        "## Load the Data\n",
+        "\n",
+        "Let's import the example data we will be using throughout this 
tutorial. The 
[dataset](https://www.kaggle.com/datasets/fedesoriano/air-quality-data-in-india)
 consists of **hourly air quality data (PM 2.5) in India from November 2017 to 
June 2022**.\n",
+        "\n",
+        "> The World Health Organization (WHO) reports 7 million premature 
deaths linked to air pollution each year. In India alone, more than 90% of the 
country's population live in areas where air quality is below the WHO's 
standards.\n",
+        "\n",
+        "**What does the data look like?**\n",
+        "\n",
+        "The data set has 36,192 rows and 6 columns, which record the following attributes:\n",
+        "\n",
+        "1.   `Timestamp`: Timestamp in the format YYYY-MM-DD HH:MM:SS\n",
+        "2.   `Year`: Year of the measure\n",
+        "3.   `Month`: Month of the measure\n",
+        "4.   `Day`: Day of the measure\n",
+        "5.   `Hour`: Hour of the measure\n",
+        "6.   `PM2.5`: Fine particulate matter air pollutant level in air\n",
+        "\n",
+        "**For our purposes, we will focus on only the first and last columns of the** `air_quality` **DataFrame**:\n",
+        "\n",
+        "1.   `Timestamp`: Timestamp in the format YYYY-MM-DD HH:MM:SS\n",
+        "2.   `PM2.5`: Fine particulate matter air pollutant level in air\n",
+        "\n",
+        "Run the following cell to load the data into our file directory."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 3,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "GTteBUZ-7e2s",
+        "outputId": "3af9cdb0-c248-4c6d-96f6-c3739fb66014"
+      },
+      "outputs": [
+        {
+          "name": "stdout",
+          "output_type": "stream",
+          "text": [
+            "Copying gs://batch-processing-example/air-quality-india.csv...\n",
+            "/ [1 files][  1.4 MiB/  1.4 MiB]                                  
              \n",
+            "Operation completed over 1 objects/1.4 MiB.                       
               \n"
+          ]
+        }
+      ],
+      "source": [
+        "# Copy the dataset file into the local file system from Google Cloud 
Storage.\n",
+        "!mkdir -p data\n",
+        "!gsutil cp gs://batch-processing-example/air-quality-india.csv data/"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "1NcmPl7C43lY"
+      },
+      "source": [
+        "#### Data Preparation\n",
+        "\n",
+        "Before we load the data into a Beam pipeline, let's use Pandas to 
select two columns."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 4,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 206
+        },
+        "id": "dq-k7hwRf4MA",
+        "outputId": "7d70a959-5278-453e-9315-f5ed06821744"
+      },
+      "outputs": [
+        {
+          "data": {
+            "text/html": [
+              "\n",
+              "  <div id=\"df-ca65f108-edf8-4e2b-8152-6df025dc7ff8\">\n",
+              "    <div class=\"colab-df-container\">\n",
+              "      <div>\n",
+              "<style scoped>\n",
+              "    .dataframe tbody tr th:only-of-type {\n",
+              "        vertical-align: middle;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe tbody tr th {\n",
+              "        vertical-align: top;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe thead th {\n",
+              "        text-align: right;\n",
+              "    }\n",
+              "</style>\n",
+              "<table border=\"1\" class=\"dataframe\">\n",
+              "  <thead>\n",
+              "    <tr style=\"text-align: right;\">\n",
+              "      <th></th>\n",
+              "      <th>Timestamp</th>\n",
+              "      <th>Year</th>\n",
+              "      <th>Month</th>\n",
+              "      <th>Day</th>\n",
+              "      <th>Hour</th>\n",
+              "      <th>PM2.5</th>\n",
+              "    </tr>\n",
+              "  </thead>\n",
+              "  <tbody>\n",
+              "    <tr>\n",
+              "      <th>0</th>\n",
+              "      <td>2017-11-07 12:00:00</td>\n",
+              "      <td>2017</td>\n",
+              "      <td>11</td>\n",
+              "      <td>7</td>\n",
+              "      <td>12</td>\n",
+              "      <td>64.51</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>1</th>\n",
+              "      <td>2017-11-07 13:00:00</td>\n",
+              "      <td>2017</td>\n",
+              "      <td>11</td>\n",
+              "      <td>7</td>\n",
+              "      <td>13</td>\n",
+              "      <td>69.95</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>2</th>\n",
+              "      <td>2017-11-07 14:00:00</td>\n",
+              "      <td>2017</td>\n",
+              "      <td>11</td>\n",
+              "      <td>7</td>\n",
+              "      <td>14</td>\n",
+              "      <td>92.79</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>3</th>\n",
+              "      <td>2017-11-07 15:00:00</td>\n",
+              "      <td>2017</td>\n",
+              "      <td>11</td>\n",
+              "      <td>7</td>\n",
+              "      <td>15</td>\n",
+              "      <td>109.66</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>4</th>\n",
+              "      <td>2017-11-07 16:00:00</td>\n",
+              "      <td>2017</td>\n",
+              "      <td>11</td>\n",
+              "      <td>7</td>\n",
+              "      <td>16</td>\n",
+              "      <td>116.50</td>\n",
+              "    </tr>\n",
+              "  </tbody>\n",
+              "</table>\n",
+              "</div>\n",
+              "      <button class=\"colab-df-convert\" 
onclick=\"convertToInteractive('df-ca65f108-edf8-4e2b-8152-6df025dc7ff8')\"\n",
+              "              title=\"Convert this dataframe to an interactive 
table.\"\n",
+              "              style=\"display:none;\">\n",
+              "        \n",
+              "  <svg xmlns=\"http://www.w3.org/2000/svg\" height=\"24px\"viewBox=\"0 0 24 24\"\n",
+              "       width=\"24px\">\n",
+              "    <path d=\"M0 0h24v24H0V0z\" fill=\"none\"/>\n",
+              "    <path d=\"M18.56 5.44l.94 2.06.94-2.06 
2.06-.94-2.06-.94-.94-2.06-.94 2.06-2.06.94zm-11 1L8.5 8.5l.94-2.06 
2.06-.94-2.06-.94L8.5 2.5l-.94 2.06-2.06.94zm10 10l.94 2.06.94-2.06 
2.06-.94-2.06-.94-.94-2.06-.94 2.06-2.06.94z\"/><path d=\"M17.41 
7.96l-1.37-1.37c-.4-.4-.92-.59-1.43-.59-.52 0-1.04.2-1.43.59L10.3 9.45l-7.72 
7.72c-.78.78-.78 2.05 0 2.83L4 21.41c.39.39.9.59 1.41.59.51 0 1.02-.2 
1.41-.59l7.78-7.78 2.81-2.81c.8-.78.8-2.07 0-2.86zM5.41 20L4 18.59l7.72-7.72 
1.47 1.35L5.41 20z\"/>\n",
+              "  </svg>\n",
+              "      </button>\n",
+              "      \n",
+              "  \n",
+              "    <button class=\"colab-df-quickchart\" 
onclick=\"quickchart('df-ca65f108-edf8-4e2b-8152-6df025dc7ff8')\"\n",
+              "            title=\"Generate charts.\"\n",
+              "            style=\"display:none;\">\n",
+              "      \n",
+              "  <svg xmlns=\"http://www.w3.org/2000/svg\" height=\"24px\"viewBox=\"0 0 24 24\"\n",
+              "       width=\"24px\">\n",
+              "      <g>\n",
+              "          <path d=\"M19 3H5c-1.1 0-2 .9-2 2v14c0 1.1.9 2 2 
2h14c1.1 0 2-.9 2-2V5c0-1.1-.9-2-2-2zM9 17H7v-7h2v7zm4 0h-2V7h2v10zm4 
0h-2v-4h2v4z\"/>\n",
+              "      </g>\n",
+              "  </svg>\n",
+              "    </button>\n",
+              "   \n",
+              "  <style>\n",
+              "    .colab-df-quickchart {\n",
+              "      background-color: #E8F0FE;\n",
+              "      border: none;\n",
+              "      border-radius: 50%;\n",
+              "      cursor: pointer;\n",
+              "      display: none;\n",
+              "      fill: #1967D2;\n",
+              "      height: 32px;\n",
+              "      padding: 0 0 0 0;\n",
+              "      width: 32px;\n",
+              "    }\n",
+              "\n",
+              "    .colab-df-quickchart:hover {\n",
+              "      background-color: #E2EBFA;\n",
+              "      box-shadow: 0px 1px 2px rgba(60, 64, 67, 0.3), 0px 1px 
3px 1px rgba(60, 64, 67, 0.15);\n",
+              "      fill: #174EA6;\n",
+              "    }\n",
+              "\n",
+              "    [theme=dark] .colab-df-quickchart {\n",
+              "      background-color: #3B4455;\n",
+              "      fill: #D2E3FC;\n",
+              "    }\n",
+              "\n",
+              "    [theme=dark] .colab-df-quickchart:hover {\n",
+              "      background-color: #434B5C;\n",
+              "      box-shadow: 0px 1px 3px 1px rgba(0, 0, 0, 0.15);\n",
+              "      filter: drop-shadow(0px 1px 2px rgba(0, 0, 0, 0.3));\n",
+              "      fill: #FFFFFF;\n",
+              "    }\n",
+              "  </style>\n",
+              "\n",
+              "    <script>\n",
+              "      const quickchartButtonEl =\n",
+              "        
document.querySelector('#df-ca65f108-edf8-4e2b-8152-6df025dc7ff8 
button.colab-df-quickchart');\n",
+              "      quickchartButtonEl.style.display =\n",
+              "        google.colab.kernel.accessAllowed ? 'block' : 
'none';\n",
+              "\n",
+              "      async function quickchart(key) {\n",
+              "        const containerElement = 
document.querySelector('#df-ca65f108-edf8-4e2b-8152-6df025dc7ff8');\n",
+              "        const charts = await 
google.colab.kernel.invokeFunction(\n",
+              "            'generateCharts', [key], {})\n",
+              "      }\n",
+              "    </script>    \n",
+              "  <style>\n",
+              "    .colab-df-container {\n",
+              "      display:flex;\n",
+              "      flex-wrap:wrap;\n",
+              "      gap: 12px;\n",
+              "    }\n",
+              "\n",
+              "    .colab-df-convert {\n",
+              "      background-color: #E8F0FE;\n",
+              "      border: none;\n",
+              "      border-radius: 50%;\n",
+              "      cursor: pointer;\n",
+              "      display: none;\n",
+              "      fill: #1967D2;\n",
+              "      height: 32px;\n",
+              "      padding: 0 0 0 0;\n",
+              "      width: 32px;\n",
+              "    }\n",
+              "\n",
+              "    .colab-df-convert:hover {\n",
+              "      background-color: #E2EBFA;\n",
+              "      box-shadow: 0px 1px 2px rgba(60, 64, 67, 0.3), 0px 1px 
3px 1px rgba(60, 64, 67, 0.15);\n",
+              "      fill: #174EA6;\n",
+              "    }\n",
+              "\n",
+              "    [theme=dark] .colab-df-convert {\n",
+              "      background-color: #3B4455;\n",
+              "      fill: #D2E3FC;\n",
+              "    }\n",
+              "\n",
+              "    [theme=dark] .colab-df-convert:hover {\n",
+              "      background-color: #434B5C;\n",
+              "      box-shadow: 0px 1px 3px 1px rgba(0, 0, 0, 0.15);\n",
+              "      filter: drop-shadow(0px 1px 2px rgba(0, 0, 0, 0.3));\n",
+              "      fill: #FFFFFF;\n",
+              "    }\n",
+              "  </style>\n",
+              "\n",
+              "      <script>\n",
+              "        const buttonEl =\n",
+              "          
document.querySelector('#df-ca65f108-edf8-4e2b-8152-6df025dc7ff8 
button.colab-df-convert');\n",
+              "        buttonEl.style.display =\n",
+              "          google.colab.kernel.accessAllowed ? 'block' : 
'none';\n",
+              "\n",
+              "        async function convertToInteractive(key) {\n",
+              "          const element = 
document.querySelector('#df-ca65f108-edf8-4e2b-8152-6df025dc7ff8');\n",
+              "          const dataTable =\n",
+              "            await 
google.colab.kernel.invokeFunction('convertToInteractive',\n",
+              "                                                     [key], 
{});\n",
+              "          if (!dataTable) return;\n",
+              "\n",
+              "          const docLinkHtml = 'Like what you see? Visit the ' 
+\n",
+              "            '<a target=\"_blank\" 
href=https://colab.research.google.com/notebooks/data_table.ipynb>data table 
notebook</a>'\n",
+              "            + ' to learn more about interactive tables.';\n",
+              "          element.innerHTML = '';\n",
+              "          dataTable['output_type'] = 'display_data';\n",
+              "          await google.colab.output.renderOutput(dataTable, 
element);\n",
+              "          const docLink = document.createElement('div');\n",
+              "          docLink.innerHTML = docLinkHtml;\n",
+              "          element.appendChild(docLink);\n",
+              "        }\n",
+              "      </script>\n",
+              "    </div>\n",
+              "  </div>\n",
+              "  "
+            ],
+            "text/plain": [
+              "             Timestamp  Year  Month  Day  Hour   PM2.5\n",
+              "0  2017-11-07 12:00:00  2017     11    7    12   64.51\n",
+              "1  2017-11-07 13:00:00  2017     11    7    13   69.95\n",
+              "2  2017-11-07 14:00:00  2017     11    7    14   92.79\n",
+              "3  2017-11-07 15:00:00  2017     11    7    15  109.66\n",
+              "4  2017-11-07 16:00:00  2017     11    7    16  116.50"
+            ]
+          },
+          "execution_count": 4,
+          "metadata": {},
+          "output_type": "execute_result"
+        }
+      ],
+      "source": [
+        "# Load the data into a Python Pandas DataFrame.\n",
+        "import pandas as pd\n",
+        "\n",
+        "air_quality = pd.read_csv(\"data/air-quality-india.csv\")\n",
+        "air_quality.head()"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 5,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 237
+        },
+        "id": "WNXrvP-wDIkA",
+        "outputId": "3e932987-41b3-4aaf-b49f-3707a9728322"
+      },
+      "outputs": [
+        {
+          "data": {
+            "text/html": [
+              "\n",
+              "  <div id=\"df-db4299d8-d441-4fc1-af7d-8d1c199492ed\">\n",
+              "    <div class=\"colab-df-container\">\n",
+              "      <div>\n",
+              "<style scoped>\n",
+              "    .dataframe tbody tr th:only-of-type {\n",
+              "        vertical-align: middle;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe tbody tr th {\n",
+              "        vertical-align: top;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe thead th {\n",
+              "        text-align: right;\n",
+              "    }\n",
+              "</style>\n",
+              "<table border=\"1\" class=\"dataframe\">\n",
+              "  <thead>\n",
+              "    <tr style=\"text-align: right;\">\n",
+              "      <th></th>\n",
+              "      <th>PM2.5</th>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>Timestamp</th>\n",
+              "      <th></th>\n",
+              "    </tr>\n",
+              "  </thead>\n",
+              "  <tbody>\n",
+              "    <tr>\n",
+              "      <th>2017-11-07 12:00:00</th>\n",
+              "      <td>64.51</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>2017-11-07 13:00:00</th>\n",
+              "      <td>69.95</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>2017-11-07 14:00:00</th>\n",
+              "      <td>92.79</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>2017-11-07 15:00:00</th>\n",
+              "      <td>109.66</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>2017-11-07 16:00:00</th>\n",
+              "      <td>116.50</td>\n",
+              "    </tr>\n",
+              "  </tbody>\n",
+              "</table>\n",
+              "</div>\n",
+              "      <button class=\"colab-df-convert\" 
onclick=\"convertToInteractive('df-db4299d8-d441-4fc1-af7d-8d1c199492ed')\"\n",
+              "              title=\"Convert this dataframe to an interactive 
table.\"\n",
+              "              style=\"display:none;\">\n",
+              "        \n",
+              "  <svg xmlns=\"http://www.w3.org/2000/svg\" height=\"24px\"viewBox=\"0 0 24 24\"\n",
+              "       width=\"24px\">\n",
+              "    <path d=\"M0 0h24v24H0V0z\" fill=\"none\"/>\n",
+              "    <path d=\"M18.56 5.44l.94 2.06.94-2.06 
2.06-.94-2.06-.94-.94-2.06-.94 2.06-2.06.94zm-11 1L8.5 8.5l.94-2.06 
2.06-.94-2.06-.94L8.5 2.5l-.94 2.06-2.06.94zm10 10l.94 2.06.94-2.06 
2.06-.94-2.06-.94-.94-2.06-.94 2.06-2.06.94z\"/><path d=\"M17.41 
7.96l-1.37-1.37c-.4-.4-.92-.59-1.43-.59-.52 0-1.04.2-1.43.59L10.3 9.45l-7.72 
7.72c-.78.78-.78 2.05 0 2.83L4 21.41c.39.39.9.59 1.41.59.51 0 1.02-.2 
1.41-.59l7.78-7.78 2.81-2.81c.8-.78.8-2.07 0-2.86zM5.41 20L4 18.59l7.72-7.72 
1.47 1.35L5.41 20z\"/>\n",
+              "  </svg>\n",
+              "      </button>\n",
+              "      \n",
+              "  \n",
+              "    <button class=\"colab-df-quickchart\" 
onclick=\"quickchart('df-db4299d8-d441-4fc1-af7d-8d1c199492ed')\"\n",
+              "            title=\"Generate charts.\"\n",
+              "            style=\"display:none;\">\n",
+              "      \n",
+              "  <svg xmlns=\"http://www.w3.org/2000/svg\" height=\"24px\"viewBox=\"0 0 24 24\"\n",
+              "       width=\"24px\">\n",
+              "      <g>\n",
+              "          <path d=\"M19 3H5c-1.1 0-2 .9-2 2v14c0 1.1.9 2 2 
2h14c1.1 0 2-.9 2-2V5c0-1.1-.9-2-2-2zM9 17H7v-7h2v7zm4 0h-2V7h2v10zm4 
0h-2v-4h2v4z\"/>\n",
+              "      </g>\n",
+              "  </svg>\n",
+              "    </button>\n",
+              "   \n",
+              "  <style>\n",
+              "    .colab-df-quickchart {\n",
+              "      background-color: #E8F0FE;\n",
+              "      border: none;\n",
+              "      border-radius: 50%;\n",
+              "      cursor: pointer;\n",
+              "      display: none;\n",
+              "      fill: #1967D2;\n",
+              "      height: 32px;\n",
+              "      padding: 0 0 0 0;\n",
+              "      width: 32px;\n",
+              "    }\n",
+              "\n",
+              "    .colab-df-quickchart:hover {\n",
+              "      background-color: #E2EBFA;\n",
+              "      box-shadow: 0px 1px 2px rgba(60, 64, 67, 0.3), 0px 1px 
3px 1px rgba(60, 64, 67, 0.15);\n",
+              "      fill: #174EA6;\n",
+              "    }\n",
+              "\n",
+              "    [theme=dark] .colab-df-quickchart {\n",
+              "      background-color: #3B4455;\n",
+              "      fill: #D2E3FC;\n",
+              "    }\n",
+              "\n",
+              "    [theme=dark] .colab-df-quickchart:hover {\n",
+              "      background-color: #434B5C;\n",
+              "      box-shadow: 0px 1px 3px 1px rgba(0, 0, 0, 0.15);\n",
+              "      filter: drop-shadow(0px 1px 2px rgba(0, 0, 0, 0.3));\n",
+              "      fill: #FFFFFF;\n",
+              "    }\n",
+              "  </style>\n",
+              "\n",
+              "    <script>\n",
+              "      const quickchartButtonEl =\n",
+              "        
document.querySelector('#df-db4299d8-d441-4fc1-af7d-8d1c199492ed 
button.colab-df-quickchart');\n",
+              "      quickchartButtonEl.style.display =\n",
+              "        google.colab.kernel.accessAllowed ? 'block' : 
'none';\n",
+              "\n",
+              "      async function quickchart(key) {\n",
+              "        const containerElement = 
document.querySelector('#df-db4299d8-d441-4fc1-af7d-8d1c199492ed');\n",
+              "        const charts = await 
google.colab.kernel.invokeFunction(\n",
+              "            'generateCharts', [key], {})\n",
+              "      }\n",
+              "    </script>    \n",
+              "  <style>\n",
+              "    .colab-df-container {\n",
+              "      display:flex;\n",
+              "      flex-wrap:wrap;\n",
+              "      gap: 12px;\n",
+              "    }\n",
+              "\n",
+              "    .colab-df-convert {\n",
+              "      background-color: #E8F0FE;\n",
+              "      border: none;\n",
+              "      border-radius: 50%;\n",
+              "      cursor: pointer;\n",
+              "      display: none;\n",
+              "      fill: #1967D2;\n",
+              "      height: 32px;\n",
+              "      padding: 0 0 0 0;\n",
+              "      width: 32px;\n",
+              "    }\n",
+              "\n",
+              "    .colab-df-convert:hover {\n",
+              "      background-color: #E2EBFA;\n",
+              "      box-shadow: 0px 1px 2px rgba(60, 64, 67, 0.3), 0px 1px 
3px 1px rgba(60, 64, 67, 0.15);\n",
+              "      fill: #174EA6;\n",
+              "    }\n",
+              "\n",
+              "    [theme=dark] .colab-df-convert {\n",
+              "      background-color: #3B4455;\n",
+              "      fill: #D2E3FC;\n",
+              "    }\n",
+              "\n",
+              "    [theme=dark] .colab-df-convert:hover {\n",
+              "      background-color: #434B5C;\n",
+              "      box-shadow: 0px 1px 3px 1px rgba(0, 0, 0, 0.15);\n",
+              "      filter: drop-shadow(0px 1px 2px rgba(0, 0, 0, 0.3));\n",
+              "      fill: #FFFFFF;\n",
+              "    }\n",
+              "  </style>\n",
+              "\n",
+              "      <script>\n",
+              "        const buttonEl =\n",
+              "          
document.querySelector('#df-db4299d8-d441-4fc1-af7d-8d1c199492ed 
button.colab-df-convert');\n",
+              "        buttonEl.style.display =\n",
+              "          google.colab.kernel.accessAllowed ? 'block' : 
'none';\n",
+              "\n",
+              "        async function convertToInteractive(key) {\n",
+              "          const element = 
document.querySelector('#df-db4299d8-d441-4fc1-af7d-8d1c199492ed');\n",
+              "          const dataTable =\n",
+              "            await 
google.colab.kernel.invokeFunction('convertToInteractive',\n",
+              "                                                     [key], 
{});\n",
+              "          if (!dataTable) return;\n",
+              "\n",
+              "          const docLinkHtml = 'Like what you see? Visit the ' 
+\n",
+              "            '<a target=\"_blank\" 
href=https://colab.research.google.com/notebooks/data_table.ipynb>data table 
notebook</a>'\n",
+              "            + ' to learn more about interactive tables.';\n",
+              "          element.innerHTML = '';\n",
+              "          dataTable['output_type'] = 'display_data';\n",
+              "          await google.colab.output.renderOutput(dataTable, 
element);\n",
+              "          const docLink = document.createElement('div');\n",
+              "          docLink.innerHTML = docLinkHtml;\n",
+              "          element.appendChild(docLink);\n",
+              "        }\n",
+              "      </script>\n",
+              "    </div>\n",
+              "  </div>\n",
+              "  "
+            ],
+            "text/plain": [
+              "                      PM2.5\n",
+              "Timestamp                  \n",
+              "2017-11-07 12:00:00   64.51\n",
+              "2017-11-07 13:00:00   69.95\n",
+              "2017-11-07 14:00:00   92.79\n",
+              "2017-11-07 15:00:00  109.66\n",
+              "2017-11-07 16:00:00  116.50"
+            ]
+          },
+          "execution_count": 5,
+          "metadata": {},
+          "output_type": "execute_result"
+        }
+      ],
+      "source": [
+        "import csv\n",
+        "\n",
+        "# Select only the two columns of the DataFrame we're interested 
in.\n",
+        "airq = air_quality.loc[:, [\"Timestamp\", 
\"PM2.5\"]].set_index(\"Timestamp\")\n",
+        "saved_new = pd.DataFrame(airq)\n",
+        "saved_new.to_csv(\"data/air_quality.csv\")\n",
+        "airq.head()"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "VRFkb_sLDUCD"
+      },
+      "source": [
+        "<hr style=\"border: 5px solid #003262;\" />\n",
+        "\n",
+        "# 1. Average Air Quality Index (AQI)\n",
+        "\n",
+        "Before we explore more advanced batch processing with different types 
of windowing, we will start with a simple batch analysis example.\n",
+        "\n",
+        "Our **objective** is to analyze the *entire* dataset to find the 
**average PM2.5 air quality index** in India across the entire 11/2017-6/2022 
period.\n",
+        "\n",
+        "> This example uses the `GlobalWindow`, which is a single window 
that covers the entire PCollection. All pipelines use the 
[`GlobalWindow`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.GlobalWindow)
 by default. In many cases, especially for batch pipelines, this is what we 
want since we want to analyze all the data that we have."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 6,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 34
+        },
+        "id": "v06NFe9sDYXc",
+        "outputId": "f65eae63-0424-4ac0-8609-78e98ac21bd0"
+      },
+      "outputs": [
+        {
+          "data": {
+            "application/javascript": "\n        if (typeof 
window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = 
document.createElement('script');\n          jqueryScript.src = 
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          
jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = 
function() {\n            var datatableScript = 
document.createElement('script');\n            datatableScript.src = 
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            
datatableScript.type = 'text/javascript';\n            datatableScript.onload = 
function() {\n              window.interactive_beam_jquery = 
jQuery.noConflict(true);\n              
window.interactive_beam_jquery(document).ready(function($){\n                \n 
             });\n            }\n            
document.head.appendChild(datatableScript);\n          };\n          
document.head.appendChild(jqueryScript);\n        } else {\n          
window.interactive_beam_jquery(document).ready(function($){\n            \n          });\n       
 }"
+          },
+          "metadata": {},
+          "output_type": "display_data"
+        },
+        {
+          "name": "stdout",
+          "output_type": "stream",
+          "text": [
+            "49.308428658266905\n"
+          ]
+        }
+      ],
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "def parse_file(element):\n",
+        "  file = csv.reader([element], quotechar='\"', delimiter=',',\n",
+        "                    quoting=csv.QUOTE_ALL, skipinitialspace=True)\n",
+        "  for line in file:\n",
+        "    return line\n",
+        "\n",
+        "with beam.Pipeline() as pipeline:\n",
+        " (\n",
+        "      pipeline\n",
+        "      | 'Read input file' >> 
beam.io.ReadFromText(\"data/air_quality.csv\",\n",
+        "                                                  
skip_header_lines=1)\n",
+        "      | 'Parse file' >> beam.Map(parse_file)\n",
+        "      | 'Get PM' >> beam.Map(lambda x: float(x[1])) # only process 
PM2.5\n",
+        "      | 'Get mean value' >> beam.combiners.Mean.Globally()\n",
+        "      | beam.Map(print))"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 7,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "GmHEE1G5Y1z-",
+        "outputId": "248ee3d7-43af-4b53-9832-8da0eb7ac974"
+      },
+      "outputs": [
+        {
+          "data": {
+            "text/plain": [
+              "49.30842865826703"
+            ]
+          },
+          "execution_count": 7,
+          "metadata": {},
+          "output_type": "execute_result"
+        }
+      ],
+      "source": [
+        "# Verify that the mean above matches what pandas computes.\n",
+        "airq[\"PM2.5\"].mean()"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "b3gGxC6w6qXx"
+      },
+      "source": [
+        "**Congratulations!** You just created a simple aggregation processing 
pipeline in batch using `GlobalWindow`."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "vRameihqDJ8l"
+      },
+      "source": [
+        "<hr style=\"border: 5px solid #003262;\" />\n",
+        "\n",
+        "# 2. Advanced Processing in Batch with Windowing\n",
+        "\n",
+        "Sometimes, we want to 
[aggregate](https://beam.apache.org/documentation/transforms/python/overview/#aggregation)
 data, using transforms like `GroupByKey` or `Combine`, only at certain 
intervals, such as hourly or daily, instead of processing the entire 
`PCollection` at once.\n",
+        "\n",
+        "In this case, our **objective** is to determine the **fluctuations of 
air quality** *every 30 days*.\n",
+        "\n",
+        "**_Windows_** in Beam allow us to process only certain data intervals 
at a time.\n",
+        "In this notebook, we will go through different ways of windowing our 
pipeline.\n",
+        "\n",
+        "We have already been introduced to the default `GlobalWindow` (see 
above), which covers the entire `PCollection`. Now we will dive into **fixed time 
windows, sliding time windows, and session windows**.\n",
+        "\n",
+        "> We also recommend going through [another windowing 
tutorial](https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/windowing.ipynb),
 which uses a toy dataset."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "gj0_S5Ka3-zb"
+      },
+      "source": [
+        "### First, we need to convert timestamps to Unix time\n",
+        "\n",
+        "Apache Beam requires us to provide the timestamp as Unix time. Let us 
write a simple time conversion function:"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 8,
+      "metadata": {
+        "id": "nKBYsxFg4SIa"
+      },
+      "outputs": [],
+      "source": [
+        "import time\n",
+        "\n",
+        "# Converts a timestamp string to Unix time (seconds since the 
epoch); adjust time_format to match your data.\n",
+        "# Parse with .strptime first: comparing raw timestamp strings can 
give wrong results. Note that time.mktime uses the local timezone.\n",
+        "def to_unix_time(time_str: str, time_format='%Y-%m-%d %H:%M:%S') -> 
int:\n",
+        "  \"\"\"Converts a time string into Unix time.\"\"\"\n",
+        "  time_tuple = time.strptime(time_str, time_format)\n",
+        "  return int(time.mktime(time_tuple))"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 9,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/";
+        },
+        "id": "_mPge0KdRx20",
+        "outputId": "43475bbe-548a-4817-ed0b-534cebbe70ce"
+      },
+      "outputs": [
+        {
+          "data": {
+            "text/plain": [
+              "1634220000"
+            ]
+          },
+          "execution_count": 9,
+          "metadata": {},
+          "output_type": "execute_result"
+        }
+      ],
+      "source": [
+        "to_unix_time('2021-10-14 14:00:00')"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "lL0_QONF1aMH"
+      },
+      "source": [
+        "### Second, let us define some helper functions to develop our 
pipeline\n",
+        "\n",
+        "In this code, we have a `DoFn` (`PrintElementInfo`) to help us 
analyze an element alongside its window information, and we have another 
transform\n",
+        "(`PrintWindowInfo`) to help us analyze how many elements landed in 
each window.\n",
+        "We use a custom 
[`DoFn`](https://beam.apache.org/documentation/transforms/python/elementwise/pardo)\n",
+        "to access that information."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 10,
+      "metadata": {
+        "id": "KtPL-echb2xv"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Helper functions to develop our pipeline\n",
+        "\n",
+        "def human_readable_window(window) -> str:\n",
+        "  \"\"\"Formats a window object into a human readable 
string.\"\"\"\n",
+        "  if isinstance(window, beam.window.GlobalWindow):\n",
+        "    return str(window)\n",
+        "  return f'{window.start.to_utc_datetime()} - 
{window.end.to_utc_datetime()}'\n",
+        "\n",
+        "class PrintElementInfo(beam.DoFn):\n",
+        "  \"\"\"Prints an element with its Window information for 
debugging.\"\"\"\n",
+        "  def process(self, element, timestamp=beam.DoFn.TimestampParam, 
window=beam.DoFn.WindowParam):\n",
+        "    print(f'[{human_readable_window(window)}] 
{timestamp.to_utc_datetime()} -- {element}')\n",
+        "    yield element\n",
+        "\n",
+        "@beam.ptransform_fn\n",
+        "def PrintWindowInfo(pcollection):\n",

Review Comment:
   I added `LogElements` as a note. The issue with `LogElements` is that its 
printing format is fixed.
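
   For context, here is a minimal plain-Python sketch (no Beam required) of the custom format that the notebook's `PrintElementInfo` helper prints. The function names `fixed_window_bounds`, `utc_str`, and `format_element_info` are hypothetical and only for illustration. In the notebook, the timestamp and window come from `beam.DoFn.TimestampParam` and `beam.DoFn.WindowParam`. The window arithmetic assumes fixed windows aligned to the Unix epoch with no offset.

```python
from datetime import datetime, timezone


def fixed_window_bounds(timestamp_s: int, size_s: int) -> tuple:
    """Returns (start, end) of the epoch-aligned fixed window holding timestamp_s.

    Illustrative assumption: windows start at multiples of size_s (offset 0).
    """
    start = timestamp_s - (timestamp_s % size_s)
    return start, start + size_s


def utc_str(seconds: int) -> str:
    """Formats Unix seconds as a naive UTC 'YYYY-MM-DD HH:MM:SS' string."""
    return str(datetime.fromtimestamp(seconds, tz=timezone.utc).replace(tzinfo=None))


def format_element_info(element, timestamp_s: int, start_s: int, end_s: int) -> str:
    """Mirrors the '[window] timestamp -- element' layout used in the notebook."""
    return f'[{utc_str(start_s)} - {utc_str(end_s)}] {utc_str(timestamp_s)} -- {element}'


thirty_days = 30 * 24 * 60 * 60
ts = 1634220000  # 2021-10-14 14:00:00 UTC, the value shown in the notebook output
start, end = fixed_window_bounds(ts, thirty_days)
print(format_element_info(64.51, ts, start, end))
# → [2021-09-30 00:00:00 - 2021-10-30 00:00:00] 2021-10-14 14:00:00 -- 64.51
```

   Because the layout lives in an ordinary format string, it can be changed freely, which is the flexibility a fixed printing format does not offer.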



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

