This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 993fe32b9d4 [SPARK-40042][PYTHON][DOCS] Make
pyspark.sql.streaming.query examples self-contained
993fe32b9d4 is described below
commit 993fe32b9d4cd2cbb34067fc465eeedd53cd375a
Author: Qian.Sun <[email protected]>
AuthorDate: Tue Aug 16 08:41:02 2022 -0500
[SPARK-40042][PYTHON][DOCS] Make pyspark.sql.streaming.query examples
self-contained
### What changes were proposed in this pull request?
This PR proposes to improve the examples in `pyspark.sql.streaming.query`
by making each example self-contained with a brief explanation and a bit more
realistic example.
### Why are the changes needed?
To make the documentation more readable and able to copy and paste directly
in PySpark shell.
### Does this PR introduce _any_ user-facing change?
Yes, it changes the documentation.
### How was this patch tested?
Manually ran each doctest.
Closes #37482 from dcoliversun/SPARK-40042.
Authored-by: Qian.Sun <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
python/pyspark/sql/streaming/query.py | 247 +++++++++++++++++++++++++++++++---
1 file changed, 227 insertions(+), 20 deletions(-)
diff --git a/python/pyspark/sql/streaming/query.py
b/python/pyspark/sql/streaming/query.py
index 20c72a64de8..aaf0b677335 100644
--- a/python/pyspark/sql/streaming/query.py
+++ b/python/pyspark/sql/streaming/query.py
@@ -46,41 +46,92 @@ class StreamingQuery:
@property # type: ignore[misc]
@since(2.0)
def id(self) -> str:
- """Returns the unique id of this query that persists across restarts
from checkpoint data.
+ """
+ Returns the unique id of this query that persists across restarts from
checkpoint data.
That is, this id is generated when a query is started for the first
time, and
will be the same every time it is restarted from checkpoint data.
There can only be one query with the same id active in a Spark cluster.
Also see, `runId`.
+
+ Examples
+ --------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sq =
sdf.writeStream.format('memory').queryName('this_query').start()
+
+ Get the unique id of this query that persists across restarts from
checkpoint data
+
+ >>> sq.id # doctest: +ELLIPSIS
+ '...'
+
+ >>> sq.stop()
"""
return self._jsq.id().toString()
@property # type: ignore[misc]
@since(2.1)
def runId(self) -> str:
- """Returns the unique id of this query that does not persist across
restarts. That is, every
+ """
+ Returns the unique id of this query that does not persist across
restarts. That is, every
query that is started (or restarted from checkpoint) will have a
different runId.
+
+ Examples
+ --------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sq =
sdf.writeStream.format('memory').queryName('this_query').start()
+
+ Get the unique id of this query that does not persist across restarts
+
+ >>> sq.runId # doctest: +ELLIPSIS
+ '...'
+
+ >>> sq.stop()
"""
return self._jsq.runId().toString()
@property # type: ignore[misc]
@since(2.0)
def name(self) -> str:
- """Returns the user-specified name of the query, or null if not
specified.
+ """
+ Returns the user-specified name of the query, or null if not specified.
This name can be specified in the
`org.apache.spark.sql.streaming.DataStreamWriter`
as `dataframe.writeStream.queryName("query").start()`.
This name, if set, must be unique across all active queries.
+
+ Examples
+ --------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sq =
sdf.writeStream.format('memory').queryName('this_query').start()
+
+ Get the user-specified name of the query, or null if not specified.
+
+ >>> sq.name
+ 'this_query'
+
+ >>> sq.stop()
"""
return self._jsq.name()
@property # type: ignore[misc]
@since(2.0)
def isActive(self) -> bool:
- """Whether this streaming query is currently active or not."""
+ """
+ Whether this streaming query is currently active or not.
+
+ Examples
+ --------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sq =
sdf.writeStream.format('memory').queryName('this_query').start()
+ >>> sq.isActive
+ True
+
+ >>> sq.stop()
+ """
return self._jsq.isActive()
@since(2.0)
def awaitTermination(self, timeout: Optional[int] = None) ->
Optional[bool]:
- """Waits for the termination of `this` query, either by
:func:`query.stop()` or by an
+ """
+ Waits for the termination of `this` query, either by
:func:`query.stop()` or by an
exception. If the query has terminated with an exception, then the
exception will be thrown.
If `timeout` is set, it returns whether the query has terminated or
not within the
`timeout` seconds.
@@ -90,6 +141,18 @@ class StreamingQuery:
immediately (if the query has terminated with exception).
throws :class:`StreamingQueryException`, if `this` query has
terminated with an exception
+
+ Examples
+ --------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sq =
sdf.writeStream.format('memory').queryName('query_awaitTermination').start()
+
+ Return whether the query has terminated or not within 5 seconds
+
+ >>> sq.awaitTermination(5)
+ False
+
+ >>> sq.stop()
"""
if timeout is not None:
if not isinstance(timeout, (int, float)) or timeout < 0:
@@ -103,15 +166,40 @@ class StreamingQuery:
def status(self) -> Dict[str, Any]:
"""
Returns the current status of the query.
+
+ Examples
+ --------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sq =
sdf.writeStream.format('memory').queryName('this_query').start()
+
+ Get the current status of the query
+
+ >>> sq.status # doctest: +ELLIPSIS
+ {'message': '...', 'isDataAvailable': ..., 'isTriggerActive': ...}
+
+ >>> sq.stop()
"""
return json.loads(self._jsq.status().json())
@property # type: ignore[misc]
@since(2.1)
def recentProgress(self) -> List[Dict[str, Any]]:
- """Returns an array of the most recent [[StreamingQueryProgress]]
updates for this query.
+ """
+ Returns an array of the most recent [[StreamingQueryProgress]] updates
for this query.
The number of progress updates retained for each stream is configured
by Spark session
configuration `spark.sql.streaming.numRecentProgressUpdates`.
+
+ Examples
+ --------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sq =
sdf.writeStream.format('memory').queryName('this_query').start()
+
+ Get an array of the most recent query progress updates for this query
+
+ >>> sq.recentProgress # doctest: +ELLIPSIS
+ [...]
+
+ >>> sq.stop()
"""
return [json.loads(p.json()) for p in self._jsq.recentProgress()]
@@ -126,6 +214,16 @@ class StreamingQuery:
Returns
-------
dict
+
+ Examples
+ --------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sq =
sdf.writeStream.format('memory').queryName('this_query').start()
+
+ Get the most recent query progress update for this query
+
+ >>> sq.lastProgress
+ >>> sq.stop()
"""
lastProgress = self._jsq.lastProgress()
if lastProgress:
@@ -134,7 +232,8 @@ class StreamingQuery:
return None
def processAllAvailable(self) -> None:
- """Blocks until all available data in the source has been processed
and committed to the
+ """
+ Blocks until all available data in the source has been processed and
committed to the
sink. This method is intended for testing.
.. versionadded:: 2.0.0
@@ -145,16 +244,45 @@ class StreamingQuery:
Additionally, this method is only guaranteed to block until data that
has been
synchronously appended data to a stream source prior to invocation.
(i.e. `getOffset` must immediately reflect the addition).
+
+ Examples
+ --------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sq =
sdf.writeStream.format('memory').queryName('this_query').start()
+
+ Blocks query until all available data in the source
+ has been processed and committed to the sink
+
+ >>> sq.processAllAvailable
+ <bound method StreamingQuery.processAllAvailable ...>
+
+ >>> sq.stop()
"""
return self._jsq.processAllAvailable()
@since(2.0)
def stop(self) -> None:
- """Stop this streaming query."""
+ """
+ Stop this streaming query.
+
+ Examples
+ --------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sq =
sdf.writeStream.format('memory').queryName('this_query').start()
+ >>> sq.isActive
+ True
+
+ Stop streaming query
+
+ >>> sq.stop()
+ >>> sq.isActive
+ False
+ """
self._jsq.stop()
def explain(self, extended: bool = False) -> None:
- """Prints the (logical and physical) plans to the console for
debugging purpose.
+ """
+ Prints the (logical and physical) plans to the console for debugging
purpose.
.. versionadded:: 2.1.0
@@ -165,8 +293,17 @@ class StreamingQuery:
Examples
--------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sdf.printSchema()
+ root
+ |-- timestamp: timestamp (nullable = true)
+ |-- value: long (nullable = true)
+
>>> sq =
sdf.writeStream.format('memory').queryName('query_explain').start()
>>> sq.processAllAvailable() # Wait a bit to generate the runtime
plans.
+
+ Explain the runtime plans
+
>>> sq.explain()
== Physical Plan ==
...
@@ -218,15 +355,24 @@ class StreamingQueryManager:
@property
def active(self) -> List[StreamingQuery]:
- """Returns a list of active queries associated with this SQLContext
+ """
+ Returns a list of active queries associated with this SparkSession
.. versionadded:: 2.0.0
Examples
--------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sdf.printSchema()
+ root
+ |-- timestamp: timestamp (nullable = true)
+ |-- value: long (nullable = true)
+
>>> sq =
sdf.writeStream.format('memory').queryName('this_query').start()
>>> sqm = spark.streams
- >>> # get the list of active streaming queries
+
+ Get the list of active streaming queries
+
>>> [q.name for q in sqm.active]
['this_query']
>>> sq.stop()
@@ -234,29 +380,37 @@ class StreamingQueryManager:
return [StreamingQuery(jsq) for jsq in self._jsqm.active()]
def get(self, id: str) -> StreamingQuery:
- """Returns an active query from this SQLContext or throws exception if
an active query
+ """
+ Returns an active query from this SparkSession or throws exception if
an active query
with this name doesn't exist.
.. versionadded:: 2.0.0
Examples
--------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sdf.printSchema()
+ root
+ |-- timestamp: timestamp (nullable = true)
+ |-- value: long (nullable = true)
+
>>> sq =
sdf.writeStream.format('memory').queryName('this_query').start()
>>> sq.name
'this_query'
+
+ Get an active query by id
+
>>> sq = spark.streams.get(sq.id)
>>> sq.isActive
True
- >>> sq = sqlContext.streams.get(sq.id)
- >>> sq.isActive
- True
>>> sq.stop()
"""
return StreamingQuery(self._jsqm.get(id))
@since(2.0)
def awaitAnyTermination(self, timeout: Optional[int] = None) ->
Optional[bool]:
- """Wait until any of the queries on the associated SQLContext has
terminated since the
+ """
+ Wait until any of the queries on the associated SparkSession has
terminated since the
creation of the context, or since :func:`resetTerminated()` was
called. If any query was
terminated with an exception, then the exception will be thrown.
If `timeout` is set, it returns whether the query has terminated or
not within the
@@ -274,6 +428,18 @@ class StreamingQueryManager:
then check the `query.exception()` for each query.
throws :class:`StreamingQueryException`, if `this` query has
terminated with an exception
+
+ Examples
+ --------
+ >>> sdf = spark.readStream.format("rate").load()
+ >>> sq =
sdf.writeStream.format('memory').queryName('this_query').start()
+
+ Return whether any of the queries on the associated SparkSession
+ has terminated or not within 5 seconds
+
+ >>> spark.streams.awaitAnyTermination(5)
+ True
+ >>> sq.stop()
"""
if timeout is not None:
if not isinstance(timeout, (int, float)) or timeout < 0:
@@ -283,7 +449,8 @@ class StreamingQueryManager:
return self._jsqm.awaitAnyTermination()
def resetTerminated(self) -> None:
- """Forget about past terminated queries so that
:func:`awaitAnyTermination()` can be used
+ """
+ Forget about past terminated queries so that
:func:`awaitAnyTermination()` can be used
again to wait for new terminations.
.. versionadded:: 2.0.0
@@ -300,6 +467,28 @@ class StreamingQueryManager:
:class:`~pyspark.sql.streaming.StreamingQuery`.
.. versionadded:: 3.4.0
+
+ Examples
+ --------
+ >>> from pyspark.sql.streaming import StreamingQueryListener
+ >>> class TestListener(StreamingQueryListener):
+ ... def onQueryStarted(self, event):
+ ... pass
+ ...
+ ... def onQueryProgress(self, event):
+ ... pass
+ ...
+ ... def onQueryTerminated(self, event):
+ ... pass
+ >>> test_listener = TestListener()
+
+ Register streaming query listener
+
+ >>> spark.streams.addListener(test_listener)
+
+ Deregister streaming query listener
+
+ >>> spark.streams.removeListener(test_listener)
"""
from pyspark import SparkContext
from pyspark.java_gateway import ensure_callback_server_started
@@ -316,6 +505,26 @@ class StreamingQueryManager:
Deregister a :class:`StreamingQueryListener`.
.. versionadded:: 3.4.0
+
+ >>> from pyspark.sql.streaming import StreamingQueryListener
+ >>> class TestListener(StreamingQueryListener):
+ ... def onQueryStarted(self, event):
+ ... pass
+ ...
+ ... def onQueryProgress(self, event):
+ ... pass
+ ...
+ ... def onQueryTerminated(self, event):
+ ... pass
+ >>> test_listener = TestListener()
+
+ Register streaming query listener
+
+ >>> spark.streams.addListener(test_listener)
+
+ Deregister streaming query listener
+
+ >>> spark.streams.removeListener(test_listener)
"""
self._jsqm.removeListener(listener._jlistener)
@@ -323,7 +532,7 @@ class StreamingQueryManager:
def _test() -> None:
import doctest
import os
- from pyspark.sql import SparkSession, SQLContext
+ from pyspark.sql import SparkSession
import pyspark.sql.streaming.query
from py4j.protocol import Py4JError
@@ -336,8 +545,6 @@ def _test() -> None:
spark = SparkSession(sc) # type: ignore[name-defined] # noqa: F821
globs["spark"] = spark
- globs["sqlContext"] = SQLContext.getOrCreate(spark.sparkContext)
- globs["sdf"] =
spark.readStream.format("text").load("python/test_support/sql/streaming")
(failure_count, test_count) = doctest.testmod(
pyspark.sql.streaming.query,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]