TheNeuralBit commented on a change in pull request #15470:
URL: https://github.com/apache/beam/pull/15470#discussion_r704841059



##########
File path: examples/notebooks/tour-of-beam/dataframes.ipynb
##########
@@ -0,0 +1,744 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 2,
+  "metadata": {
+    "colab": {
+      "name": "Beam DataFrames",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "outputs": [],
+      "metadata": {
+        "cellView": "form",
+        "id": "rz2qIC9IL2rI"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Beam DataFrames\n",
+        "\n",
+        "<button>\n",
+        "  <a 
href=\"https://beam.apache.org/documentation/dsls/dataframes/overview/\">\n",
+        "    <img src=\"https://beam.apache.org/images/favicon.ico\" 
alt=\"Open the docs\" height=\"16\"/>\n",
+        "    Beam DataFrames overview\n",
+        "  </a>\n",
+        "</button>\n",
+        "\n",
+        "Beam DataFrames provide a pandas-like 
[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n",
+        "API to declare Beam pipelines.\n",
+        "\n",
+        "> ℹ️ To learn more about Beam DataFrames, take a look at the\n",
+        "[Beam DataFrames 
overview](https://beam.apache.org/documentation/dsls/dataframes/overview) 
page.\n",
+        "\n",
+        "First, we need to install Apache Beam with the `interactive` extra 
for the Interactive runner."
+      ],
+      "metadata": {
+        "id": "hDuXLLSZnI1D"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "source": [
+        "%pip install --quiet apache-beam[interactive]"

Review comment:
       ```suggestion
           "First, we need to install Apache Beam with the `interactive` extra 
for the Interactive runner, as well as a supported version of pandas."
         ],
         "metadata": {
           "id": "hDuXLLSZnI1D"
         }
       },
       {
         "cell_type": "code",
         "execution_count": null,
         "source": [
           "%pip install --quiet apache-beam[interactive] pandas==1.2.4"
   ```
   
   Could we make this explicitly install pandas? Technically the original 
command would work since interactive beam requires pandas, but it's good to 
make users aware that the Beam DataFrame API requires pandas.
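
As a rough illustration of that point, a follow-up notebook cell along these lines could show readers which versions of the two packages they ended up with. This is only a sketch: the helper name `installed_version` is made up for this example, and it relies solely on the standard-library `importlib.metadata` module (Python 3.8+), not on any Beam API.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of `package`, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Report the packages the Beam DataFrame API depends on.
for name in ("apache-beam", "pandas"):
    version = installed_version(name)
    print(f"{name}: {version if version is not None else 'not installed'}")
```

Printing both versions right after the install cell would make the pandas dependency visible even if a reader skips the prose.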

##########
File path: examples/notebooks/tour-of-beam/dataframes.ipynb
##########
@@ -0,0 +1,744 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 2,
+  "metadata": {
+    "colab": {
+      "name": "Beam DataFrames",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "outputs": [],
+      "metadata": {
+        "cellView": "form",
+        "id": "rz2qIC9IL2rI"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Beam DataFrames\n",
+        "\n",
+        "<button>\n",
+        "  <a 
href=\"https://beam.apache.org/documentation/dsls/dataframes/overview/\">\n",
+        "    <img src=\"https://beam.apache.org/images/favicon.ico\" 
alt=\"Open the docs\" height=\"16\"/>\n",
+        "    Beam DataFrames overview\n",
+        "  </a>\n",
+        "</button>\n",
+        "\n",
+        "Beam DataFrames provide a pandas-like 
[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n",
+        "API to declare Beam pipelines.\n",
+        "\n",
+        "> ℹ️ To learn more about Beam DataFrames, take a look at the\n",
+        "[Beam DataFrames 
overview](https://beam.apache.org/documentation/dsls/dataframes/overview) 
page.\n",
+        "\n",
+        "First, we need to install Apache Beam with the `interactive` extra 
for the Interactive runner."
+      ],
+      "metadata": {
+        "id": "hDuXLLSZnI1D"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "source": [
+        "%pip install --quiet apache-beam[interactive]"
+      ],
+      "outputs": [],
+      "metadata": {
+        "id": "8QVByaWjkarZ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Let's create a small data file of\n",
+        "[Comma-Separated Values 
(CSV)](https://en.wikipedia.org/wiki/Comma-separated_values).\n",
+        "It simply includes the dates of the\n",
+        "[equinoxes](https://en.wikipedia.org/wiki/Equinox) and\n",
+        "[solstices](https://en.wikipedia.org/wiki/Solstice)\n",
+        "of the year 2021."
+      ],
+      "metadata": {
+        "id": "aLqdbX4Mgipq"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "source": [
+        "%%writefile solar_events.csv\n",
+        "timestamp,event\n",
+        "2021-03-20 09:37:00,March Equinox\n",
+        "2021-06-21 03:32:00,June Solstice\n",
+        "2021-09-22 19:21:00,September Equinox\n",
+        "2021-12-21 15:59:00,December Solstice"
+      ],
+      "outputs": [],
+      "metadata": {
+        "id": "hZjwAm7qotrJ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Interactive Beam\n",
+        "\n",
+        "Pandas has the\n",
+        
"[`pandas.read_csv`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html)\n",
+        "function to easily read CSV files into DataFrames.\n",
+        "Beam has the\n",
+        
"[`beam.dataframe.io.read_csv`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_csv)\n",
+        "function that emulates `pandas.read_csv`, but returns a deferred Beam 
DataFrame.\n",
+        "\n",
+        "If you’re using\n",
+        "[Interactive 
Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html),\n",
+        "you can use `collect` to bring a Beam DataFrame into local memory as 
a Pandas DataFrame."
+      ],
+      "metadata": {
+        "id": "Hv_58JulleQ_"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 3,
+      "source": [
+        "import apache_beam as beam\n",
+        "import apache_beam.runners.interactive.interactive_beam as ib\n",
+        "from apache_beam.runners.interactive.interactive_runner import 
InteractiveRunner\n",
+        "\n",
+        "pipeline = beam.Pipeline(InteractiveRunner())\n",
+        "\n",
+        "# Create a deferred Beam DataFrame with the contents of our csv 
file.\n",
+        "beam_df = pipeline | 'Read CSV' >> 
beam.dataframe.io.read_csv('solar_events.csv')\n",
+        "\n",
+        "# We can use `ib.collect` to view the contents of a Beam 
DataFrame.\n",
+        "ib.collect(beam_df)"
+      ],
+      "outputs": [
+        {
+          "output_type": "display_data",
+          "data": {
+            "application/javascript": "\n        if (typeof 
window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = 
document.createElement('script');\n          jqueryScript.src = 
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          
jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = 
function() {\n            var datatableScript = 
document.createElement('script');\n            datatableScript.src = 
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            
datatableScript.type = 'text/javascript';\n            datatableScript.onload = 
function() {\n              window.interactive_beam_jquery = 
jQuery.noConflict(true);\n              
window.interactive_beam_jquery(document).ready(function($){\n                \n 
             });\n            }\n            
document.head.appendChild(datatableScript);\n          };\n          
document.head.appendChild(jqueryScript);\n        } else {\n          
window.interactive_beam_jquery(document).ready(function($){\n            \n          });\n       
 }"
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "display_data",
+          "data": {
+            "text/html": [
+              "\n",
+              "            <link rel=\"stylesheet\" 
href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" 
integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" 
crossorigin=\"anonymous\">\n",
+              "            <div 
id=\"progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\" 
class=\"spinner-border text-info\" role=\"status\">\n",
+              "            </div>"
+            ],
+            "text/plain": [
+              "<IPython.core.display.HTML object>"
+            ]
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "stream",
+          "name": "stderr",
+          "text": [
+            "WARNING:root:Make sure that locally built Python SDK docker image 
has Python 3.7 interpreter.\n"
+          ]
+        },
+        {
+          "output_type": "display_data",
+          "data": {
+            "application/javascript": "\n        if (typeof 
window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = 
document.createElement('script');\n          jqueryScript.src = 
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          
jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = 
function() {\n            var datatableScript = 
document.createElement('script');\n            datatableScript.src = 
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            
datatableScript.type = 'text/javascript';\n            datatableScript.onload = 
function() {\n              window.interactive_beam_jquery = 
jQuery.noConflict(true);\n              
window.interactive_beam_jquery(document).ready(function($){\n                \n 
           
$(\"#progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\").remove();\n         
     });\n            }\n            
document.head.appendChild(datatableScript);\n          };\n          
document.head.appendChild(jqueryScript);\n        } else {\n          
window.interactive_beam_jquery(document).ready(function($){\n            \n     
       $(\"#progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\").remove();\n  
        });\n        }"
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "execute_result",
+          "data": {
+            "text/html": [
+              "<div>\n",
+              "<style scoped>\n",
+              "    .dataframe tbody tr th:only-of-type {\n",
+              "        vertical-align: middle;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe tbody tr th {\n",
+              "        vertical-align: top;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe thead th {\n",
+              "        text-align: right;\n",
+              "    }\n",
+              "</style>\n",
+              "<table border=\"1\" class=\"dataframe\">\n",
+              "  <thead>\n",
+              "    <tr style=\"text-align: right;\">\n",
+              "      <th></th>\n",
+              "      <th>timestamp</th>\n",
+              "      <th>event</th>\n",
+              "    </tr>\n",
+              "  </thead>\n",
+              "  <tbody>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:0</th>\n",
+              "      <td>2021-03-20 09:37:00</td>\n",
+              "      <td>March Equinox</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:1</th>\n",
+              "      <td>2021-06-21 03:32:00</td>\n",
+              "      <td>June Solstice</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:2</th>\n",
+              "      <td>2021-09-22 19:21:00</td>\n",
+              "      <td>September Equinox</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:3</th>\n",
+              "      <td>2021-12-21 15:59:00</td>\n",
+              "      <td>December Solstice</td>\n",
+              "    </tr>\n",
+              "  </tbody>\n",
+              "</table>\n",
+              "</div>"
+            ],
+            "text/plain": [
+              "                              timestamp              event\n",
+              "solar_events.csv:0  2021-03-20 09:37:00      March Equinox\n",
+              "solar_events.csv:1  2021-06-21 03:32:00      June Solstice\n",
+              "solar_events.csv:2  2021-09-22 19:21:00  September Equinox\n",
+              "solar_events.csv:3  2021-12-21 15:59:00  December Solstice"
+            ]
+          },
+          "metadata": {},
+          "execution_count": 3
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 242
+        },
+        "id": "sKAMXD5ElhYP",
+        "outputId": "928d9ad7-ae75-42d7-8dc6-8c5afd730b11"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Collecting a Beam DataFrame into a Pandas DataFrame is useful to 
perform\n",
+        "[operations not supported by Beam 
DataFrames](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#classes-of-unsupported-operations).\n",
+        "\n",
+        "For example, let's say we want to take only the first two events in 
chronological order.\n",
+        "Since a deferred Beam DataFrame does not have any ordering 
guarantees,\n",
+        "first we need to sort the values.\n",
+        "In Pandas, we could first\n",
+        
"[`df.sort_values(by='timestamp')`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.sort_values.html)
 and then\n",
+        
"[`df.head(2)`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.head.html)
 to achieve this.\n",
+        "\n",
+        "However, these are\n",
+        "[order-sensitive 
operations](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#order-sensitive-operations)\n",
+        "so using them in a Beam DataFrame raises a\n",
+        
"[`WontImplementError`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.frame_base.html#apache_beam.dataframe.frame_base.WontImplementError).\n",
+        "We can work around this by using `collect` to convert the Beam 
DataFrame into a Pandas DataFrame."
+      ],
+      "metadata": {
+        "id": "t3Is6dArtN_Z"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 4,
+      "source": [
+        "import apache_beam.runners.interactive.interactive_beam as ib\n",
+        "\n",
+        "# Collect the Beam DataFrame into a Pandas DataFrame.\n",
+        "df = ib.collect(beam_df)\n",
+        "\n",
+        "# We can now use any Pandas transforms with our data.\n",
+        "df.sort_values(by='timestamp').head(2)"
+      ],
+      "outputs": [
+        {
+          "output_type": "display_data",
+          "data": {
+            "text/html": [
+              "\n",
+              "            <link rel=\"stylesheet\" 
href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" 
integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" 
crossorigin=\"anonymous\">\n",
+              "            <div 
id=\"progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\" 
class=\"spinner-border text-info\" role=\"status\">\n",
+              "            </div>"
+            ],
+            "text/plain": [
+              "<IPython.core.display.HTML object>"
+            ]
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "display_data",
+          "data": {
+            "application/javascript": "\n        if (typeof 
window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = 
document.createElement('script');\n          jqueryScript.src = 
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          
jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = 
function() {\n            var datatableScript = 
document.createElement('script');\n            datatableScript.src = 
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            
datatableScript.type = 'text/javascript';\n            datatableScript.onload = 
function() {\n              window.interactive_beam_jquery = 
jQuery.noConflict(true);\n              
window.interactive_beam_jquery(document).ready(function($){\n                \n 
           
$(\"#progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\").remove();\n         
     });\n            }\n            
document.head.appendChild(datatableScript);\n          };\n          
document.head.appendChild(jqueryScript);\n        } else {\n          
window.interactive_beam_jquery(document).ready(function($){\n            \n     
       $(\"#progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\").remove();\n  
        });\n        }"
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "execute_result",
+          "data": {
+            "text/html": [
+              "<div>\n",
+              "<style scoped>\n",
+              "    .dataframe tbody tr th:only-of-type {\n",
+              "        vertical-align: middle;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe tbody tr th {\n",
+              "        vertical-align: top;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe thead th {\n",
+              "        text-align: right;\n",
+              "    }\n",
+              "</style>\n",
+              "<table border=\"1\" class=\"dataframe\">\n",
+              "  <thead>\n",
+              "    <tr style=\"text-align: right;\">\n",
+              "      <th></th>\n",
+              "      <th>timestamp</th>\n",
+              "      <th>event</th>\n",
+              "    </tr>\n",
+              "  </thead>\n",
+              "  <tbody>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:0</th>\n",
+              "      <td>2021-03-20 09:37:00</td>\n",
+              "      <td>March Equinox</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:1</th>\n",
+              "      <td>2021-06-21 03:32:00</td>\n",
+              "      <td>June Solstice</td>\n",
+              "    </tr>\n",
+              "  </tbody>\n",
+              "</table>\n",
+              "</div>"
+            ],
+            "text/plain": [
+              "                              timestamp          event\n",
+              "solar_events.csv:0  2021-03-20 09:37:00  March Equinox\n",
+              "solar_events.csv:1  2021-06-21 03:32:00  June Solstice"
+            ]
+          },
+          "metadata": {},
+          "execution_count": 4
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 138
+        },
+        "id": "8haEu6_9iTi7",
+        "outputId": "a1e07bdc-c66d-45e5-efff-90b93219c648"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "> ℹ️ Note that `collect` is _only_ accessible if you’re using\n",
+        "[Interactive 
Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html)"
+      ],
+      "metadata": {
+        "id": "ZkthQ13pwpm0"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Beam DataFrames to PCollections\n",
+        "\n",
+        "If you have your data as a Beam DataFrame, you can convert it into a 
regular PCollection with\n",
+        
"[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n",
+        "\n",
+        "Converting a Beam DataFrame into a PCollection gives us each element 
as a\n",
+        
"[`namedtuple`](https://docs.python.org/3/library/collections.html#collections.namedtuple)\n",
+        "instance.\n",
+        "This allows us to easily access each property, for example 
`element.event` and `element.timestamp`.\n",
+        "\n",
+        "Sometimes it's more convenient to convert the named tuples to Python 
dictionaries.\n",
+        "We can do that with the\n",
+        
"[`_asdict`](https://docs.python.org/3/library/collections.html#collections.somenamedtuple._asdict)\n",
+        "method."
+      ],
+      "metadata": {
+        "id": "ujRm4K0iP8SX"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 5,
+      "source": [
+        "import apache_beam as beam\n",
+        "from apache_beam.dataframe import convert\n",
+        "\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  beam_df = pipeline | 'Read CSV' >> 
beam.dataframe.io.read_csv('solar_events.csv')\n",
+        "\n",
+        "  (\n",
+        "      # Convert the Beam DataFrame to a PCollection.\n",
+        "      convert.to_pcollection(beam_df)\n",
+        "\n",
+        "      # We get named tuples, which we can convert to dictionaries like 
this.\n",
+        "      | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
+        "\n",
+        "      # Print the elements in the PCollection.\n",
+        "      | 'Print' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stderr",
+          "text": [
+            "WARNING:root:Make sure that locally built Python SDK docker image 
has Python 3.7 interpreter.\n"
+          ]
+        },
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
+            "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
+            "{'timestamp': '2021-09-22 19:21:00', 'event': 'September 
Equinox'}\n",
+            "{'timestamp': '2021-12-21 15:59:00', 'event': 'December 
Solstice'}\n"
+          ]
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "g22op8rZPvB3",
+        "outputId": "bba88b0b-4d19-4d61-dac7-2c168998a2e4"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Pandas DataFrames to PCollections\n",
+        "\n",
+        "If you have your data as a Pandas DataFrame, you can convert it into 
a regular PCollection with\n",
+        
"[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n",
+        "\n",
+        "Since Pandas DataFrames are not part of any Beam pipeline, we must 
provide the `pipeline` explicitly."
+      ],
+      "metadata": {
+        "id": "t6xNIO0iPwtn"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 6,
+      "source": [
+        "import pandas as pd\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.dataframe import convert\n",
+        "\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  df = pd.read_csv('solar_events.csv')\n",
+        "\n",
+        "  (\n",
+        "      # Convert the Pandas DataFrame to a PCollection.\n",
+        "      convert.to_pcollection(df, pipeline=pipeline)\n",
+        "\n",
+        "      # We get named tuples, which we can convert to dictionaries like 
this.\n",
+        "      | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
+        "\n",
+        "      # Print the elements in the PCollection.\n",
+        "      | 'Print' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stderr",
+          "text": [
+            "WARNING:root:Make sure that locally built Python SDK docker image 
has Python 3.7 interpreter.\n"
+          ]
+        },
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
+            "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
+            "{'timestamp': '2021-09-22 19:21:00', 'event': 'September 
Equinox'}\n",
+            "{'timestamp': '2021-12-21 15:59:00', 'event': 'December 
Solstice'}\n"
+          ]
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "YWYVFkvFuksz",
+        "outputId": "a3e3e6fa-85ce-4891-95a0-389fba4461a6"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "If you have your data as a PCollection of Pandas DataFrames, you can 
flatten them into a PCollection of individual rows with\n",
+        
"[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+        "\n",
+        "> ℹ️ If the number of elements in each DataFrame can be very 
different (that is, some DataFrames might contain thousands of elements while 
others contain only a handful of elements), it might be a good idea to\n",
+        "> 
[`Reshuffle`](https://beam.apache.org/documentation/transforms/python/other/reshuffle).\n",
+        "> This basically rebalances the elements in the PCollection, which 
helps make sure all the workers have a balanced number of elements."
+      ],
+      "metadata": {
+        "id": "z6Q_tyWszkMC"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 7,
+      "source": [
+        "import pandas as pd\n",
+        "import apache_beam as beam\n",
+        "\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Filename' >> beam.Create(['solar_events.csv'])\n",
+        "\n",
+        "      # Each element is a Pandas DataFrame, so we can do any Pandas 
operation.\n",
+        "      | 'Read CSV' >> beam.Map(pd.read_csv)\n",
+        "\n",
+        "      # We yield each element of all the DataFrames into a 
PCollection of dictionaries.\n",
+        "      | 'To dictionaries' >> beam.FlatMap(lambda df: 
df.to_dict('records'))\n",
+        "\n",
+        "      # Reshuffle to make sure parallelization is balanced.\n",
+        "      | 'Reshuffle' >> beam.Reshuffle()\n",
+        "\n",
+        "      # Print the elements in the PCollection.\n",
+        "      | 'Print' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stderr",
+          "text": [
+            "WARNING:root:Make sure that locally built Python SDK docker image 
has Python 3.7 interpreter.\n"
+          ]
+        },
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
+            "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
+            "{'timestamp': '2021-09-22 19:21:00', 'event': 'September 
Equinox'}\n",
+            "{'timestamp': '2021-12-21 15:59:00', 'event': 'December 
Solstice'}\n"
+          ]
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "fVWjO2Zfziqu",
+        "outputId": "c5db7be4-f764-487a-bc3b-bd5cbad4e396"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# PCollections to Beam DataFrames\n",
+        "\n",
+        "If you have your data as a PCollection, you can convert it into a 
deferred Beam DataFrame with\n",
+        
"[`to_dataframe`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_dataframe).\n",
+        "\n",
+        "> ℹ️ To convert a PCollection to a Beam DataFrame, each element 
_must_ be a\n",
+        
"[`beam.Row`](https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html#apache_beam.pvalue.Row)."
+      ],
+      "metadata": {
+        "id": "_Dm2u71EIRFr"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 8,
+      "source": [
+        "import csv\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.dataframe import convert\n",
+        "\n",
+        "with open('solar_events.csv') as f:\n",
+        "  solar_events = [dict(row) for row in csv.DictReader(f)]\n",
+        "\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  pcoll = pipeline | 'Create data' >> beam.Create(solar_events)\n",
+        "\n",
+        "  # Convert the PCollection into a Beam DataFrame\n",
+        "  beam_df = convert.to_dataframe(pcoll | 'To Rows' >> beam.Map(\n",
+        "      lambda x: beam.Row(\n",
+        "          timestamp=x['timestamp'],\n",
+        "          event=x['event'],\n",
+        "      )\n",
+        "  ))\n",
+        "\n",
+        "  # Print the elements in the Beam DataFrame.\n",
+        "  (\n",
+        "      convert.to_pcollection(beam_df)\n",
+        "      | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
+        "      | 'Print' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stderr",
+          "text": [
+            "WARNING:root:Make sure that locally built Python SDK docker image 
has Python 3.7 interpreter.\n"
+          ]
+        },
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
+            "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
+            "{'timestamp': '2021-09-22 19:21:00', 'event': 'September 
Equinox'}\n",
+            "{'timestamp': '2021-12-21 15:59:00', 'event': 'December 
Solstice'}\n"
+          ]
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "N6dVNkkEIWa_",
+        "outputId": "16556170-fbf6-4980-962c-bb466d0b76b2"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# PCollections to Pandas DataFrames\n",
+        "\n",
+        "If you have your data as a PCollection, you can convert it into an 
in-memory Pandas DataFrame via a\n",
+        "[side 
input](https://beam.apache.org/documentation/programming-guide#side-inputs).\n",

Review comment:
       I think this pattern is a little confusing. Do we really want to 
encourage it? I'd prefer people use the DataFrame API rather than something 
like this.

##########
File path: examples/notebooks/tour-of-beam/dataframes.ipynb
##########
@@ -0,0 +1,744 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 2,
+  "metadata": {
+    "colab": {
+      "name": "Beam DataFrames",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",

Review comment:
       ```suggestion
           "###### Licensed to the Apache Software Foundation (ASF), Version 
2.0 (the \"License\")\n",
   ```
   
   nit: This shows up as a section in the table of contents, which looks a 
little odd. I think this fixes it?

##########
File path: website/www/site/content/en/documentation/dsls/dataframes/overview.md
##########
@@ -115,3 +117,5 @@ pc1, pc2 = {'a': pc} | DataframeTransform(lambda a: expr1, 
expr2)
 
 [pydoc_dataframe_transform]: 
https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.transforms.html#apache_beam.dataframe.transforms.DataframeTransform
 [pydoc_sql_transform]: 
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.sql.html#apache_beam.transforms.sql.SqlTransform
+
+{{< button-colab 
url="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/dataframes.ipynb"
 >}}

Review comment:
       There's actually a TODO to link to this from the "Differences from 
pandas" page: 
https://github.com/apache/beam/blob/77fcf291d1986a8592f960e6ba77c77bcf9dc4f4/website/www/site/content/en/documentation/dsls/dataframes/differences-from-pandas.md#L90
   
   Could you move it there?
   
   Also it would be really nice if we could make it so the "Interactive Beam" 
section shows up on the website directly, in addition to a "run in colab" 
button, but I'm not sure if that's possible.

##########
File path: examples/notebooks/tour-of-beam/dataframes.ipynb
##########
@@ -0,0 +1,744 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 2,
+  "metadata": {
+    "colab": {
+      "name": "Beam DataFrames",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "outputs": [],
+      "metadata": {
+        "cellView": "form",
+        "id": "rz2qIC9IL2rI"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Beam DataFrames\n",
+        "\n",
+        "<button>\n",
+        "  <a 
href=\"https://beam.apache.org/documentation/dsls/dataframes/overview/\">\n",
+        "    <img src=\"https://beam.apache.org/images/favicon.ico\" 
alt=\"Open the docs\" height=\"16\"/>\n",
+        "    Beam DataFrames overview\n",
+        "  </a>\n",
+        "</button>\n",
+        "\n",
+        "Beam DataFrames provide a pandas-like 
[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n",
+        "API to declare Beam pipelines.\n",
+        "\n",
+        "> ℹ️ To learn more about Beam DataFrames, take a look at the\n",
+        "[Beam DataFrames 
overview](https://beam.apache.org/documentation/dsls/dataframes/overview) 
page.\n",
+        "\n",
+        "First, we need to install Apache Beam with the `interactive` extra 
for the Interactive runner."
+      ],
+      "metadata": {
+        "id": "hDuXLLSZnI1D"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "source": [
+        "%pip install --quiet apache-beam[interactive]"
+      ],
+      "outputs": [],
+      "metadata": {
+        "id": "8QVByaWjkarZ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Let's create a small data file of\n",
+        "[Comma-Separated Values 
(CSV)](https://en.wikipedia.org/wiki/Comma-separated_values).\n",
+        "It simply includes the dates of the\n",
+        "[equinoxes](https://en.wikipedia.org/wiki/Equinox) and\n",
+        "[solstices](https://en.wikipedia.org/wiki/Solstice)\n",
+        "of the year 2021."
+      ],
+      "metadata": {
+        "id": "aLqdbX4Mgipq"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "source": [
+        "%%writefile solar_events.csv\n",
+        "timestamp,event\n",
+        "2021-03-20 09:37:00,March Equinox\n",
+        "2021-06-21 03:32:00,June Solstice\n",
+        "2021-09-22 19:21:00,September Equinox\n",
+        "2021-12-21 15:59:00,December Solstice"
+      ],
+      "outputs": [],
+      "metadata": {
+        "id": "hZjwAm7qotrJ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Interactive Beam\n",
+        "\n",
+        "Pandas has the\n",
+        
"[`pandas.read_csv`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html)\n",
+        "function to easily read CSV files into DataFrames.\n",
+        "Beam has the\n",
+        
"[`beam.dataframe.io.read_csv`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_csv)\n",
+        "function that emulates `pandas.read_csv`, but returns a deferred Beam 
DataFrame.\n",
+        "\n",
+        "If you’re using\n",
+        "[Interactive 
Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html),\n",
+        "you can use `collect` to bring a Beam DataFrame into local memory as 
a Pandas DataFrame."
+      ],
+      "metadata": {
+        "id": "Hv_58JulleQ_"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 3,
+      "source": [
+        "import apache_beam as beam\n",
+        "import apache_beam.runners.interactive.interactive_beam as ib\n",
+        "from apache_beam.runners.interactive.interactive_runner import 
InteractiveRunner\n",
+        "\n",
+        "pipeline = beam.Pipeline(InteractiveRunner())\n",
+        "\n",
+        "# Create a deferred Beam DataFrame with the contents of our csv 
file.\n",
+        "beam_df = pipeline | 'Read CSV' >> 
beam.dataframe.io.read_csv('solar_events.csv')\n",
+        "\n",
+        "# We can use `ib.collect` to view the contents of a Beam 
DataFrame.\n",
+        "ib.collect(beam_df)"
+      ],
+      "outputs": [
+        {
+          "output_type": "display_data",
+          "data": {
+            "application/javascript": "\n        if (typeof 
window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = 
document.createElement('script');\n          jqueryScript.src = 
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          
jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = 
function() {\n            var datatableScript = 
document.createElement('script');\n            datatableScript.src = 
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            
datatableScript.type = 'text/javascript';\n            datatableScript.onload = 
function() {\n              window.interactive_beam_jquery = 
jQuery.noConflict(true);\n              
window.interactive_beam_jquery(document).ready(function($){\n                \n 
             });\n            }\n            
document.head.appendChild(datatableScript);\n          };\n          
document.head.appendChild(jqueryScript);\n        } else {\n          
window.interactive_beam_jquery(document).ready(function($){\n            \n          });\n       
 }"
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "display_data",
+          "data": {
+            "text/html": [
+              "\n",
+              "            <link rel=\"stylesheet\" 
href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\"
 
integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\"
 crossorigin=\"anonymous\">\n",
+              "            <div 
id=\"progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\" 
class=\"spinner-border text-info\" role=\"status\">\n",
+              "            </div>"
+            ],
+            "text/plain": [
+              "<IPython.core.display.HTML object>"
+            ]
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "stream",
+          "name": "stderr",
+          "text": [
+            "WARNING:root:Make sure that locally built Python SDK docker image 
has Python 3.7 interpreter.\n"
+          ]
+        },
+        {
+          "output_type": "display_data",
+          "data": {
+            "application/javascript": "\n        if (typeof 
window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = 
document.createElement('script');\n          jqueryScript.src = 
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          
jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = 
function() {\n            var datatableScript = 
document.createElement('script');\n            datatableScript.src = 
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            
datatableScript.type = 'text/javascript';\n            datatableScript.onload = 
function() {\n              window.interactive_beam_jquery = 
jQuery.noConflict(true);\n              
window.interactive_beam_jquery(document).ready(function($){\n                \n 
           
$(\"#progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\").remove();\n         
     });\n            }\n            
document.head.appendChild(datatableScript);\n          };\n          document.head.appendChild(jqueryScript);\n        } else {\n          
window.interactive_beam_jquery(document).ready(function($){\n            \n     
       $(\"#progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\").remove();\n  
        });\n        }"
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "execute_result",
+          "data": {
+            "text/html": [
+              "<div>\n",
+              "<style scoped>\n",
+              "    .dataframe tbody tr th:only-of-type {\n",
+              "        vertical-align: middle;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe tbody tr th {\n",
+              "        vertical-align: top;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe thead th {\n",
+              "        text-align: right;\n",
+              "    }\n",
+              "</style>\n",
+              "<table border=\"1\" class=\"dataframe\">\n",
+              "  <thead>\n",
+              "    <tr style=\"text-align: right;\">\n",
+              "      <th></th>\n",
+              "      <th>timestamp</th>\n",
+              "      <th>event</th>\n",
+              "    </tr>\n",
+              "  </thead>\n",
+              "  <tbody>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:0</th>\n",
+              "      <td>2021-03-20 09:37:00</td>\n",
+              "      <td>March Equinox</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:1</th>\n",
+              "      <td>2021-06-21 03:32:00</td>\n",
+              "      <td>June Solstice</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:2</th>\n",
+              "      <td>2021-09-22 19:21:00</td>\n",
+              "      <td>September Equinox</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:3</th>\n",
+              "      <td>2021-12-21 15:59:00</td>\n",
+              "      <td>December Solstice</td>\n",
+              "    </tr>\n",
+              "  </tbody>\n",
+              "</table>\n",
+              "</div>"
+            ],
+            "text/plain": [
+              "                              timestamp              event\n",
+              "solar_events.csv:0  2021-03-20 09:37:00      March Equinox\n",
+              "solar_events.csv:1  2021-06-21 03:32:00      June Solstice\n",
+              "solar_events.csv:2  2021-09-22 19:21:00  September Equinox\n",
+              "solar_events.csv:3  2021-12-21 15:59:00  December Solstice"
+            ]
+          },
+          "metadata": {},
+          "execution_count": 3
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 242
+        },
+        "id": "sKAMXD5ElhYP",
+        "outputId": "928d9ad7-ae75-42d7-8dc6-8c5afd730b11"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Collecting a Beam DataFrame into a Pandas DataFrame is useful to 
perform\n",
+        "[operations not supported by Beam 
DataFrames](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#classes-of-unsupported-operations).\n",
+        "\n",
+        "For example, let's say we want to take only the first two events in 
chronological order.\n",
+        "Since a deferred Beam DataFrame does not have any ordering 
guarantees,\n",
+        "first we need to sort the values.\n",
+        "In Pandas, we could first\n",
+        
"[`df.sort_values(by='timestamp')`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.sort_values.html)
 and then\n",
+        
"[`df.head(2)`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.head.html)
 to achieve this.\n",
+        "\n",
+        "However, these are\n",
+        "[order-sensitive 
operations](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#order-sensitive-operations)\n",
+        "so using them in a Beam DataFrame raises a\n",
+        
"[`WontImplementError`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.frame_base.html#apache_beam.dataframe.frame_base.WontImplementError).\n",
+        "We can work around this by using `collect` to convert the Beam 
DataFrame into a Pandas DataFrame."
+      ],
+      "metadata": {
+        "id": "t3Is6dArtN_Z"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 4,
+      "source": [
+        "import apache_beam.runners.interactive.interactive_beam as ib\n",
+        "\n",
+        "# Collect the Beam DataFrame into a Pandas DataFrame.\n",
+        "df = ib.collect(beam_df)\n",
+        "\n",
+        "# We can now use any Pandas transforms with our data.\n",
+        "df.sort_values(by='timestamp').head(2)"
+      ],
+      "outputs": [
+        {
+          "output_type": "display_data",
+          "data": {
+            "text/html": [
+              "\n",
+              "            <link rel=\"stylesheet\" 
href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\"
 
integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\"
 crossorigin=\"anonymous\">\n",
+              "            <div 
id=\"progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\" 
class=\"spinner-border text-info\" role=\"status\">\n",
+              "            </div>"
+            ],
+            "text/plain": [
+              "<IPython.core.display.HTML object>"
+            ]
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "display_data",
+          "data": {
+            "application/javascript": "\n        if (typeof 
window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = 
document.createElement('script');\n          jqueryScript.src = 
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          
jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = 
function() {\n            var datatableScript = 
document.createElement('script');\n            datatableScript.src = 
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            
datatableScript.type = 'text/javascript';\n            datatableScript.onload = 
function() {\n              window.interactive_beam_jquery = 
jQuery.noConflict(true);\n              
window.interactive_beam_jquery(document).ready(function($){\n                \n 
           
$(\"#progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\").remove();\n         
     });\n            }\n            
document.head.appendChild(datatableScript);\n          };\n          document.head.appendChild(jqueryScript);\n        } else {\n          
window.interactive_beam_jquery(document).ready(function($){\n            \n     
       $(\"#progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\").remove();\n  
        });\n        }"
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "execute_result",
+          "data": {
+            "text/html": [
+              "<div>\n",
+              "<style scoped>\n",
+              "    .dataframe tbody tr th:only-of-type {\n",
+              "        vertical-align: middle;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe tbody tr th {\n",
+              "        vertical-align: top;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe thead th {\n",
+              "        text-align: right;\n",
+              "    }\n",
+              "</style>\n",
+              "<table border=\"1\" class=\"dataframe\">\n",
+              "  <thead>\n",
+              "    <tr style=\"text-align: right;\">\n",
+              "      <th></th>\n",
+              "      <th>timestamp</th>\n",
+              "      <th>event</th>\n",
+              "    </tr>\n",
+              "  </thead>\n",
+              "  <tbody>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:0</th>\n",
+              "      <td>2021-03-20 09:37:00</td>\n",
+              "      <td>March Equinox</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:1</th>\n",
+              "      <td>2021-06-21 03:32:00</td>\n",
+              "      <td>June Solstice</td>\n",
+              "    </tr>\n",
+              "  </tbody>\n",
+              "</table>\n",
+              "</div>"
+            ],
+            "text/plain": [
+              "                              timestamp          event\n",
+              "solar_events.csv:0  2021-03-20 09:37:00  March Equinox\n",
+              "solar_events.csv:1  2021-06-21 03:32:00  June Solstice"
+            ]
+          },
+          "metadata": {},
+          "execution_count": 4
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 138
+        },
+        "id": "8haEu6_9iTi7",
+        "outputId": "a1e07bdc-c66d-45e5-efff-90b93219c648"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "> ℹ️ Note that `collect` is _only_ accessible if you’re using\n",
+        "[Interactive 
Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html)"
+      ],
+      "metadata": {
+        "id": "ZkthQ13pwpm0"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Beam DataFrames to PCollections\n",
+        "\n",
+        "If you have your data as a Beam DataFrame, you can convert it into a 
regular PCollection with\n",
+        
"[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n",
+        "\n",
+        "Converting a Beam DataFrame into a PCollection gives us each element 
as a\n",
+        
"[`namedtuple`](https://docs.python.org/3/library/collections.html#collections.namedtuple)\n",
+        "instance.\n",
+        "This allows us to easily access each property, for example 
`element.event` and `element.timestamp`.\n",

Review comment:
       ```suggestion
           "Converting a Beam DataFrame in this way yields a PCollection with a 
[schema](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema).\n",
           "This allows us to easily access each property by attribute, for 
example `element.event` and `element.timestamp`.\n",
   ```
   
   Similar note here, the important thing is the schema, not the element type.
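
   To illustrate the attribute access mentioned above, here is a minimal 
sketch that uses only the standard library. It does not exercise any Beam API: 
`collections.namedtuple` stands in for the schema'd element type, and the names 
`SolarEvent` and `to_rows` are hypothetical, chosen just for this example. The 
point is only that downstream code depends on the field names (the schema), 
not on the concrete element class.

```python
import csv
import io
from collections import namedtuple

# Same contents as solar_events.csv in the notebook.
CSV_TEXT = """timestamp,event
2021-03-20 09:37:00,March Equinox
2021-06-21 03:32:00,June Solstice
2021-09-22 19:21:00,September Equinox
2021-12-21 15:59:00,December Solstice
"""

# Hypothetical row type: a stand-in for an element with a two-field schema.
SolarEvent = namedtuple('SolarEvent', ['timestamp', 'event'])


def to_rows(text):
    """Parse CSV text into a list of SolarEvent rows."""
    reader = csv.DictReader(io.StringIO(text))
    return [SolarEvent(**row) for row in reader]


rows = to_rows(CSV_TEXT)

# Fields are reachable by attribute, which is what the schema gives us.
for element in rows:
    print(element.timestamp, element.event)

# The same rows as plain dictionaries, as the notebook does with _asdict().
dicts = [dict(element._asdict()) for element in rows]
```

   Any element type that exposes `timestamp` and `event` as attributes would 
work in the loop above, which is why the notebook text is better off 
describing the schema than naming a specific class.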

##########
File path: examples/notebooks/tour-of-beam/dataframes.ipynb
##########
@@ -0,0 +1,744 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 2,
+  "metadata": {
+    "colab": {
+      "name": "Beam DataFrames",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "outputs": [],
+      "metadata": {
+        "cellView": "form",
+        "id": "rz2qIC9IL2rI"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Beam DataFrames\n",
+        "\n",
+        "<button>\n",
+        "  <a 
href=\"https://beam.apache.org/documentation/dsls/dataframes/overview/\">\n",
+        "    <img src=\"https://beam.apache.org/images/favicon.ico\" 
alt=\"Open the docs\" height=\"16\"/>\n",
+        "    Beam DataFrames overview\n",
+        "  </a>\n",
+        "</button>\n",
+        "\n",
+        "Beam DataFrames provide a pandas-like 
[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n",
+        "API to declare Beam pipelines.\n",
+        "\n",
+        "> ℹ️ To learn more about Beam DataFrames, take a look at the\n",
+        "[Beam DataFrames 
overview](https://beam.apache.org/documentation/dsls/dataframes/overview) 
page.\n",
+        "\n",
+        "First, we need to install Apache Beam with the `interactive` extra 
for the Interactive runner."
+      ],
+      "metadata": {
+        "id": "hDuXLLSZnI1D"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "source": [
+        "%pip install --quiet apache-beam[interactive]"
+      ],
+      "outputs": [],
+      "metadata": {
+        "id": "8QVByaWjkarZ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Let's create a small data file of\n",
+        "[Comma-Separated Values 
(CSV)](https://en.wikipedia.org/wiki/Comma-separated_values).\n",
+        "It simply includes the dates of the\n",
+        "[equinoxes](https://en.wikipedia.org/wiki/Equinox) and\n",
+        "[solstices](https://en.wikipedia.org/wiki/Solstice)\n",
+        "of the year 2021."
+      ],
+      "metadata": {
+        "id": "aLqdbX4Mgipq"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "source": [
+        "%%writefile solar_events.csv\n",
+        "timestamp,event\n",
+        "2021-03-20 09:37:00,March Equinox\n",
+        "2021-06-21 03:32:00,June Solstice\n",
+        "2021-09-22 19:21:00,September Equinox\n",
+        "2021-12-21 15:59:00,December Solstice"
+      ],
+      "outputs": [],
+      "metadata": {
+        "id": "hZjwAm7qotrJ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Interactive Beam\n",
+        "\n",
+        "Pandas has the\n",
+        
"[`pandas.read_csv`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html)\n",
+        "function to easily read CSV files into DataFrames.\n",
+        "Beam has the\n",
+        
"[`beam.dataframe.io.read_csv`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_csv)\n",
+        "function that emulates `pandas.read_csv`, but returns a deferred Beam 
DataFrame.\n",
+        "\n",
+        "If you’re using\n",
+        "[Interactive 
Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html),\n",
+        "you can use `collect` to bring a Beam DataFrame into local memory as 
a Pandas DataFrame."
+      ],
+      "metadata": {
+        "id": "Hv_58JulleQ_"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 3,
+      "source": [
+        "import apache_beam as beam\n",
+        "import apache_beam.runners.interactive.interactive_beam as ib\n",
+        "from apache_beam.runners.interactive.interactive_runner import 
InteractiveRunner\n",
+        "\n",
+        "pipeline = beam.Pipeline(InteractiveRunner())\n",
+        "\n",
+        "# Create a deferred Beam DataFrame with the contents of our csv 
file.\n",
+        "beam_df = pipeline | 'Read CSV' >> 
beam.dataframe.io.read_csv('solar_events.csv')\n",
+        "\n",
+        "# We can use `ib.collect` to view the contents of a Beam 
DataFrame.\n",
+        "ib.collect(beam_df)"
+      ],
+      "outputs": [
+        {
+          "output_type": "display_data",
+          "data": {
+            "application/javascript": "\n        if (typeof 
window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = 
document.createElement('script');\n          jqueryScript.src = 
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          
jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = 
function() {\n            var datatableScript = 
document.createElement('script');\n            datatableScript.src = 
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            
datatableScript.type = 'text/javascript';\n            datatableScript.onload = 
function() {\n              window.interactive_beam_jquery = 
jQuery.noConflict(true);\n              
window.interactive_beam_jquery(document).ready(function($){\n                \n 
             });\n            }\n            
document.head.appendChild(datatableScript);\n          };\n          
document.head.appendChild(jqueryScript);\n        } else {\n          
window.interactive_beam_jquery(document).ready(function($){\n            \n          });\n       
 }"
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "display_data",
+          "data": {
+            "text/html": [
+              "\n",
+              "            <link rel=\"stylesheet\" 
href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\"
 
integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\"
 crossorigin=\"anonymous\">\n",
+              "            <div 
id=\"progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\" 
class=\"spinner-border text-info\" role=\"status\">\n",
+              "            </div>"
+            ],
+            "text/plain": [
+              "<IPython.core.display.HTML object>"
+            ]
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "stream",
+          "name": "stderr",
+          "text": [
+            "WARNING:root:Make sure that locally built Python SDK docker image 
has Python 3.7 interpreter.\n"
+          ]
+        },
+        {
+          "output_type": "display_data",
+          "data": {
+            "application/javascript": "\n        if (typeof 
window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = 
document.createElement('script');\n          jqueryScript.src = 
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          
jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = 
function() {\n            var datatableScript = 
document.createElement('script');\n            datatableScript.src = 
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            
datatableScript.type = 'text/javascript';\n            datatableScript.onload = 
function() {\n              window.interactive_beam_jquery = 
jQuery.noConflict(true);\n              
window.interactive_beam_jquery(document).ready(function($){\n                \n 
           
$(\"#progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\").remove();\n         
     });\n            }\n            
document.head.appendChild(datatableScript);\n          };\n          
document.head.appendChild(jqueryScript);\n        } else {\n          
window.interactive_beam_jquery(document).ready(function($){\n            \n     
       $(\"#progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\").remove();\n  
        });\n        }"
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "execute_result",
+          "data": {
+            "text/html": [
+              "<div>\n",
+              "<style scoped>\n",
+              "    .dataframe tbody tr th:only-of-type {\n",
+              "        vertical-align: middle;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe tbody tr th {\n",
+              "        vertical-align: top;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe thead th {\n",
+              "        text-align: right;\n",
+              "    }\n",
+              "</style>\n",
+              "<table border=\"1\" class=\"dataframe\">\n",
+              "  <thead>\n",
+              "    <tr style=\"text-align: right;\">\n",
+              "      <th></th>\n",
+              "      <th>timestamp</th>\n",
+              "      <th>event</th>\n",
+              "    </tr>\n",
+              "  </thead>\n",
+              "  <tbody>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:0</th>\n",
+              "      <td>2021-03-20 09:37:00</td>\n",
+              "      <td>March Equinox</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:1</th>\n",
+              "      <td>2021-06-21 03:32:00</td>\n",
+              "      <td>June Solstice</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:2</th>\n",
+              "      <td>2021-09-22 19:21:00</td>\n",
+              "      <td>September Equinox</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:3</th>\n",
+              "      <td>2021-12-21 15:59:00</td>\n",
+              "      <td>December Solstice</td>\n",
+              "    </tr>\n",
+              "  </tbody>\n",
+              "</table>\n",
+              "</div>"
+            ],
+            "text/plain": [
+              "                              timestamp              event\n",
+              "solar_events.csv:0  2021-03-20 09:37:00      March Equinox\n",
+              "solar_events.csv:1  2021-06-21 03:32:00      June Solstice\n",
+              "solar_events.csv:2  2021-09-22 19:21:00  September Equinox\n",
+              "solar_events.csv:3  2021-12-21 15:59:00  December Solstice"
+            ]
+          },
+          "metadata": {},
+          "execution_count": 3
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 242
+        },
+        "id": "sKAMXD5ElhYP",
+        "outputId": "928d9ad7-ae75-42d7-8dc6-8c5afd730b11"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Collecting a Beam DataFrame into a Pandas DataFrame is useful to 
perform\n",
+        "[operations not supported by Beam 
DataFrames](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#classes-of-unsupported-operations).\n",
+        "\n",
+        "For example, let's say we want to take only the first two events in 
chronological order.\n",
+        "Since a deferred Beam DataFrame does not have any ordering 
guarantees,\n",
+        "first we need to sort the values.\n",
+        "In Pandas, we could first\n",
+        
"[`df.sort_values(by='timestamp')`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.sort_values.html)
 and then\n",
+        
"[`df.head(2)`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.head.html)
 to achieve this.\n",
+        "\n",
+        "However, these are\n",
+        "[order-sensitive 
operations](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#order-sensitive-operations)\n",
+        "so using them in a Beam DataFrame raises a\n",
+        
"[`WontImplementError`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.frame_base.html#apache_beam.dataframe.frame_base.WontImplementError).\n",
+        "We can work around this by using `collect` to convert the Beam 
DataFrame into a Pandas DataFrame."
+      ],
+      "metadata": {
+        "id": "t3Is6dArtN_Z"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 4,
+      "source": [
+        "import apache_beam.runners.interactive.interactive_beam as ib\n",
+        "\n",
+        "# Collect the Beam DataFrame into a Pandas DataFrame.\n",
+        "df = ib.collect(beam_df)\n",
+        "\n",
+        "# We can now use any Pandas transforms with our data.\n",
+        "df.sort_values(by='timestamp').head(2)"
+      ],
+      "outputs": [
+        {
+          "output_type": "display_data",
+          "data": {
+            "text/html": [
+              "\n",
+              "            <link rel=\"stylesheet\" 
href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\"
 
integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\"
 crossorigin=\"anonymous\">\n",
+              "            <div 
id=\"progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\" 
class=\"spinner-border text-info\" role=\"status\">\n",
+              "            </div>"
+            ],
+            "text/plain": [
+              "<IPython.core.display.HTML object>"
+            ]
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "display_data",
+          "data": {
+            "application/javascript": "\n        if (typeof 
window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = 
document.createElement('script');\n          jqueryScript.src = 
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          
jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = 
function() {\n            var datatableScript = 
document.createElement('script');\n            datatableScript.src = 
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            
datatableScript.type = 'text/javascript';\n            datatableScript.onload = 
function() {\n              window.interactive_beam_jquery = 
jQuery.noConflict(true);\n              
window.interactive_beam_jquery(document).ready(function($){\n                \n 
           
$(\"#progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\").remove();\n         
     });\n            }\n            
document.head.appendChild(datatableScript);\n          };\n          
document.head.appendChild(jqueryScript);\n        } else {\n          
window.interactive_beam_jquery(document).ready(function($){\n            \n     
       $(\"#progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\").remove();\n  
        });\n        }"
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "execute_result",
+          "data": {
+            "text/html": [
+              "<div>\n",
+              "<style scoped>\n",
+              "    .dataframe tbody tr th:only-of-type {\n",
+              "        vertical-align: middle;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe tbody tr th {\n",
+              "        vertical-align: top;\n",
+              "    }\n",
+              "\n",
+              "    .dataframe thead th {\n",
+              "        text-align: right;\n",
+              "    }\n",
+              "</style>\n",
+              "<table border=\"1\" class=\"dataframe\">\n",
+              "  <thead>\n",
+              "    <tr style=\"text-align: right;\">\n",
+              "      <th></th>\n",
+              "      <th>timestamp</th>\n",
+              "      <th>event</th>\n",
+              "    </tr>\n",
+              "  </thead>\n",
+              "  <tbody>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:0</th>\n",
+              "      <td>2021-03-20 09:37:00</td>\n",
+              "      <td>March Equinox</td>\n",
+              "    </tr>\n",
+              "    <tr>\n",
+              "      <th>solar_events.csv:1</th>\n",
+              "      <td>2021-06-21 03:32:00</td>\n",
+              "      <td>June Solstice</td>\n",
+              "    </tr>\n",
+              "  </tbody>\n",
+              "</table>\n",
+              "</div>"
+            ],
+            "text/plain": [
+              "                              timestamp          event\n",
+              "solar_events.csv:0  2021-03-20 09:37:00  March Equinox\n",
+              "solar_events.csv:1  2021-06-21 03:32:00  June Solstice"
+            ]
+          },
+          "metadata": {},
+          "execution_count": 4
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 138
+        },
+        "id": "8haEu6_9iTi7",
+        "outputId": "a1e07bdc-c66d-45e5-efff-90b93219c648"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "> ℹ️ Note that `collect` is _only_ accessible if you’re using\n",
+        "[Interactive 
Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html)"
+      ],
+      "metadata": {
+        "id": "ZkthQ13pwpm0"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Beam DataFrames to PCollections\n",
+        "\n",
+        "If you have your data as a Beam DataFrame, you can convert it into a 
regular PCollection with\n",
+        
"[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n",
+        "\n",
+        "Converting a Beam DataFrame into a PCollection gives us each element 
as a\n",
+        
"[`namedtuple`](https://docs.python.org/3/library/collections.html#collections.namedtuple)\n",
+        "instance.\n",
+        "This allows us to easily access each property, for example 
`element.event` and `element.timestamp`.\n",
+        "\n",
+        "Sometimes it's more convenient to convert the named tuples to Python 
dictionaries.\n",
+        "We can do that with the\n",
+        
"[`_asdict`](https://docs.python.org/3/library/collections.html#collections.somenamedtuple._asdict)\n",
+        "method."
+      ],
+      "metadata": {
+        "id": "ujRm4K0iP8SX"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 5,
+      "source": [
+        "import apache_beam as beam\n",
+        "from apache_beam.dataframe import convert\n",
+        "\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  beam_df = pipeline | 'Read CSV' >> 
beam.dataframe.io.read_csv('solar_events.csv')\n",
+        "\n",
+        "  (\n",
+        "      # Convert the Beam DataFrame to a PCollection.\n",
+        "      convert.to_pcollection(beam_df)\n",
+        "\n",
+        "      # We get named tuples, we can convert them to dictionaries like 
this.\n",
+        "      | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
+        "\n",
+        "      # Print the elements in the PCollection.\n",
+        "      | 'Print' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stderr",
+          "text": [
+            "WARNING:root:Make sure that locally built Python SDK docker image 
has Python 3.7 interpreter.\n"
+          ]
+        },
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
+            "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
+            "{'timestamp': '2021-09-22 19:21:00', 'event': 'September 
Equinox'}\n",
+            "{'timestamp': '2021-12-21 15:59:00', 'event': 'December 
Solstice'}\n"
+          ]
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "g22op8rZPvB3",
+        "outputId": "bba88b0b-4d19-4d61-dac7-2c168998a2e4"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Pandas DataFrames to PCollections\n",
+        "\n",
+        "If you have your data as a Pandas DataFrame, you can convert it into 
a regular PCollection with\n",
+        
"[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n",
+        "\n",
+        "Since Pandas DataFrames are not part of any Beam pipeline, we must 
provide the `pipeline` explicitly."
+      ],
+      "metadata": {
+        "id": "t6xNIO0iPwtn"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 6,
+      "source": [
+        "import pandas as pd\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.dataframe import convert\n",
+        "\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  df = pd.read_csv('solar_events.csv')\n",
+        "\n",
+        "  (\n",
+        "      # Convert the Pandas DataFrame to a PCollection.\n",
+        "      convert.to_pcollection(df, pipeline=pipeline)\n",
+        "\n",
+        "      # We get named tuples, we can convert them to dictionaries like 
this.\n",
+        "      | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
+        "\n",
+        "      # Print the elements in the PCollection.\n",
+        "      | 'Print' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stderr",
+          "text": [
+            "WARNING:root:Make sure that locally built Python SDK docker image 
has Python 3.7 interpreter.\n"
+          ]
+        },
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
+            "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
+            "{'timestamp': '2021-09-22 19:21:00', 'event': 'September 
Equinox'}\n",
+            "{'timestamp': '2021-12-21 15:59:00', 'event': 'December 
Solstice'}\n"
+          ]
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "YWYVFkvFuksz",
+        "outputId": "a3e3e6fa-85ce-4891-95a0-389fba4461a6"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "If you have your data as a PCollection of Pandas DataFrames, you can 
flatten them into a PCollection of individual rows with\n",
+        
"[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+        "\n",
+        "> ℹ️ If the number of elements in each DataFrame can be very 
different (that is, some DataFrames might contain thousands of elements while 
others contain only a handful of elements), it might be a good idea to\n",
+        "> 
[`Reshuffle`](https://beam.apache.org/documentation/transforms/python/other/reshuffle).\n",
+        "> This basically rebalances the elements in the PCollection, which 
helps make sure all the workers have a balanced number of elements."
+      ],
+      "metadata": {
+        "id": "z6Q_tyWszkMC"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 7,
+      "source": [
+        "import pandas as pd\n",
+        "import apache_beam as beam\n",
+        "\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Filename' >> beam.Create(['solar_events.csv'])\n",
+        "\n",
+        "      # Each element is a Pandas DataFrame, so we can do any Pandas 
operation.\n",
+        "      | 'Read CSV' >> beam.Map(pd.read_csv)\n",
+        "\n",
+        "      # We yield each element of all the DataFrames into a 
PCollection of dictionaries.\n",
+        "      | 'To dictionaries' >> beam.FlatMap(lambda df: 
df.to_dict('records'))\n",
+        "\n",
+        "      # Reshuffle to make sure parallelization is balanced.\n",
+        "      | 'Reshuffle' >> beam.Reshuffle()\n",
+        "\n",
+        "      # Print the elements in the PCollection.\n",
+        "      | 'Print' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stderr",
+          "text": [
+            "WARNING:root:Make sure that locally built Python SDK docker image 
has Python 3.7 interpreter.\n"
+          ]
+        },
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
+            "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
+            "{'timestamp': '2021-09-22 19:21:00', 'event': 'September 
Equinox'}\n",
+            "{'timestamp': '2021-12-21 15:59:00', 'event': 'December 
Solstice'}\n"
+          ]
+        }
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "fVWjO2Zfziqu",
+        "outputId": "c5db7be4-f764-487a-bc3b-bd5cbad4e396"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# PCollections to Beam DataFrames\n",
+        "\n",
+        "If you have your data as a PCollection, you can convert it into a 
deferred Beam DataFrame with\n",
+        
"[`to_dataframe`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_dataframe).\n",
+        "\n",
+        "> ℹ️ To convert a PCollection to a Beam DataFrame, each element 
_must_ be a\n",
+        
"[`beam.Row`](https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html#apache_beam.pvalue.Row)."

Review comment:
       ```suggestion
           "> ℹ️ To convert a PCollection to a Beam DataFrame, each element 
_must_ have a\n",
           
"[schema](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema)."
   ```
   
   Technically, the only requirement is that the PCollection has a schema; 
`beam.Row` is just one way to provide one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

