This is an automated email from the ASF dual-hosted git repository.
ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a6bb4ae [BEAM-10708] Added an example notebook for beam_sql magic
new 919fdef Merge pull request #15518 from KevinGG/beam_sql_out_cache
a6bb4ae is described below
commit a6bb4ae1a3889759ec92d2a21bc52bd2b026cbec
Author: KevinGG <[email protected]>
AuthorDate: Wed Sep 15 14:18:11 2021 -0700
[BEAM-10708] Added an example notebook for beam_sql magic
1. Supported show/collect for beam_sql outputs.
2. Added a parser for beam_sql inputs.
---
.../interactive/display/pcoll_visualization.py | 32 +-
.../Run Beam SQL with beam_sql magic.ipynb | 549 +++++++++++++++++++++
.../runners/interactive/interactive_beam.py | 35 +-
.../runners/interactive/interactive_beam_test.py | 12 +-
.../runners/interactive/sql/beam_sql_magics.py | 132 +++--
.../interactive/sql/beam_sql_magics_test.py | 14 +
.../apache_beam/runners/interactive/sql/utils.py | 28 +-
.../apache_beam/runners/interactive/utils.py | 14 +
.../apache_beam/runners/interactive/utils_test.py | 9 +
9 files changed, 771 insertions(+), 54 deletions(-)
diff --git
a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
index 9e071fe..f317978 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
@@ -27,6 +27,7 @@ import datetime
import html
import logging
from datetime import timedelta
+from typing import Optional
from dateutil import tz
@@ -238,19 +239,42 @@ def visualize(
def visualize_computed_pcoll(
- pcoll_name: str, pcoll: beam.pvalue.PCollection) -> None:
+ pcoll_name: str,
+ pcoll: beam.pvalue.PCollection,
+ max_n: int,
+ max_duration_secs: float,
+ dynamic_plotting_interval: Optional[int] = None,
+ include_window_info: bool = False,
+ display_facets: bool = False) -> None:
"""A simple visualize alternative.
When the pcoll_name and pcoll pair identifies a watched and computed
PCollection in the current interactive environment without ambiguity, an
- ElementStream can be built directly from cache.
+    ElementStream can be built directly from cache. Returns immediately; the
+    visualization is asynchronous but guaranteed to complete in the near future.
+
+ Args:
+ pcoll_name: the variable name of the PCollection.
+ pcoll: the PCollection to be visualized.
+ max_n: the maximum number of elements to visualize.
+ max_duration_secs: max duration of elements to read in seconds.
+ dynamic_plotting_interval: the interval in seconds between visualization
+ updates if provided; otherwise, no dynamic plotting.
+ include_window_info: whether to include windowing info in the elements.
+ display_facets: whether to display the facets widgets.
"""
pipeline = ie.current_env().user_pipeline(pcoll.pipeline)
rm = ie.current_env().get_recording_manager(pipeline, create_if_absent=True)
+
stream = rm.read(
- pcoll_name, pcoll, max_n=float('inf'), max_duration_secs=float('inf'))
+ pcoll_name, pcoll, max_n=max_n, max_duration_secs=max_duration_secs)
if stream:
- visualize(stream, element_type=pcoll.element_type)
+ visualize(
+ stream,
+ dynamic_plotting_interval=dynamic_plotting_interval,
+ include_window_info=include_window_info,
+ display_facets=display_facets,
+ element_type=pcoll.element_type)
class PCollectionVisualization(object):
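
An illustrative sketch (not part of the diff) of how the widened
visualize_computed_pcoll signature is meant to be called once a PCollection has
been watched and computed; the pipeline and variable names below are
assumptions, and the call is intended for an IPython/notebook environment:

    import apache_beam as beam
    from apache_beam.runners.interactive import interactive_beam as ib
    from apache_beam.runners.interactive.display.pcoll_visualization import (
        visualize_computed_pcoll)
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
    from apache_beam.runners.interactive.utils import find_pcoll_name

    p = beam.Pipeline(InteractiveRunner())
    letters = p | beam.Create(['a', 'b', 'c'])
    ib.watch(locals())
    ib.show(letters)  # computes and caches `letters`

    # Re-visualize straight from the cache; max_n and max_duration_secs are now
    # required instead of being hard-coded to infinity.
    visualize_computed_pcoll(
        find_pcoll_name(letters),
        letters,
        max_n=3,
        max_duration_secs=30,
        include_window_info=True,
        display_facets=False)
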
diff --git a/sdks/python/apache_beam/runners/interactive/examples/Run Beam SQL
with beam_sql magic.ipynb
b/sdks/python/apache_beam/runners/interactive/examples/Run Beam SQL with
beam_sql magic.ipynb
new file mode 100644
index 0000000..d68fa8c
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/examples/Run Beam SQL with
beam_sql magic.ipynb
@@ -0,0 +1,549 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "83acd0be",
+ "metadata": {},
+ "source": [
+ "Licensed under the Apache License, Version 2.0 (the \"License\");\n",
+ "<!--\n",
+ " Licensed to the Apache Software Foundation (ASF) under one\n",
+ " or more contributor license agreements. See the NOTICE file\n",
+ " distributed with this work for additional information\n",
+ " regarding copyright ownership. The ASF licenses this file\n",
+ " to you under the Apache License, Version 2.0 (the\n",
+ " \"License\"); you may not use this file except in compliance\n",
+ " with the License. You may obtain a copy of the License at\n",
+ "\n",
+ " http://www.apache.org/licenses/LICENSE-2.0\n",
+ "\n",
+ " Unless required by applicable law or agreed to in writing,\n",
+ " software distributed under the License is distributed on an\n",
+ " \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ " KIND, either express or implied. See the License for the\n",
+ " specific language governing permissions and limitations\n",
+ " under the License.\n",
+ "-->\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "5022179a",
+ "metadata": {},
+ "source": [
+ "# Run Beam SQL in notebooks\n",
+ "\n",
+ "[Beam SQL](https://beam.apache.org/documentation/dsls/sql/overview/)
allows a Beam user to query PCollections with SQL statements. Currently,
`InteractiveRunner` does not support `SqlTransform` due to
[BEAM-10708](https://issues.apache.org/jira/browse/BEAM-10708). However, a user
could use the `beam_sql` magic to run Beam SQL in the notebook and introspect
the result.\n",
+ "\n",
+ "`beam_sql` is an IPython [custom
magic](https://ipython.readthedocs.io/en/stable/config/custommagics.html). If
you're not familiar with magics, here are some [built-in
examples](https://ipython.readthedocs.io/en/stable/interactive/magics.html).
It's a convenient way to validate your queries locally against known/test data
sources when prototyping a Beam pipeline with SQL, before productionizing it on
remote clusters/services.\n",
+ "\n",
+ "First, let's load the `beam_sql` magic:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "c6b6e3c1",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%load_ext apache_beam.runners.interactive.sql.beam_sql_magics"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a7c43b84",
+ "metadata": {},
+ "source": [
+    "Since SQL support in the Beam Python SDK is implemented through a
cross-language (xLang) external transform, make sure you have the following prerequisites:\n",
+ "- Have `docker` installed;\n",
+ "- Have jdk8 or jdk11 installed and $JAVA_HOME set;"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "b280710a",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "!docker image list\n",
+ "!java --version\n",
+ "!echo $JAVA_HOME"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "28b1b320",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Optionally sets the logging level to reduce distraction.\n",
+ "import logging\n",
+ "\n",
+ "logging.root.setLevel(logging.ERROR)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "f6b8789f",
+ "metadata": {},
+ "source": [
+ "**Important**: if you're using Beam built from your local source code,
additionally:\n",
+ "\n",
+ "- Have the Java expansion service shadowjar built. Go to the root
directory of your local beam repo and then execute:\n",
+ " `./gradlew :sdks:java:extensions:sql:expansion-service:shadowJar`.\n",
+ "- Based on your jdk version, pull the docker image `docker pull
apache/beam_java11_sdk` or `docker pull apache/beam_java8_sdk`.\n",
+ "- Then tag the image with your current Beam dev version. You can check
the dev version under `apache_beam.version.__version__`. For example, if you're
using jdk11 and the dev version is `x.x.x.dev`, execute `docker image tag
apache/beam_java11_sdk:latest apache/beam_java11_sdk:x.x.x.dev`."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "14c8967d",
+ "metadata": {},
+ "source": [
+ "## Query#1 - A simple static query\n",
+ "\n",
+ "The `beam_sql` magic can be used as either a line magic or a cell
magic.\n",
+ "You can check its usage by running:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "c212dd89",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%beam_sql -h"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "7914c1aa",
+ "metadata": {},
+ "source": [
+ "You can run a simple SQL query (in Apache Calcite SQL
[syntax](https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/))
to create a [schema-aware
PCollection](https://beam.apache.org/documentation/programming-guide/#schemas)
from static values."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "895341fa",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%%beam_sql -o query1_data\n",
+ "SELECT CAST(5 AS INT) AS `id`, CAST('foo' AS VARCHAR) AS `str`, CAST(3.14
AS DOUBLE) AS `flt`"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "c394ead5",
+ "metadata": {},
+ "source": [
+ "The `beam_sql` magic shows you the result of the SQL query.\n",
+ "\n",
+ "It also creates and outputs a PCollection named `query1_data` with
`element_type` like `BeamSchema_...(id: int32, str: str, flt: float64)`.\n",
+ "\n",
+ "Note that you have **not** explicitly created a Beam pipeline. You get a
PCollection because the `beam_sql` magic always **implicitly creates** a
pipeline to execute your SQL query. To hold the elements with each field's type
info, Beam automatically creates a schema as the `element_type` for the created
PCollection."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "981b2cc9",
+ "metadata": {},
+ "source": [
+ "To introspect the data again with more knobs, you can use `show`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "e97caf83",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from apache_beam.runners.interactive import interactive_beam as ib\n",
+ "ib.show(query1_data)\n",
+ "# Uncomment below to set more args.\n",
+ "# ib.show(query1_data, visualize_data=True, include_window_info=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "f58b15a8",
+ "metadata": {},
+ "source": [
+ "To materialize the PCollection into a pandas
[DataFrame](https://pandas.pydata.org/pandas-docs/stable/user_guide/dsintro.html#dataframe)
object, you can use `collect`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "47b8da1a",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "ib.collect(query1_data)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "09b4f24c",
+ "metadata": {},
+ "source": [
+    "You can also append additional transforms, such as writing to a
text file and printing the elements:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "9a650bbb",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import apache_beam as beam\n",
+ "\n",
+ "coder=beam.coders.registry.get_coder(query1_data.element_type)\n",
+ "print(coder)\n",
+ "query1_data | beam.io.textio.WriteToText('/tmp/query1_data',
coder=coder)\n",
+ "query1_data | beam.Map(print)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "6cf89704",
+ "metadata": {},
+ "source": [
+ "Execute the pipeline as a normal pipeline running on DirectRunner and
inspect the output file."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "d524e1a0",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "!rm -rf /tmp/query1_data*\n",
+ "query1_data.pipeline.run().wait_until_finish()\n",
+ "!ls /tmp/query1_data*\n",
+ "!cat /tmp/query1_data*"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "5600945a",
+ "metadata": {},
+ "source": [
+    "The coder in use is a `RowCoder`. The elements are encoded and written to
the text file, so inspecting it directly may display garbled strings. The
file will be revisited later in Query#4."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "30aa1188",
+ "metadata": {},
+ "source": [
+ "### [Optional] Omit the `-o` option.\n",
+ "If the option is omitted, an output name is auto-generated based on the
SQL query and PCollection (if any) it queries. Optionally, you can also use the
`_[{execution_count}]` convention: `_` for last output and `_{execution_count}`
for a specific cell execution output.\n",
+ "\n",
+ "However, explicitly naming the output is recommended for better notebook
readability and to avoid unexpected errors.\n",
+ "\n",
+    "The example below outputs a PCollection named like `sql_output_...`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "b445e4f1",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%%beam_sql\n",
+ "SELECT CAST(1 AS INT) AS `id`, CAST('foo' AS VARCHAR) AS `str`, CAST(3.14
AS DOUBLE) AS `flt`"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "c7b9e4fb",
+ "metadata": {},
+ "source": [
+ "Now that you are familiar with the `beam_sql` magic, you can build more
queries against PCollections.\n",
+ "\n",
+ "Let's install the `names` package to randomly generate some names."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "ef1ca0fc",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%pip install names"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "1c0d5739",
+ "metadata": {},
+ "source": [
+ "Import all modules needed for this example."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "20cdf3b9",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import names\n",
+ "import typing\n",
+ "\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.runners.interactive.interactive_runner import
InteractiveRunner\n",
+ "from apache_beam.runners.interactive import interactive_beam as ib"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "00db1574",
+ "metadata": {},
+ "source": [
+ "Create a pipeline `p` with the `InteractiveRunner`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "24caeb60",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "p = beam.Pipeline(InteractiveRunner())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0a4ca6eb",
+ "metadata": {},
+ "source": [
+ "Then let's create a schema with `typing.NamedTuple`. Let's call it
`Person` with a field `id` and a field `name`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "23910a9d",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "class Person(typing.NamedTuple):\n",
+ " id: int\n",
+ " name: str"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "5c626d63",
+ "metadata": {},
+ "source": [
+    "With the `beam_sql` magic, you can use any of the Beam I/O connectors as data
sources (streaming is currently not supported because `DirectRunner` does not
support streaming pipelines with `SqlTransform`), then build a SQL query
against all the data and check the output. If needed, you can sink the
output following the `WriteToText` example demonstrated above."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "2d892920",
+ "metadata": {},
+ "source": [
+ "## Query#2 - Querying a single PCollection\n",
+ "\n",
+ "Let's build a PCollection with 10 random `Person` typed elements."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "8a5fc9b9",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "persons = (p \n",
+ " | beam.Create([Person(id=x, name=names.get_full_name()) for x
in range(10)]))\n",
+ "ib.show(persons)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "84d64746",
+ "metadata": {},
+ "source": [
+    "You can look for all elements with `id < 5` in `persons` with the query below
and assign the output to `persons_id_lt_5`. You can also enable the `-v`
option to see more details about the execution."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "07db1116",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%%beam_sql -o persons_id_lt_5 -v\n",
+    "SELECT * FROM persons WHERE id < 5"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "68afa962",
+ "metadata": {},
+ "source": [
+    "With `-v`, if it's the first time running this query, you might see a
warning message like\n",
+ "\n",
+ "```\n",
+ "Schema Person has not been registered to use a RowCoder. Automatically
registering it by running: beam.coders.registry.register_coder(Person,
beam.coders.RowCoder)\n",
+ "```\n",
+ "\n",
+    "The `beam_sql` magic registers a `RowCoder` for each schema you
define and use whenever it finds one. You can also run the same code explicitly
to do so.\n",
+ "\n",
+ "Note the output element type is `Person(id: int, name: str)` instead of
`BeamSchema_...` because you have selected all the fields from a single
PCollection of the known type `Person(id: int, name: str)`."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "79587515",
+ "metadata": {},
+ "source": [
+ "## Query#3 - Joining multiple PCollections\n",
+ "\n",
+ "You can build a `persons_2` PCollection with a different range of `id`s
and `name`s. "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "c01fa39a",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "persons_2 = (p \n",
+ " | beam.Create([Person(id=x, name=names.get_full_name()) for
x in range(5, 15)]))\n",
+ "ib.show(persons_2)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "6904ff8e",
+ "metadata": {},
+ "source": [
+ "Then query for all `name`s from `persons` and `persons_2` with the same
`id`s and assign the output to `persons_with_common_id`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "2a0a60ff",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%%beam_sql -o persons_with_common_id -v\n",
+ "SELECT * FROM persons JOIN persons_2 USING (id)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "4bb4df8a",
+ "metadata": {},
+ "source": [
+ "Note the output element type is now some `BeamSchema_...(id: int64, name:
str, name0: str)`. Because you have selected columns from both PCollections,
there is no known schema to hold the result. Beam automatically creates a
schema and differentiates the conflicting `name` fields by suffixing `0` to one of
them.\n",
+    "\n",
+    "And since `Person` was already registered with a `RowCoder`, there is no
warning about registering it, even with `-v`."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "cfcfeb76",
+ "metadata": {},
+ "source": [
+    "## Query#4 - Joining multiple PCollections, including I/O"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "ce8abc3d",
+ "metadata": {},
+ "source": [
+ "Let's read the file written by Query#1 and use it to join `persons` and
`persons_2` to find `name`s with the common `id` in all three of them. "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "d1dea37b",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+    "# Use the exact same coder used by WriteToText and explicitly set the
output types.\n",
+ "query1_result_in_file = p | beam.io.ReadFromText(\n",
+ " '/tmp/query1_data*', coder=coder).with_output_types(\n",
+ " query1_data.element_type)\n",
+ "\n",
+ "# Check all the data sources.\n",
+ "ib.show(query1_result_in_file)\n",
+ "ib.show(persons)\n",
+ "ib.show(persons_2)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "4bf6c422",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%%beam_sql -o entry_with_common_id\n",
+ "\n",
+ "SELECT query1_result_in_file.id, persons.name AS `name_1`, persons_2.name
AS `name_2`\n",
+ "FROM query1_result_in_file JOIN persons ON query1_result_in_file.id =
persons.id\n",
+ "JOIN persons_2 ON query1_result_in_file.id = persons_2.id"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "282f6173",
+ "metadata": {},
+ "source": [
+ "You can also chain another `beam_sql` magic to get just `name_1`:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "d858dd6c",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%%beam_sql -o name_found\n",
+ "SELECT name_1 AS `name` FROM entry_with_common_id"
+ ]
+ }
+ ],
+ "metadata": {
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.7.11"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
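
For readers skimming the diff: each notebook query boils down to applying
SqlTransform to schema-aware PCollections on an implicitly created pipeline. A
rough, hand-written sketch of what Query#2 amounts to in plain Beam Python (an
illustration, not the magic's exact internals; it assumes the docker/JDK
expansion-service prerequisites listed in the notebook):

    import apache_beam as beam
    from apache_beam.transforms.sql import SqlTransform

    with beam.Pipeline() as p:
        _ = (
            p
            # beam.Row elements give the PCollection a schema, which
            # SqlTransform requires.
            | beam.Create([beam.Row(id=i, name='person-%d' % i) for i in range(10)])
            # A single unnamed input is queried under the table name PCOLLECTION.
            | SqlTransform('SELECT * FROM PCOLLECTION WHERE id < 5')
            | beam.Map(print))
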
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 0b3f8a3..fc55bf1 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -41,9 +41,11 @@ from apache_beam.dataframe.frame_base import DeferredBase
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive.display import pipeline_graph
from apache_beam.runners.interactive.display.pcoll_visualization import
visualize
+from apache_beam.runners.interactive.display.pcoll_visualization import
visualize_computed_pcoll
from apache_beam.runners.interactive.options import interactive_options
from apache_beam.runners.interactive.utils import deferred_df_to_pcollection
from apache_beam.runners.interactive.utils import elements_to_df
+from apache_beam.runners.interactive.utils import find_pcoll_name
from apache_beam.runners.interactive.utils import progress_indicated
from apache_beam.runners.runner import PipelineState
@@ -448,7 +450,7 @@ def show(
# Iterate through the given PCollections and convert any deferred DataFrames
# or Series into PCollections.
- pcolls = []
+ pcolls = set()
# The element type is used to help visualize the given PCollection. For the
# deferred DataFrame/Series case it is the proxy of the frame.
@@ -462,14 +464,14 @@ def show(
element_types[pcoll] = element_type
- pcolls.append(pcoll)
+ pcolls.add(pcoll)
assert isinstance(pcoll, beam.pvalue.PCollection), (
'{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
assert len(pcolls) > 0, (
'Need at least 1 PCollection to show data visualization.')
- user_pipeline = pcolls[0].pipeline
+ user_pipeline = ie.current_env().user_pipeline(next(iter(pcolls)).pipeline)
if isinstance(n, str):
assert n == 'inf', (
@@ -488,6 +490,20 @@ def show(
if duration == 'inf':
duration = float('inf')
+ previously_computed_pcolls = {
+ pcoll
+ for pcoll in pcolls if pcoll in ie.current_env().computed_pcollections
+ }
+ for pcoll in previously_computed_pcolls:
+ visualize_computed_pcoll(
+ find_pcoll_name(pcoll),
+ pcoll,
+ n,
+ duration,
+ include_window_info=include_window_info,
+ display_facets=visualize_data)
+ pcolls = pcolls - previously_computed_pcolls
+
recording_manager = ie.current_env().get_recording_manager(
user_pipeline, create_if_absent=True)
recording = recording_manager.record(pcolls, max_n=n, max_duration=duration)
@@ -509,7 +525,6 @@ def show(
stream,
include_window_info=include_window_info,
element_type=element_types[stream.pcoll])
-
if recording.is_computed():
return
@@ -591,10 +606,20 @@ def collect(pcoll, n='inf', duration='inf',
include_window_info=False):
if duration == 'inf':
duration = float('inf')
- user_pipeline = pcoll.pipeline
+ user_pipeline = ie.current_env().user_pipeline(pcoll.pipeline)
recording_manager = ie.current_env().get_recording_manager(
user_pipeline, create_if_absent=True)
+ # If already computed, directly read the stream and return.
+ if pcoll in ie.current_env().computed_pcollections:
+ pcoll_name = find_pcoll_name(pcoll)
+ elements = list(
+ recording_manager.read(pcoll_name, pcoll, n, duration).read())
+ return elements_to_df(
+ elements,
+ include_window_info=include_window_info,
+ element_type=element_type)
+
recording = recording_manager.record([pcoll], max_n=n, max_duration=duration)
try:
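
A small sketch of the fast path this hunk adds to ib.collect (and, above, to
ib.show): once a PCollection has been computed, later reads are served straight
from the recording manager's cache instead of re-running the pipeline. The
variable names are illustrative:

    import apache_beam as beam
    from apache_beam.runners.interactive import interactive_beam as ib
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

    p = beam.Pipeline(InteractiveRunner())
    squares = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
    ib.watch(locals())

    df_first = ib.collect(squares)  # records the pipeline and caches `squares`
    df_again = ib.collect(squares)  # now read directly from the cached stream
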
diff --git
a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
index f12ab8e..b093aa2 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -120,7 +120,9 @@ class InteractiveBeamTest(unittest.TestCase):
ib.show(pcoll)
self.assertTrue(pcoll in ie.current_env().computed_pcollections)
- @patch('apache_beam.runners.interactive.interactive_beam.visualize')
+ @patch((
+ 'apache_beam.runners.interactive.interactive_beam.'
+ 'visualize_computed_pcoll'))
def test_show_handles_dict_of_pcolls(self, mocked_visualize):
p = beam.Pipeline(ir.InteractiveRunner())
# pylint: disable=range-builtin-not-iterating
@@ -133,7 +135,9 @@ class InteractiveBeamTest(unittest.TestCase):
ib.show({'pcoll': pcoll})
mocked_visualize.assert_called_once()
- @patch('apache_beam.runners.interactive.interactive_beam.visualize')
+ @patch((
+ 'apache_beam.runners.interactive.interactive_beam.'
+ 'visualize_computed_pcoll'))
def test_show_handles_iterable_of_pcolls(self, mocked_visualize):
p = beam.Pipeline(ir.InteractiveRunner())
# pylint: disable=range-builtin-not-iterating
@@ -159,7 +163,9 @@ class InteractiveBeamTest(unittest.TestCase):
ib.show(deferred)
mocked_visualize.assert_called_once()
- @patch('apache_beam.runners.interactive.interactive_beam.visualize')
+ @patch((
+ 'apache_beam.runners.interactive.interactive_beam.'
+ 'visualize_computed_pcoll'))
def test_show_noop_when_pcoll_container_is_invalid(self, mocked_visualize):
class SomeRandomClass:
def __init__(self, pcoll):
diff --git a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
index 6c0f8d3..bd40f13 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
@@ -20,10 +20,12 @@
Only works within an IPython kernel.
"""
+import argparse
import importlib
import keyword
import logging
from typing import Dict
+from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
@@ -49,23 +51,22 @@ from apache_beam.testing import test_stream
from apache_beam.testing.test_stream_service import TestStreamServiceController
from apache_beam.transforms.sql import SqlTransform
from IPython.core.magic import Magics
-from IPython.core.magic import cell_magic
+from IPython.core.magic import line_cell_magic
from IPython.core.magic import magics_class
_LOGGER = logging.getLogger(__name__)
-_EXAMPLE_USAGE = """Usage:
- %%%%beam_sql [output_name]
- Calcite SQL statement
- Syntax:
https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/
- Please make sure that there is no conflicts between your variable names and
- the SQL keywords, such as "SELECT", "FROM", "WHERE" and etc.
-
- output_name is optional. If not supplied, a variable name is automatically
- assigned to the output of the magic.
-
- The output of the magic is usually a PCollection or similar PValue,
- depending on the SQL statement executed.
+_EXAMPLE_USAGE = """beam_sql magic to execute Beam SQL in notebooks
+---------------------------------------------------------
+%%beam_sql [-o OUTPUT_NAME] query
+---------------------------------------------------------
+Or
+---------------------------------------------------------
+%%%%beam_sql [-o OUTPUT_NAME] query-line#1
+query-line#2
+...
+query-line#N
+---------------------------------------------------------
"""
_NOT_SUPPORTED_MSG = """The query was valid and successfully applied.
@@ -82,38 +83,107 @@ _NOT_SUPPORTED_MSG = """The query was valid and
successfully applied.
"""
+class BeamSqlParser:
+ """A parser to parse beam_sql inputs."""
+ def __init__(self):
+ self._parser = argparse.ArgumentParser(usage=_EXAMPLE_USAGE)
+ self._parser.add_argument(
+ '-o',
+ '--output-name',
+ dest='output_name',
+ help=(
+ 'The output variable name of the magic, usually a PCollection. '
+ 'Auto-generated if omitted.'))
+ self._parser.add_argument(
+ '-v',
+ '--verbose',
+ action='store_true',
+ help='Display more details about the magic execution.')
+ self._parser.add_argument(
+ 'query',
+ type=str,
+ nargs='*',
+ help=(
+ 'The Beam SQL query to execute. '
+ 'Syntax: https://beam.apache.org/documentation/dsls/sql/calcite/'
+ 'query-syntax/. '
+            'Please make sure that there is no conflict between your variable '
+            'names and the SQL keywords, such as "SELECT", "FROM", "WHERE", '
+            'etc.'))
+
+ def parse(self, args: List[str]) -> Optional[argparse.Namespace]:
+ """Parses a list of string inputs.
+
+ The parsed namespace contains these attributes:
+ output_name: Optional[str], the output variable name.
+ verbose: bool, whether to display more details of the magic execution.
+      query: Optional[List[str]], the Beam SQL query to execute.
+
+    Returns:
+      The parsed args, or None if parsing fails.
+ """
+ try:
+ return self._parser.parse_args(args)
+ except KeyboardInterrupt:
+ raise
+ except: # pylint: disable=bare-except
+ # -h or --help results in SystemExit 0. Do not raise.
+ return None
+
+ def print_help(self) -> None:
+ self._parser.print_help()
+
+
def on_error(error_msg, *args):
"""Logs the error and the usage example."""
_LOGGER.error(error_msg, *args)
- _LOGGER.info(_EXAMPLE_USAGE)
+ BeamSqlParser().print_help()
@magics_class
class BeamSqlMagics(Magics):
- @cell_magic
- def beam_sql(self, line: str, cell: str) -> Union[None, PValue]:
- """The beam_sql cell magic that executes a Beam SQL.
+ def __init__(self, shell):
+ super().__init__(shell)
+ # Eagerly initializes the environment.
+ _ = ie.current_env()
+ self._parser = BeamSqlParser()
+
+ @line_cell_magic
+ def beam_sql(self, line: str, cell: Optional[str] = None) ->
Optional[PValue]:
+ """The beam_sql line/cell magic that executes a Beam SQL.
Args:
- line: (optional) the string on the same line after the beam_sql magic.
- Used as the output variable name in the __main__ module.
- cell: everything else in the same notebook cell as a string. Used as a
- Beam SQL query.
+ line: the string on the same line after the beam_sql magic.
+ cell: everything else in the same notebook cell as a string. If None,
+ beam_sql is used as line magic. Otherwise, cell magic.
Returns None if running into an error, otherwise a PValue as if a
SqlTransform is applied.
"""
- if line and not line.strip().isidentifier() or keyword.iskeyword(
- line.strip()):
+ input_str = line
+ if cell:
+ input_str += ' ' + cell
+ parsed = self._parser.parse(input_str.strip().split())
+ if not parsed:
+ # Failed to parse inputs, let the parser handle the exit.
+ return
+ output_name = parsed.output_name
+ verbose = parsed.verbose
+ query = parsed.query
+
+ if output_name and not output_name.isidentifier() or keyword.iskeyword(
+ output_name):
on_error(
'The output_name "%s" is not a valid identifier. Please supply a '
'valid identifier that is not a Python keyword.',
line)
return
- if not cell or cell.isspace():
- on_error('Please supply the sql to be executed.')
+ if not query:
+ on_error('Please supply the SQL query to be executed.')
return
- found = find_pcolls(cell, pcoll_by_name())
+ query = ' '.join(query)
+
+ found = find_pcolls(query, pcoll_by_name(), verbose=verbose)
for _, pcoll in found.items():
if not is_namedtuple(pcoll.element_type):
on_error(
@@ -123,9 +193,9 @@ class BeamSqlMagics(Magics):
pcoll,
pcoll.element_type)
return
- register_coder_for_schema(pcoll.element_type)
+ register_coder_for_schema(pcoll.element_type, verbose=verbose)
- output_name, output = apply_sql(cell, line, found)
+ output_name, output = apply_sql(query, output_name, found)
cache_output(output_name, output)
return output
@@ -249,7 +319,8 @@ def _build_query_components(
PCollection, or the pipeline to execute the query.
"""
if found:
- user_pipeline = next(iter(found.values())).pipeline
+ user_pipeline = ie.current_env().user_pipeline(
+ next(iter(found.values())).pipeline)
sql_pipeline = beam.Pipeline(options=user_pipeline._options)
ie.current_env().add_derived_pipeline(user_pipeline, sql_pipeline)
sql_source = {}
@@ -295,7 +366,8 @@ def cache_output(output_name: str, output: PValue) -> None:
_LOGGER.warning(_NOT_SUPPORTED_MSG, e, output.pipeline.runner)
return
ie.current_env().mark_pcollection_computed([output])
- visualize_computed_pcoll(output_name, output)
+ visualize_computed_pcoll(
+ output_name, output, max_n=float('inf'), max_duration_secs=float('inf'))
def load_ipython_extension(ipython):
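
A short sketch of how the new BeamSqlParser consumes the magic's input: the
magic joins the line (and, for the cell form, the cell body) into one token
list before parsing. The example values are illustrative and assume the
[interactive] extras are installed:

    from apache_beam.runners.interactive.sql.beam_sql_magics import BeamSqlParser

    parser = BeamSqlParser()

    # Line-magic style: everything on one line.
    parsed = parser.parse('-o out -v SELECT * FROM pcoll'.split())
    print(parsed.output_name, parsed.verbose, ' '.join(parsed.query))

    # Cell-magic style: the cell body is appended to the line before splitting.
    line, cell = '-o joined', 'SELECT a.id FROM a JOIN b USING (id)'
    parsed = parser.parse((line + ' ' + cell).strip().split())
    print(parsed.output_name, ' '.join(parsed.query))
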
diff --git
a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py
b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py
index 7c4de77..538abbb 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py
@@ -34,6 +34,7 @@ try:
from apache_beam.runners.interactive.sql.beam_sql_magics import
_build_query_components
from apache_beam.runners.interactive.sql.beam_sql_magics import
_generate_output_name
from apache_beam.runners.interactive.sql.beam_sql_magics import cache_output
+ from apache_beam.runners.interactive.sql.beam_sql_magics import BeamSqlParser
except (ImportError, NameError):
pass # The test is to be skipped because [interactive] dep not installed.
@@ -133,6 +134,19 @@ class BeamSqlMagicsTest(unittest.TestCase):
cache_manager.exists(
'full', CacheKey.from_pcoll('pcoll_co', pcoll_co).to_str()))
+ def test_parser_with_all_inputs(self):
+ parsed = BeamSqlParser().parse(
+ '-o output_name -v SELECT * FROM abc'.split())
+ self.assertTrue(parsed.verbose)
+ self.assertEqual('output_name', parsed.output_name)
+ self.assertEqual('SELECT * FROM abc', ' '.join(parsed.query))
+
+ def test_parser_with_no_input(self):
+ parsed = BeamSqlParser().parse([])
+ self.assertFalse(parsed.verbose)
+ self.assertIsNone(parsed.output_name)
+ self.assertFalse(parsed.query)
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/sql/utils.py
b/sdks/python/apache_beam/runners/interactive/sql/utils.py
index 1840e60..fb4e57d 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/utils.py
@@ -39,7 +39,8 @@ def is_namedtuple(cls: type) -> bool:
hasattr(cls, '_fields') and hasattr(cls, '__annotations__'))
-def register_coder_for_schema(schema: NamedTuple) -> None:
+def register_coder_for_schema(
+ schema: NamedTuple, verbose: bool = False) -> None:
"""Registers a RowCoder for the given schema if hasn't.
Notifies the user of what code has been implicitly executed.
@@ -48,19 +49,21 @@ def register_coder_for_schema(schema: NamedTuple) -> None:
'Schema %s is not a typing.NamedTuple.' % schema)
coder = beam.coders.registry.get_coder(schema)
if not isinstance(coder, beam.coders.RowCoder):
- _LOGGER.warning(
- 'Schema %s has not been registered to use a RowCoder. '
- 'Automatically registering it by running: '
- 'beam.coders.registry.register_coder(%s, '
- 'beam.coders.RowCoder)',
- schema.__name__,
- schema.__name__)
+ if verbose:
+ _LOGGER.warning(
+ 'Schema %s has not been registered to use a RowCoder. '
+ 'Automatically registering it by running: '
+ 'beam.coders.registry.register_coder(%s, '
+ 'beam.coders.RowCoder)',
+ schema.__name__,
+ schema.__name__)
beam.coders.registry.register_coder(schema, beam.coders.RowCoder)
def find_pcolls(
- sql: str, pcolls: Dict[str,
- beam.PCollection]) -> Dict[str, beam.PCollection]:
+ sql: str,
+ pcolls: Dict[str, beam.PCollection],
+ verbose: bool = False) -> Dict[str, beam.PCollection]:
"""Finds all PCollections used in the given sql query.
It does a simple word by word match and calls ib.collect for each PCollection
@@ -71,8 +74,9 @@ def find_pcolls(
if word in pcolls:
found[word] = pcolls[word]
if found:
- _LOGGER.info('Found PCollections used in the magic: %s.', found)
- _LOGGER.info('Collecting data...')
+ if verbose:
+ _LOGGER.info('Found PCollections used in the magic: %s.', found)
+ _LOGGER.info('Collecting data...')
for name, pcoll in found.items():
try:
_ = ib.collect(pcoll)
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py
b/sdks/python/apache_beam/runners/interactive/utils.py
index 2c75cc9..49b87ba 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -313,6 +313,20 @@ def pcoll_by_name() -> Dict[str, beam.PCollection]:
return pcolls
+def find_pcoll_name(pcoll: beam.PCollection) -> str:
+ """Finds the variable name of a PCollection defined by the user.
+
+ Returns None if not assigned to any variable.
+ """
+ from apache_beam.runners.interactive import interactive_environment as ie
+
+ inspectables = ie.current_env().inspector.inspectables
+ for _, inspectable in inspectables.items():
+ if inspectable['value'] is pcoll:
+ return inspectable['metadata']['name']
+ return None
+
+
def cacheables() -> Dict[CacheKey, Cacheable]:
"""Finds all Cacheables with their CacheKeys."""
from apache_beam.runners.interactive import interactive_environment as ie
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py
b/sdks/python/apache_beam/runners/interactive/utils_test.py
index 5929c8e2..784081e 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -309,6 +309,15 @@ class GeneralUtilTest(unittest.TestCase):
_ = p | 'ReadBoundedSource' >> beam.io.ReadFromText(f.name)
self.assertFalse(utils.has_unbounded_sources(p))
+ def test_find_pcoll_name(self):
+ p = beam.Pipeline()
+ pcoll = p | beam.Create([1, 2, 3])
+ ib.watch({
+ 'p_test_find_pcoll_name': p,
+ 'pcoll_test_find_pcoll_name': pcoll,
+ })
+ self.assertEqual('pcoll_test_find_pcoll_name',
utils.find_pcoll_name(pcoll))
+
if __name__ == '__main__':
unittest.main()