This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7db9554 [BEAM-12535] add dataframes notebook (#15470)
7db9554 is described below
commit 7db955478ea19bdbc0ffb22b61db6b291ca2947e
Author: David Cavazos <[email protected]>
AuthorDate: Thu Sep 16 14:57:22 2021 -0700
[BEAM-12535] add dataframes notebook (#15470)
* [BEAM-12535] add dataframes notebook
* add license
* writing grammar improvements
* Mention Schema instead of beam.Row
Co-authored-by: Brian Hulette <[email protected]>
* Add button to DataFrames notebook
* Apply suggestions from code review
Co-authored-by: Brian Hulette <[email protected]>
* Add notes and warnings
* Add note about pandas dependency
Co-authored-by: Brian Hulette <[email protected]>
---
examples/notebooks/tour-of-beam/dataframes.ipynb | 747 +++++++++++++++++++++
.../dsls/dataframes/differences-from-pandas.md | 2 +-
.../en/documentation/dsls/dataframes/overview.md | 4 +
.../site/content/en/get-started/tour-of-beam.md | 9 +
4 files changed, 761 insertions(+), 1 deletion(-)
diff --git a/examples/notebooks/tour-of-beam/dataframes.ipynb b/examples/notebooks/tour-of-beam/dataframes.ipynb
new file mode 100644
index 0000000..c19d991
--- /dev/null
+++ b/examples/notebooks/tour-of-beam/dataframes.ipynb
@@ -0,0 +1,747 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 2,
+ "metadata": {
+ "colab": {
+ "name": "Beam DataFrames",
+ "provenance": [],
+ "collapsed_sections": [],
+ "toc_visible": true
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "source": [
+ "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ],
+ "outputs": [],
+ "metadata": {
+ "cellView": "form",
+ "id": "rz2qIC9IL2rI"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Beam DataFrames\n",
+ "\n",
+ "<button>\n",
+ " <a href=\"https://beam.apache.org/documentation/dsls/dataframes/overview/\">\n",
+ " <img src=\"https://beam.apache.org/images/favicon.ico\" alt=\"Open the docs\" height=\"16\"/>\n",
+ " Beam DataFrames overview\n",
+ " </a>\n",
+ "</button>\n",
+ "\n",
+ "Beam DataFrames provide a pandas-like [DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n",
+ "API to declare Beam pipelines.\n",
+ "\n",
+ "> ℹ️ To learn more about Beam DataFrames, take a look at the\n",
+ "[Beam DataFrames overview](https://beam.apache.org/documentation/dsls/dataframes/overview) page.\n",
+ "\n",
+ "First, we need to install Apache Beam with the `interactive` extra for the Interactive runner.\n",
+ "We also need `pandas` for this notebook, but the Interactive runner already depends on it."
+ ],
+ "metadata": {
+ "id": "hDuXLLSZnI1D"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "source": [
+ "%pip install --quiet apache-beam[interactive]"
+ ],
+ "outputs": [],
+ "metadata": {
+ "id": "8QVByaWjkarZ"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Let's create a small data file of\n",
+ "[Comma-Separated Values (CSV)](https://en.wikipedia.org/wiki/Comma-separated_values).\n",
+ "It contains the dates of the\n",
+ "[equinoxes](https://en.wikipedia.org/wiki/Equinox) and\n",
+ "[solstices](https://en.wikipedia.org/wiki/Solstice)\n",
+ "of the year 2021."
+ ],
+ "metadata": {
+ "id": "aLqdbX4Mgipq"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "source": [
+ "%%writefile solar_events.csv\n",
+ "timestamp,event\n",
+ "2021-03-20 09:37:00,March Equinox\n",
+ "2021-06-21 03:32:00,June Solstice\n",
+ "2021-09-22 19:21:00,September Equinox\n",
+ "2021-12-21 15:59:00,December Solstice"
+ ],
+ "outputs": [],
+ "metadata": {
+ "id": "hZjwAm7qotrJ"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Interactive Beam\n",
+ "\n",
+ "Pandas has the\n",
+ "[`pandas.read_csv`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html)\n",
+ "function to easily read CSV files into DataFrames.\n",
+ "Beam has the\n",
+ "[`beam.dataframe.io.read_csv`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_csv)\n",
+ "function that emulates `pandas.read_csv`, but returns a deferred Beam DataFrame.\n",
+ "\n",
+ "If you’re using\n",
+ "[Interactive Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html),\n",
+ "you can use `collect` to bring a Beam DataFrame into local memory as a Pandas DataFrame."
+ ],
+ "metadata": {
+ "id": "Hv_58JulleQ_"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "source": [
+ "import apache_beam as beam\n",
+ "import apache_beam.runners.interactive.interactive_beam as ib\n",
+ "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n",
+ "\n",
+ "pipeline = beam.Pipeline(InteractiveRunner())\n",
+ "\n",
+ "# Create a deferred Beam DataFrame with the contents of our CSV file.\n",
+ "beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')\n",
+ "\n",
+ "# We can use `ib.collect` to view the contents of a Beam DataFrame.\n",
+ "ib.collect(beam_df)"
+ ],
+ "outputs": [
+ {
+ "output_type": "display_data",
+ "data": {
+ "application/javascript": "\n if (typeof
window.interactive_beam_jquery == 'undefined') {\n var jqueryScript =
document.createElement('script');\n jqueryScript.src =
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n
jqueryScript.type = 'text/javascript';\n jqueryScript.onload =
function() {\n var datatableScript =
document.createElement('script');\n datatableScript.src =
'https://cdn.datatables.net/1.10.20/j [...]
+ },
+ "metadata": {}
+ },
+ {
+ "output_type": "display_data",
+ "data": {
+ "text/html": [
+ "\n",
+ " <link rel=\"stylesheet\" href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" crossorigin=\"anonymous\">\n",
+ " <div id=\"progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\" class=\"spinner-border text-info\" role=\"status\">\n",
+ " </div>"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {}
+ },
+ {
+ "output_type": "stream",
+ "name": "stderr",
+ "text": [
+ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
+ ]
+ },
+ {
+ "output_type": "display_data",
+ "data": {
+ "application/javascript": "\n if (typeof
window.interactive_beam_jquery == 'undefined') {\n var jqueryScript =
document.createElement('script');\n jqueryScript.src =
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n
jqueryScript.type = 'text/javascript';\n jqueryScript.onload =
function() {\n var datatableScript =
document.createElement('script');\n datatableScript.src =
'https://cdn.datatables.net/1.10.20/j [...]
+ },
+ "metadata": {}
+ },
+ {
+ "output_type": "execute_result",
+ "data": {
+ "text/html": [
+ "<div>\n",
+ "<style scoped>\n",
+ " .dataframe tbody tr th:only-of-type {\n",
+ " vertical-align: middle;\n",
+ " }\n",
+ "\n",
+ " .dataframe tbody tr th {\n",
+ " vertical-align: top;\n",
+ " }\n",
+ "\n",
+ " .dataframe thead th {\n",
+ " text-align: right;\n",
+ " }\n",
+ "</style>\n",
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th>timestamp</th>\n",
+ " <th>event</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>solar_events.csv:0</th>\n",
+ " <td>2021-03-20 09:37:00</td>\n",
+ " <td>March Equinox</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>solar_events.csv:1</th>\n",
+ " <td>2021-06-21 03:32:00</td>\n",
+ " <td>June Solstice</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>solar_events.csv:2</th>\n",
+ " <td>2021-09-22 19:21:00</td>\n",
+ " <td>September Equinox</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>solar_events.csv:3</th>\n",
+ " <td>2021-12-21 15:59:00</td>\n",
+ " <td>December Solstice</td>\n",
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>\n",
+ "</div>"
+ ],
+ "text/plain": [
+ " timestamp event\n",
+ "solar_events.csv:0 2021-03-20 09:37:00 March Equinox\n",
+ "solar_events.csv:1 2021-06-21 03:32:00 June Solstice\n",
+ "solar_events.csv:2 2021-09-22 19:21:00 September Equinox\n",
+ "solar_events.csv:3 2021-12-21 15:59:00 December Solstice"
+ ]
+ },
+ "metadata": {},
+ "execution_count": 3
+ }
+ ],
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/",
+ "height": 242
+ },
+ "id": "sKAMXD5ElhYP",
+ "outputId": "928d9ad7-ae75-42d7-8dc6-8c5afd730b11"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Collecting a Beam DataFrame into a Pandas DataFrame is useful for performing\n",
+ "[operations not supported by Beam DataFrames](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#classes-of-unsupported-operations).\n",
+ "\n",
+ "For example, let's say we want to take only the first two events in chronological order.\n",
+ "Since a deferred Beam DataFrame does not have any ordering guarantees,\n",
+ "we first need to sort the values.\n",
+ "In Pandas, we could call\n",
+ "[`df.sort_values(by='timestamp')`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.sort_values.html) and then\n",
+ "[`df.head(2)`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.head.html) to achieve this.\n",
+ "\n",
+ "However, these are\n",
+ "[order-sensitive operations](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#order-sensitive-operations),\n",
+ "so using them on a Beam DataFrame raises a\n",
+ "[`WontImplementError`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.frame_base.html#apache_beam.dataframe.frame_base.WontImplementError).\n",
+ "We can work around this by using `collect` to convert the Beam DataFrame into a Pandas DataFrame."
+ ],
+ "metadata": {
+ "id": "t3Is6dArtN_Z"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "source": [
+ "import apache_beam.runners.interactive.interactive_beam as ib\n",
+ "\n",
+ "# Collect the Beam DataFrame into a Pandas DataFrame.\n",
+ "df = ib.collect(beam_df)\n",
+ "\n",
+ "# We can now use any Pandas transforms with our data.\n",
+ "df.sort_values(by='timestamp').head(2)"
+ ],
+ "outputs": [
+ {
+ "output_type": "display_data",
+ "data": {
+ "text/html": [
+ "\n",
+ " <link rel=\"stylesheet\" href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" crossorigin=\"anonymous\">\n",
+ " <div id=\"progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\" class=\"spinner-border text-info\" role=\"status\">\n",
+ " </div>"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {}
+ },
+ {
+ "output_type": "display_data",
+ "data": {
+ "application/javascript": "\n if (typeof
window.interactive_beam_jquery == 'undefined') {\n var jqueryScript =
document.createElement('script');\n jqueryScript.src =
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n
jqueryScript.type = 'text/javascript';\n jqueryScript.onload =
function() {\n var datatableScript =
document.createElement('script');\n datatableScript.src =
'https://cdn.datatables.net/1.10.20/j [...]
+ },
+ "metadata": {}
+ },
+ {
+ "output_type": "execute_result",
+ "data": {
+ "text/html": [
+ "<div>\n",
+ "<style scoped>\n",
+ " .dataframe tbody tr th:only-of-type {\n",
+ " vertical-align: middle;\n",
+ " }\n",
+ "\n",
+ " .dataframe tbody tr th {\n",
+ " vertical-align: top;\n",
+ " }\n",
+ "\n",
+ " .dataframe thead th {\n",
+ " text-align: right;\n",
+ " }\n",
+ "</style>\n",
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th>timestamp</th>\n",
+ " <th>event</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>solar_events.csv:0</th>\n",
+ " <td>2021-03-20 09:37:00</td>\n",
+ " <td>March Equinox</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>solar_events.csv:1</th>\n",
+ " <td>2021-06-21 03:32:00</td>\n",
+ " <td>June Solstice</td>\n",
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>\n",
+ "</div>"
+ ],
+ "text/plain": [
+ " timestamp event\n",
+ "solar_events.csv:0 2021-03-20 09:37:00 March Equinox\n",
+ "solar_events.csv:1 2021-06-21 03:32:00 June Solstice"
+ ]
+ },
+ "metadata": {},
+ "execution_count": 4
+ }
+ ],
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/",
+ "height": 138
+ },
+ "id": "8haEu6_9iTi7",
+ "outputId": "a1e07bdc-c66d-45e5-efff-90b93219c648"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "> ℹ️ Note that `collect` is _only_ accessible if you’re using\n",
+ "[Interactive Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html)."
+ ],
+ "metadata": {
+ "id": "ZkthQ13pwpm0"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Beam DataFrames to PCollections\n",
+ "\n",
+ "If you have your data as a Beam DataFrame, you can convert it into a regular PCollection with\n",
+ "[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n",
+ "\n",
+ "Converting a Beam DataFrame in this way yields a PCollection with a [schema](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema).\n",
+ "Each element is a named tuple, so we can access each field by attribute, for example `element.event` and `element.timestamp`.\n",
+ "\n",
+ "Sometimes it's more convenient to convert the named tuples to Python dictionaries.\n",
+ "We can do that with the\n",
+ "[`_asdict`](https://docs.python.org/3/library/collections.html#collections.somenamedtuple._asdict)\n",
+ "method."
+ ],
+ "metadata": {
+ "id": "ujRm4K0iP8SX"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "source": [
+ "import apache_beam as beam\n",
+ "from apache_beam.dataframe import convert\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')\n",
+ "\n",
+ " (\n",
+ " # Convert the Beam DataFrame to a PCollection.\n",
+ " convert.to_pcollection(beam_df)\n",
+ "\n",
+ " # The elements are named tuples; convert them to dictionaries.\n",
+ " | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
+ "\n",
+ " # Print the elements in the PCollection.\n",
+ " | 'Print' >> beam.Map(print)\n",
+ " )"
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stderr",
+ "text": [
+ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
+ "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
+ "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n",
+ "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n"
+ ]
+ }
+ ],
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "g22op8rZPvB3",
+ "outputId": "bba88b0b-4d19-4d61-dac7-2c168998a2e4"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Pandas DataFrames to PCollections\n",
+ "\n",
+ "If you have your data as a Pandas DataFrame, you can convert it into a regular PCollection with\n",
+ "[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n",
+ "\n",
+ "Since Pandas DataFrames are not part of any Beam pipeline, we must provide the `pipeline` explicitly."
+ ],
+ "metadata": {
+ "id": "t6xNIO0iPwtn"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "source": [
+ "import pandas as pd\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.dataframe import convert\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " df = pd.read_csv('solar_events.csv')\n",
+ "\n",
+ " (\n",
+ " # Convert the Pandas DataFrame to a PCollection.\n",
+ " convert.to_pcollection(df, pipeline=pipeline)\n",
+ "\n",
+ " # The elements are named tuples; convert them to dictionaries.\n",
+ " | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
+ "\n",
+ " # Print the elements in the PCollection.\n",
+ " | 'Print' >> beam.Map(print)\n",
+ " )"
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stderr",
+ "text": [
+ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
+ "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
+ "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n",
+ "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n"
+ ]
+ }
+ ],
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "YWYVFkvFuksz",
+ "outputId": "a3e3e6fa-85ce-4891-95a0-389fba4461a6"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "If you have your data as a PCollection of Pandas DataFrames, you can convert it into a PCollection of individual rows with\n",
+ "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+ "\n",
+ "> ℹ️ If the number of rows in each DataFrame can vary widely (for example, some DataFrames contain thousands of rows while others contain only a handful), it might be a good idea to\n",
+ "> [`Reshuffle`](https://beam.apache.org/documentation/transforms/python/other/reshuffle) after the `FlatMap`.\n",
+ "> This redistributes the elements in the PCollection so that each worker gets a more balanced share of them."
+ ],
+ "metadata": {
+ "id": "z6Q_tyWszkMC"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "source": [
+ "import pandas as pd\n",
+ "import apache_beam as beam\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " (\n",
+ " pipeline\n",
+ " | 'Filename' >> beam.Create(['solar_events.csv'])\n",
+ "\n",
+ " # Each element is a Pandas DataFrame, so we can do any Pandas operation.\n",
+ " | 'Read CSV' >> beam.Map(pd.read_csv)\n",
+ "\n",
+ " # Yield each row of every DataFrame as a dictionary into the output PCollection.\n",
+ " | 'To dictionaries' >> beam.FlatMap(lambda df: df.to_dict('records'))\n",
+ "\n",
+ " # Reshuffle to make sure parallelization is balanced.\n",
+ " | 'Reshuffle' >> beam.Reshuffle()\n",
+ "\n",
+ " # Print the elements in the PCollection.\n",
+ " | 'Print' >> beam.Map(print)\n",
+ " )"
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stderr",
+ "text": [
+ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
+ "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
+ "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n",
+ "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n"
+ ]
+ }
+ ],
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "fVWjO2Zfziqu",
+ "outputId": "c5db7be4-f764-487a-bc3b-bd5cbad4e396"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# PCollections to Beam DataFrames\n",
+ "\n",
+ "If you have your data as a PCollection, you can convert it into a deferred Beam DataFrame with\n",
+ "[`to_dataframe`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_dataframe).\n",
+ "\n",
+ "> ℹ️ To convert a PCollection to a Beam DataFrame, each element _must_ have a\n",
+ "[schema](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema)."
+ ],
+ "metadata": {
+ "id": "_Dm2u71EIRFr"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "source": [
+ "import csv\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.dataframe import convert\n",
+ "\n",
+ "with open('solar_events.csv') as f:\n",
+ " solar_events = [dict(row) for row in csv.DictReader(f)]\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " pcoll = pipeline | 'Create data' >> beam.Create(solar_events)\n",
+ "\n",
+ " # Give each element a schema with beam.Row, then convert the PCollection\n",
+ " # into a Beam DataFrame.\n",
+ " beam_df = convert.to_dataframe(pcoll | 'To Rows' >> beam.Map(\n",
+ " lambda x: beam.Row(\n",
+ " timestamp=x['timestamp'],\n",
+ " event=x['event'],\n",
+ " )\n",
+ " ))\n",
+ "\n",
+ " # Print the elements in the Beam DataFrame.\n",
+ " (\n",
+ " convert.to_pcollection(beam_df)\n",
+ " | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
+ " | 'Print' >> beam.Map(print)\n",
+ " )"
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stderr",
+ "text": [
+ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
+ "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
+ "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n",
+ "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n"
+ ]
+ }
+ ],
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "N6dVNkkEIWa_",
+ "outputId": "16556170-fbf6-4980-962c-bb466d0b76b2"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# PCollections to Pandas DataFrames\n",
+ "\n",
+ "If you have your data as a PCollection, you can convert it into an in-memory Pandas DataFrame via a\n",
+ "[side input](https://beam.apache.org/documentation/programming-guide#side-inputs).\n",
+ "\n",
+ "> ℹ️ It's recommended to **only** do this if you need to use a Pandas operation that is\n",
+ "> [not supported in Beam DataFrames](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas/#classes-of-unsupported-operations).\n",
+ "> Converting a PCollection into a Pandas DataFrame consolidates elements from potentially multiple workers into a single worker, which could create a performance bottleneck.\n",
+ "\n",
+ "> ⚠️ Pandas DataFrames are in-memory data structures, so make sure all the elements in the PCollection fit into memory.\n",
+ "> If they don't fit into memory, consider yielding multiple DataFrame elements via\n",
+ "> [`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap)."
+ ],
+ "metadata": {
+ "id": "kj08jOZQQa_q"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "source": [
+ "import csv\n",
+ "import pandas as pd\n",
+ "import apache_beam as beam\n",
+ "\n",
+ "with open('solar_events.csv') as f:\n",
+ " solar_events = [dict(row) for row in csv.DictReader(f)]\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " pcoll = pipeline | 'Create data' >> beam.Create(solar_events)\n",
+ "\n",
+ " (\n",
+ " pipeline\n",
+ "\n",
+ " # Create a single element, then build one DataFrame from the whole PCollection, passed in as a side input.\n",
+ " | 'Singleton' >> beam.Create([None])\n",
+ " | 'As Pandas' >> beam.Map(\n",
+ " lambda _, dict_iter: pd.DataFrame(dict_iter),\n",
+ " dict_iter=beam.pvalue.AsIter(pcoll),\n",
+ " )\n",
+ "\n",
+ " # Print the Pandas DataFrame.\n",
+ " | 'Print' >> beam.Map(print)\n",
+ " )"
+ ],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stderr",
+ "text": [
+ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ " timestamp event\n",
+ "0 2021-03-20 09:37:00 March Equinox\n",
+ "1 2021-06-21 03:32:00 June Solstice\n",
+ "2 2021-09-22 19:21:00 September Equinox\n",
+ "3 2021-12-21 15:59:00 December Solstice\n"
+ ]
+ }
+ ],
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "cHZdiPbOG-sy",
+ "outputId": "11c84948-fccf-41fd-c276-7c5803264ff7"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# What's next?\n",
+ "\n",
+ "* [Beam DataFrames overview](https://beam.apache.org/documentation/dsls/dataframes/overview) -- an overview of the Beam DataFrames API.\n",
+ "* [Differences from pandas](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas) -- goes through some of the differences between Beam DataFrames and Pandas DataFrames, as well as some of the workarounds for unsupported operations.\n",
+ "* [10 minutes to Pandas](https://pandas.pydata.org/pandas-docs/stable/user_guide/10min.html) -- a quickstart guide to Pandas DataFrames.\n",
+ "* [Pandas DataFrame API](https://pandas.pydata.org/pandas-docs/stable/reference/frame.html) -- the API reference for Pandas DataFrames."
+ ],
+ "metadata": {
+ "id": "UflW6AJp6-ss"
+ }
+ }
+ ]
+}
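The notebook turns rows into plain dictionaries in two ways: `dict(x._asdict())` on the named tuples produced by `convert.to_pcollection`, and `df.to_dict('records')` inside `beam.FlatMap` on a PCollection of pandas DataFrames. The following is a minimal sketch of those two per-element functions run with plain pandas, outside any pipeline, so their output can be checked directly. `SolarEvent`, `named_tuple_to_dict`, and `dataframe_to_records` are hypothetical names used only for illustration; in a real pipeline Beam generates the element type from the DataFrame's schema.

```python
import io
from collections import namedtuple

import pandas as pd

# Same rows as the notebook's solar_events.csv.
CSV_TEXT = """timestamp,event
2021-03-20 09:37:00,March Equinox
2021-06-21 03:32:00,June Solstice
2021-09-22 19:21:00,September Equinox
2021-12-21 15:59:00,December Solstice
"""

# Hypothetical stand-in for the schema'd element type that
# convert.to_pcollection yields; Beam generates the real type.
SolarEvent = namedtuple('SolarEvent', ['timestamp', 'event'])


def named_tuple_to_dict(row):
    """Per-element function for beam.Map after convert.to_pcollection."""
    return dict(row._asdict())


def dataframe_to_records(df):
    """Per-element function for beam.FlatMap on a PCollection of DataFrames.

    Returns one dictionary per row; FlatMap emits each dictionary
    as a separate element of the output PCollection.
    """
    return df.to_dict(orient='records')


df = pd.read_csv(io.StringIO(CSV_TEXT))
records = dataframe_to_records(df)
first = named_tuple_to_dict(SolarEvent('2021-03-20 09:37:00', 'March Equinox'))

print(len(records))  # 4
print(records[0] == first)  # True
```

Both functions produce the same dictionary shape, which is why the notebook's `to_pcollection` cells and its `FlatMap` cell print identical output.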
diff --git a/website/www/site/content/en/documentation/dsls/dataframes/differences-from-pandas.md b/website/www/site/content/en/documentation/dsls/dataframes/differences-from-pandas.md
index fb432e9..edcdddc 100644
--- a/website/www/site/content/en/documentation/dsls/dataframes/differences-from-pandas.md
+++ b/website/www/site/content/en/documentation/dsls/dataframes/differences-from-pandas.md
@@ -87,6 +87,6 @@ The Beam DataFrame API implements many of the commonly used pandas DataFrame ope
Interactive Beam is a module designed for use in interactive notebooks. The module, which by convention is imported as `ib`, provides an `ib.collect` function that brings a `PCollection` or deferred DataFrame into local memory as a pandas DataFrame. After using `ib.collect` to materialize a deferred DataFrame you will be able to perform any operation in the pandas API, not just those that are supported in Beam.
-<!-- TODO: Add code sample: https://issues.apache.org/jira/browse/BEAM-12535 -->
+{{< button-colab url="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/dataframes.ipynb" >}}
To get started with Beam in a notebook, see [Try Apache Beam](https://beam.apache.org/get-started/try-apache-beam/).
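A minimal sketch of that workflow, using the `solar_events.csv` rows from the DataFrames notebook. Because `ib.collect` needs Interactive Beam, this sketch assumes its result and builds the equivalent local pandas DataFrame directly with `pandas.read_csv` on an in-memory string. The rows are deliberately shuffled, since a deferred DataFrame makes no ordering guarantees; once the data is a local pandas DataFrame, order-sensitive operations such as `sort_values` and `head` behave as usual.

```python
import io

import pandas as pd

# Rows from the notebook's solar_events.csv, deliberately shuffled to mimic
# the lack of ordering guarantees in a deferred Beam DataFrame.
CSV_TEXT = """timestamp,event
2021-09-22 19:21:00,September Equinox
2021-06-21 03:32:00,June Solstice
2021-12-21 15:59:00,December Solstice
2021-03-20 09:37:00,March Equinox
"""

# Stand-in for `df = ib.collect(beam_df)`, which returns a pandas DataFrame.
df = pd.read_csv(io.StringIO(CSV_TEXT), parse_dates=['timestamp'])

# Order-sensitive pandas operations are allowed on the local DataFrame.
first_two = df.sort_values(by='timestamp').head(2)
print(first_two['event'].tolist())  # ['March Equinox', 'June Solstice']
```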
diff --git a/website/www/site/content/en/documentation/dsls/dataframes/overview.md b/website/www/site/content/en/documentation/dsls/dataframes/overview.md
index e08c61b..8d2c0a8 100644
--- a/website/www/site/content/en/documentation/dsls/dataframes/overview.md
+++ b/website/www/site/content/en/documentation/dsls/dataframes/overview.md
@@ -18,6 +18,8 @@ limitations under the License.
# Beam DataFrames overview
+{{< button-colab url="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/dataframes.ipynb" >}}
+
The Apache Beam Python SDK provides a DataFrame API for working with
pandas-like
[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
objects. The feature lets you convert a PCollection to a DataFrame and then
interact with the DataFrame using the standard methods available on the pandas
DataFrame API. The DataFrame API is built on top of the pandas implementation,
and pandas DataFrame methods are invoked on subsets of the datasets in
parallel. Th [...]
You can think of Beam DataFrames as a domain-specific language (DSL) for Beam
pipelines. Similar to [Beam
SQL](https://beam.apache.org/documentation/dsls/sql/overview/), DataFrames is a
DSL built into the Beam Python SDK. Using this DSL, you can create pipelines
without referencing standard Beam constructs like
[ParDo](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/)
or
[CombinePerKey](https://beam.apache.org/documentation/transforms/python/aggregation/combinep
[...]
@@ -115,3 +117,5 @@ pc1, pc2 = {'a': pc} | DataframeTransform(lambda a: expr1, expr2)
[pydoc_dataframe_transform]:
https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.transforms.html#apache_beam.dataframe.transforms.DataframeTransform
[pydoc_sql_transform]:
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.sql.html#apache_beam.transforms.sql.SqlTransform
+
+{{< button-colab url="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/dataframes.ipynb" >}}
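The overview notes that pandas DataFrame methods are invoked on subsets of the dataset in parallel. The following is a conceptual sketch in plain pandas, not Beam's actual execution model: it splits the notebook's sample data into two hypothetical partitions and compares applying an operation per partition with applying it to the whole frame. An element-wise operation gives the same combined result, while an order-sensitive operation such as `head` does not, which helps explain why Beam DataFrames reject order-sensitive operations. The two-way split is an assumption chosen for illustration; Beam decides partitioning itself.

```python
import io

import pandas as pd

CSV_TEXT = """timestamp,event
2021-03-20 09:37:00,March Equinox
2021-06-21 03:32:00,June Solstice
2021-09-22 19:21:00,September Equinox
2021-12-21 15:59:00,December Solstice
"""

df = pd.read_csv(io.StringIO(CSV_TEXT))

# Two hypothetical partitions, as if the rows lived on different workers.
partitions = [df.iloc[:2], df.iloc[2:]]


def is_equinox(frame):
    # Element-wise: each row's result depends only on that row.
    return frame['event'].str.contains('Equinox')


# Element-wise: apply per partition, then concatenate the pieces.
elementwise_parts = pd.concat([is_equinox(p) for p in partitions])
elementwise_same = elementwise_parts.equals(is_equinox(df))

# Order-sensitive: each partition returns its own "first" row.
head_parts = pd.concat([p.head(1) for p in partitions])
head_same = head_parts.equals(df.head(1))

print(elementwise_same, head_same)  # True False
```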
diff --git a/website/www/site/content/en/get-started/tour-of-beam.md b/website/www/site/content/en/get-started/tour-of-beam.md
index 9e9589b..b2f1484 100644
--- a/website/www/site/content/en/get-started/tour-of-beam.md
+++ b/website/www/site/content/en/get-started/tour-of-beam.md
@@ -49,6 +49,15 @@ We introduce the `GlobalWindow`, `FixedWindows`, `SlidingWindows`, and `Sessions
{{< button-colab url="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/windowing.ipynb" >}}
+### DataFrames
+
+Beam DataFrames provide a pandas-like [DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+API to declare Beam pipelines.
+To learn more about Beam DataFrames, take a look at the
+[Beam DataFrames overview](https://beam.apache.org/documentation/dsls/dataframes/overview) page.
+
+{{< button-colab url="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/dataframes.ipynb" >}}
+
## Transforms
Check the [Python transform catalog](/documentation/transforms/python/overview/)