davidcavazos commented on a change in pull request #14962:
URL: https://github.com/apache/beam/pull/14962#discussion_r660825491
##########
File path: examples/notebooks/tour-of-beam/windowing.ipynb
##########
@@ -0,0 +1,703 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "name": "Windowing -- Tour of Beam",
+ "provenance": [],
+ "collapsed_sections": [],
+ "toc_visible": true
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "metadata": {
+ "cellView": "form",
+ "id": "upmJn_DjcThx"
+ },
+ "source": [
+ "#@title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ],
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "5UC_aGanx6oE"
+ },
+ "source": [
+ "# Windowing -- _Tour of Beam_\n",
+ "\n",
+ "Sometimes, we want to
[aggregate](https://beam.apache.org/documentation/transforms/python/overview/#aggregation)
data with transforms like `GroupByKey` or `Combine` only at certain intervals, such as hourly
or daily, instead of processing the entire `PCollection` of data only once.\n",
+ "\n",
+ "We might want to emit a [moving
average](https://en.wikipedia.org/wiki/Moving_average) as we're processing
data.\n",
+ "\n",
+ "Maybe we want to analyze the user experience for a certain task in a
web app, so it would be nice to get the app events by sessions of activity.\n",
+ "\n",
+ "Or we could be running a streaming pipeline, and there is no end to
the data, so how can we aggregate data?\n",
+ "\n",
+ "_Windows_ in Beam allow us to process only certain data intervals at
a time.\n",
+ "In this notebook, we go through different ways of windowing our
pipeline.\n",
+ "\n",
+ "Let's begin by installing `apache-beam`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "id": "R_Yhoc6N_Flg"
+ },
+ "source": [
+ "# Install apache-beam with pip.\n",
+ "!pip install --quiet apache-beam"
+ ],
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "_OkWHiAvpWDZ"
+ },
+ "source": [
+ "First, let's define some helper functions to simplify the rest of the
examples.\n",
+ "\n",
+ "We have a transform to help us analyze an element alongside its
window information, and we have another transform to help us analyze how many
elements landed in each window.\n",
+ "We use a custom
[`DoFn`](https://beam.apache.org/documentation/transforms/python/elementwise/pardo)\n",
+ "to access that information.\n",
+ "\n",
+ "You don't need to understand these; you just need to know they exist
🙂."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "id": "C9yAN1Hgk3dF"
+ },
+ "source": [
+ "import apache_beam as beam\n",
+ "\n",
+ "def human_readable_window(window) -> str:\n",
+ " \"\"\"Formats a window object into a human readable
string.\"\"\"\n",
+ " if isinstance(window, beam.window.GlobalWindow):\n",
+ " return str(window)\n",
+ " return f'{window.start.to_utc_datetime()} -
{window.end.to_utc_datetime()}'\n",
+ "\n",
+ "class PrintElementInfo(beam.DoFn):\n",
+ " \"\"\"Prints an element with its Window information.\"\"\"\n",
+ " def process(self, element, timestamp=beam.DoFn.TimestampParam,
window=beam.DoFn.WindowParam):\n",
+ " print(f'[{human_readable_window(window)}]
{timestamp.to_utc_datetime()} -- {element}')\n",
+ " yield element\n",
+ "\n",
+ "@beam.ptransform_fn\n",
+ "def PrintWindowInfo(pcollection):\n",
+ " \"\"\"Prints the Window information with how many elements landed
in that window.\"\"\"\n",
+ " class PrintCountsInfo(beam.DoFn):\n",
+ " def process(self, num_elements, window=beam.DoFn.WindowParam):\n",
+ " print(f'>> Window [{human_readable_window(window)}] has
{num_elements} elements')\n",
+ " yield num_elements\n",
+ "\n",
+ " return (\n",
+ " pcollection\n",
+ " | 'Count elements per window' >>
beam.combiners.Count.Globally().without_defaults()\n",
+ " | 'Print counts info' >> beam.ParDo(PrintCountsInfo())\n",
+ " )"
+ ],
+ "execution_count": 1,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "CQrojV2QnqIU"
+ },
+ "source": [
+ "Now let's create some data to use in the examples.\n",
+ "\n",
+ "Windows define data intervals based on time, so we need to tell
Apache Beam a timestamp for each element.\n",
+ "\n",
+ "We define a `PTransform` for convenience, so we can attach the
timestamps automatically.\n",
+ "\n",
+ "Apache Beam requires us to provide the timestamp as [Unix
time](https://en.wikipedia.org/wiki/Unix_time), which is a way to represent a
date and time as the number of seconds since midnight UTC on January 1st, 1970.\n",
+ "\n",
+ "For our data, let's analyze some events about the seasons and moon
phases for the year 2021, which might be [useful for a gardening
project](https://www.almanac.com/content/planting-by-the-moon).\n",
+ "\n",
+ "To attach timestamps to each element, we can `Map` each element and
return a
[`TimestampedValue`](https://beam.apache.org/documentation/transforms/python/elementwise/withtimestamps/)."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "Sgzscopvmh1f",
+ "outputId": "e0c6fc19-ab97-4754-8f1f-1601807be940"
+ },
+ "source": [
+ "import calendar\n",
+ "import time\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "\n",
+ "def to_unix_time(time_str: str, time_format='%Y-%m-%d %H:%M:%S') ->
int:\n",
+ " \"\"\"Converts a UTC time string into Unix time.\"\"\"\n",
+ " time_tuple = time.strptime(time_str, time_format)\n",
+ " # timegm reads the tuple as UTC; mktime would use the local time zone.\n",
+ " return calendar.timegm(time_tuple)\n",
+ "\n",
+ "@beam.ptransform_fn\n",
+ "@beam.typehints.with_input_types(beam.pvalue.PBegin)\n",
+ "@beam.typehints.with_output_types(beam.window.TimestampedValue)\n",
+ "def AstronomicalEvents(pipeline):\n",
+ " return (\n",
+ " pipeline\n",
+ " | 'Create data' >> beam.Create([\n",
+ " ('2021-03-20 03:37:00', 'March Equinox 2021'),\n",
+ " ('2021-04-26 22:31:00', 'Super full moon'),\n",
+ " ('2021-05-11 13:59:00', 'Micro new moon'),\n",
+ " ('2021-05-26 06:13:00', 'Super full moon, total lunar
eclipse'),\n",
+ " ('2021-06-20 22:32:00', 'June Solstice 2021'),\n",
+ " ('2021-08-22 07:01:00', 'Blue moon'),\n",
+ " ('2021-09-22 14:21:00', 'September Equinox 2021'),\n",
+ " ('2021-11-04 15:14:00', 'Super new moon'),\n",
+ " ('2021-11-19 02:57:00', 'Micro full moon, partial lunar
eclipse'),\n",
+ " ('2021-12-04 01:43:00', 'Super new moon'),\n",
+ " ('2021-12-18 10:35:00', 'Micro full moon'),\n",
+ " ('2021-12-21 09:59:00', 'December Solstice 2021'),\n",
+ " ])\n",
+ " | 'With timestamps' >> beam.MapTuple(\n",
+ " lambda timestamp, element:\n",
+ " beam.window.TimestampedValue(element,
to_unix_time(timestamp))\n",
+ " )\n",
+ " )\n",
+ "\n",
+ "# Let's see what the data looks like.\n",
+ "beam_options = PipelineOptions(flags=[],
type_check_additional='all')\n",
+ "with beam.Pipeline(options=beam_options) as pipeline:\n",
+ " (\n",
+ " pipeline\n",
+ " | 'Astronomical events' >> AstronomicalEvents()\n",
+ " | 'Print element' >> beam.Map(print)\n",
+ " )"
+ ],
+ "execution_count": 3,
+ "outputs": [
+ {
+ "output_type": "stream",
+ "text": [
+ "March Equinox 2021\n",
+ "Super full moon\n",
+ "Micro new moon\n",
+ "Super full moon, total lunar eclipse\n",
+ "June Solstice 2021\n",
+ "Blue moon\n",
+ "September Equinox 2021\n",
+ "December Solstice 2021\n",
+ "Super new moon\n",
+ "Micro full moon, partial lunar eclipse\n",
+ "Super new moon\n",
+ "Micro full moon\n"
+ ],
+ "name": "stdout"
+ }
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "qI0K3jSA2LbJ"
+ },
+ "source": [
+ "> ℹ️ After running this, it looks like the timestamps disappeared!\n",
+ "> They're actually still _implicitly_ part of the element, just like
the windowing information.\n",
+ "> If we need to access them, we can do so via a custom
[`DoFn`](https://beam.apache.org/documentation/transforms/python/elementwise/pardo).\n",
+ "> Aggregation transforms use each element's timestamp along with the
windowing we specified to create windows of elements."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "ymHF1WCqnG4V"
+ },
+ "source": [
+ "# Global window\n",
+ "\n",
+ "All pipelines use the
[`GlobalWindow`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.GlobalWindow)
by default.\n",
+ "This is a single window that covers the entire `PCollection`.\n",
+ "\n",
+ "In many cases, especially for batch pipelines, this is what we want
since we want to analyze all the data that we have.\n",
+ "\n",
+ "> ℹ️ `GlobalWindow` is not very useful in a streaming pipeline unless
you only need element-wise transforms.\n",
+ "> Aggregations, like `GroupByKey` and `Combine`, need to process the
entire window, but a streaming pipeline has no end, so they would never finish."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "xDXdE9uysriw",
+ "outputId": "b39e7fe7-dc13-4d77-89af-f2d1312ab673"
+ },
+ "source": [
+ "import apache_beam as beam\n",
+ "\n",
+ "# All elements fall into the GlobalWindow by default.\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " (\n",
+ " pipeline\n",
+ " | 'Astronomical events' >> AstronomicalEvents()\n",
+ " | 'Print element info' >> beam.ParDo(PrintElementInfo())\n",
+ " | 'Print window info' >> PrintWindowInfo()\n",
+ " )"
+ ],
+ "execution_count": 4,
+ "outputs": [
+ {
+ "output_type": "stream",
+ "text": [
+ "[GlobalWindow] 2021-03-20 03:37:00 -- March Equinox 2021\n",
+ "[GlobalWindow] 2021-04-26 22:31:00 -- Super full moon\n",
+ "[GlobalWindow] 2021-05-11 13:59:00 -- Micro new moon\n",
+ "[GlobalWindow] 2021-05-26 06:13:00 -- Super full moon, total
lunar eclipse\n",
+ "[GlobalWindow] 2021-06-20 22:32:00 -- June Solstice 2021\n",
+ "[GlobalWindow] 2021-08-22 07:01:00 -- Blue moon\n",
+ "[GlobalWindow] 2021-09-22 14:21:00 -- September Equinox 2021\n",
+ "[GlobalWindow] 2021-12-21 09:59:00 -- December Solstice 2021\n",
+ "[GlobalWindow] 2021-11-04 15:14:00 -- Super new moon\n",
+ "[GlobalWindow] 2021-11-19 02:57:00 -- Micro full moon, partial
lunar eclipse\n",
+ "[GlobalWindow] 2021-12-04 01:43:00 -- Super new moon\n",
+ "[GlobalWindow] 2021-12-18 10:35:00 -- Micro full moon\n",
+ ">> Window [GlobalWindow] has 12 elements\n"
+ ],
+ "name": "stdout"
+ }
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "l3Kod_pR7a7S"
+ },
+ "source": [
+ ""
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "7WkYLzFCo4Rl"
+ },
+ "source": [
+ "# Fixed time windows\n",
+ "\n",
+ "If we want to analyze our data hourly, daily, monthly, etc., we might
want to create evenly spaced intervals.\n",
+ "\n",
+
"[`FixedWindows`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.FixedWindows)\n",
+ "allow us to create fixed-sized windows.\n",
+ "We only need to specify the _window size_ in seconds.\n",
+ "\n",
+ "In Python, we can use
[`timedelta`](https://docs.python.org/3/library/datetime.html#timedelta-objects)\n",
+ "to convert minutes, hours, or days into seconds for us.\n",
+ "\n",
+ "> ℹ️ Some time deltas like a month cannot be so easily converted into
seconds, since a month can have from 28 to 31 days.\n",
Review comment:
Makes sense. I'm changing all the mentions of months to days.
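
To illustrate the point with the standard library: `timedelta` accepts minutes, hours, and days, which all convert to a fixed number of seconds, but it has no `months` argument. A minimal sketch follows. `window_size_seconds` is a hypothetical helper name used only for this illustration; it is not part of the notebook or of Beam.

```python
from datetime import timedelta


def window_size_seconds(**kwargs) -> int:
    """Converts timedelta keyword arguments into a whole number of seconds.

    Hypothetical helper for illustration only.
    """
    return int(timedelta(**kwargs).total_seconds())


# Units with a fixed length convert cleanly.
hourly = window_size_seconds(hours=1)
daily = window_size_seconds(days=1)
thirty_days = window_size_seconds(days=30)

# timedelta has no `months` argument, because a month has no fixed length.
try:
    timedelta(months=1)
    months_supported = True
except TypeError:
    months_supported = False

print(hourly, daily, thirty_days, months_supported)
```

Each resulting integer is the kind of value the notebook text describes as the window size in seconds for `FixedWindows`. A window described in days therefore has one unambiguous size, while a "monthly" window would need a chosen number of days.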