This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 993fe32b9d4 [SPARK-40042][PYTHON][DOCS] Make pyspark.sql.streaming.query examples self-contained 993fe32b9d4 is described below commit 993fe32b9d4cd2cbb34067fc465eeedd53cd375a Author: Qian.Sun <qian.sun2...@gmail.com> AuthorDate: Tue Aug 16 08:41:02 2022 -0500 [SPARK-40042][PYTHON][DOCS] Make pyspark.sql.streaming.query examples self-contained ### What changes were proposed in this pull request? This PR proposes to improve the examples in `pyspark.sql.streaming.query` by making each example self-contained with a brief explanation and a bit more realistic example. ### Why are the changes needed? To make the documentation more readable and able to copy and paste directly in PySpark shell. ### Does this PR introduce _any_ user-facing change? Yes, it changes the documentation. ### How was this patch tested? Manually ran each doctest. Closes #37482 from dcoliversun/SPARK-40042. Authored-by: Qian.Sun <qian.sun2...@gmail.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- python/pyspark/sql/streaming/query.py | 247 +++++++++++++++++++++++++++++++--- 1 file changed, 227 insertions(+), 20 deletions(-) diff --git a/python/pyspark/sql/streaming/query.py b/python/pyspark/sql/streaming/query.py index 20c72a64de8..aaf0b677335 100644 --- a/python/pyspark/sql/streaming/query.py +++ b/python/pyspark/sql/streaming/query.py @@ -46,41 +46,92 @@ class StreamingQuery: @property # type: ignore[misc] @since(2.0) def id(self) -> str: - """Returns the unique id of this query that persists across restarts from checkpoint data. + """ + Returns the unique id of this query that persists across restarts from checkpoint data. That is, this id is generated when a query is started for the first time, and will be the same every time it is restarted from checkpoint data. There can only be one query with the same id active in a Spark cluster. Also see, `runId`. 
+ + Examples + -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + + Get the unique id of this query that persists across restarts from checkpoint data + + >>> sq.id # doctest: +ELLIPSIS + '...' + + >>> sq.stop() """ return self._jsq.id().toString() @property # type: ignore[misc] @since(2.1) def runId(self) -> str: - """Returns the unique id of this query that does not persist across restarts. That is, every + """ + Returns the unique id of this query that does not persist across restarts. That is, every query that is started (or restarted from checkpoint) will have a different runId. + + Examples + -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + + Get the unique id of this query that does not persist across restarts + + >>> sq.runId # doctest: +ELLIPSIS + '...' + + >>> sq.stop() """ return self._jsq.runId().toString() @property # type: ignore[misc] @since(2.0) def name(self) -> str: - """Returns the user-specified name of the query, or null if not specified. + """ + Returns the user-specified name of the query, or null if not specified. This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter` as `dataframe.writeStream.queryName("query").start()`. This name, if set, must be unique across all active queries. + + Examples + -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + + Get the user-specified name of the query, or null if not specified. + + >>> sq.name + 'this_query' + + >>> sq.stop() """ return self._jsq.name() @property # type: ignore[misc] @since(2.0) def isActive(self) -> bool: - """Whether this streaming query is currently active or not.""" + """ + Whether this streaming query is currently active or not. 
+ + Examples + -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + >>> sq.isActive + True + + >>> sq.stop() + """ return self._jsq.isActive() @since(2.0) def awaitTermination(self, timeout: Optional[int] = None) -> Optional[bool]: - """Waits for the termination of `this` query, either by :func:`query.stop()` or by an + """ + Waits for the termination of `this` query, either by :func:`query.stop()` or by an exception. If the query has terminated with an exception, then the exception will be thrown. If `timeout` is set, it returns whether the query has terminated or not within the `timeout` seconds. @@ -90,6 +141,18 @@ class StreamingQuery: immediately (if the query has terminated with exception). throws :class:`StreamingQueryException`, if `this` query has terminated with an exception + + Examples + -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sq = sdf.writeStream.format('memory').queryName('query_awaitTermination').start() + + Return whether the query has terminated or not within 5 seconds + + >>> sq.awaitTermination(5) + False + + >>> sq.stop() """ if timeout is not None: if not isinstance(timeout, (int, float)) or timeout < 0: @@ -103,15 +166,40 @@ class StreamingQuery: def status(self) -> Dict[str, Any]: """ Returns the current status of the query. + + Examples + -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + + Get the current status of the query + + >>> sq.status # doctest: +ELLIPSIS + {'message': '...', 'isDataAvailable': ..., 'isTriggerActive': ...} + + >>> sq.stop() """ return json.loads(self._jsq.status().json()) @property # type: ignore[misc] @since(2.1) def recentProgress(self) -> List[Dict[str, Any]]: - """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. 
+ """ + Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. The number of progress updates retained for each stream is configured by Spark session configuration `spark.sql.streaming.numRecentProgressUpdates`. + + Examples + -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + + Get an array of the most recent query progress updates for this query + + >>> sq.recentProgress # doctest: +ELLIPSIS + [...] + + >>> sq.stop() """ return [json.loads(p.json()) for p in self._jsq.recentProgress()] @@ -126,6 +214,16 @@ class StreamingQuery: Returns ------- dict + + Examples + -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + + Get the most recent query progress updates for this query + + >>> sq.lastProgress + >>> sq.stop() """ lastProgress = self._jsq.lastProgress() if lastProgress: @@ -134,7 +232,8 @@ class StreamingQuery: return None def processAllAvailable(self) -> None: - """Blocks until all available data in the source has been processed and committed to the + """ + Blocks until all available data in the source has been processed and committed to the sink. This method is intended for testing. .. versionadded:: 2.0.0 @@ -145,16 +244,45 @@ class StreamingQuery: Additionally, this method is only guaranteed to block until data that has been synchronously appended data to a stream source prior to invocation. (i.e. `getOffset` must immediately reflect the addition). 
+ + Examples + -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + + Blocks query until all available data in the source + has been processed and committed to the sink + + >>> sq.processAllAvailable + <bound method StreamingQuery.processAllAvailable ...> + + >>> sq.stop() """ return self._jsq.processAllAvailable() @since(2.0) def stop(self) -> None: - """Stop this streaming query.""" + """ + Stop this streaming query. + + Examples + -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + >>> sq.isActive + True + + Stop streaming query + + >>> sq.stop() + >>> sq.isActive + False + """ self._jsq.stop() def explain(self, extended: bool = False) -> None: - """Prints the (logical and physical) plans to the console for debugging purpose. + """ + Prints the (logical and physical) plans to the console for debugging purpose. .. versionadded:: 2.1.0 @@ -165,8 +293,17 @@ class StreamingQuery: Examples -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sdf.printSchema() + root + |-- timestamp: timestamp (nullable = true) + |-- value: long (nullable = true) + >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start() >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans. + + Explain the runtime plans + >>> sq.explain() == Physical Plan == ... @@ -218,15 +355,24 @@ class StreamingQueryManager: @property def active(self) -> List[StreamingQuery]: - """Returns a list of active queries associated with this SQLContext + """ + Returns a list of active queries associated with this SparkSession .. 
versionadded:: 2.0.0 Examples -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sdf.printSchema() + root + |-- timestamp: timestamp (nullable = true) + |-- value: long (nullable = true) + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() >>> sqm = spark.streams - >>> # get the list of active streaming queries + + Get the list of active streaming queries + >>> [q.name for q in sqm.active] ['this_query'] >>> sq.stop() @@ -234,29 +380,37 @@ class StreamingQueryManager: return [StreamingQuery(jsq) for jsq in self._jsqm.active()] def get(self, id: str) -> StreamingQuery: - """Returns an active query from this SQLContext or throws exception if an active query + """ + Returns an active query from this SparkSession or throws exception if an active query with this name doesn't exist. .. versionadded:: 2.0.0 Examples -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sdf.printSchema() + root + |-- timestamp: timestamp (nullable = true) + |-- value: long (nullable = true) + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() >>> sq.name 'this_query' + + Get an active query by id + >>> sq = spark.streams.get(sq.id) >>> sq.isActive True - >>> sq = sqlContext.streams.get(sq.id) - >>> sq.isActive - True >>> sq.stop() """ return StreamingQuery(self._jsqm.get(id)) @since(2.0) def awaitAnyTermination(self, timeout: Optional[int] = None) -> Optional[bool]: - """Wait until any of the queries on the associated SQLContext has terminated since the + """ + Wait until any of the queries on the associated SparkSession has terminated since the creation of the context, or since :func:`resetTerminated()` was called. If any query was terminated with an exception, then the exception will be thrown. If `timeout` is set, it returns whether the query has terminated or not within the @@ -274,6 +428,18 @@ class StreamingQueryManager: then check the `query.exception()` for each query. 
throws :class:`StreamingQueryException`, if `this` query has terminated with an exception + + Examples + -------- + >>> sdf = spark.readStream.format("rate").load() + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + + Return whether any of the queries on the associated SparkSession + has terminated or not within 5 seconds + + >>> spark.streams.awaitAnyTermination(5) + True + >>> sq.stop() """ if timeout is not None: if not isinstance(timeout, (int, float)) or timeout < 0: @@ -283,7 +449,8 @@ class StreamingQueryManager: return self._jsqm.awaitAnyTermination() def resetTerminated(self) -> None: - """Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used + """ + Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used again to wait for new terminations. .. versionadded:: 2.0.0 @@ -300,6 +467,28 @@ class StreamingQueryManager: :class:`~pyspark.sql.streaming.StreamingQuery`. .. versionadded:: 3.4.0 + + Examples + -------- + >>> from pyspark.sql.streaming import StreamingQueryListener + >>> class TestListener(StreamingQueryListener): + ... def onQueryStarted(self, event): + ... pass + ... + ... def onQueryProgress(self, event): + ... pass + ... + ... def onQueryTerminated(self, event): + ... pass + >>> test_listener = TestListener() + + Register streaming query listener + + >>> spark.streams.addListener(test_listener) + + Deregister streaming query listener + + >>> spark.streams.removeListener(test_listener) """ from pyspark import SparkContext from pyspark.java_gateway import ensure_callback_server_started @@ -316,6 +505,26 @@ class StreamingQueryManager: Deregister a :class:`StreamingQueryListener`. .. versionadded:: 3.4.0 + + >>> from pyspark.sql.streaming import StreamingQueryListener + >>> class TestListener(StreamingQueryListener): + ... def onQueryStarted(self, event): + ... pass + ... + ... def onQueryProgress(self, event): + ... pass + ... + ... 
def onQueryTerminated(self, event): + ... pass + >>> test_listener = TestListener() + + Register streaming query listener + + >>> spark.streams.addListener(test_listener) + + Deregister streaming query listener + + >>> spark.streams.removeListener(test_listener) """ self._jsqm.removeListener(listener._jlistener) @@ -323,7 +532,7 @@ class StreamingQueryManager: def _test() -> None: import doctest import os - from pyspark.sql import SparkSession, SQLContext + from pyspark.sql import SparkSession import pyspark.sql.streaming.query from py4j.protocol import Py4JError @@ -336,8 +545,6 @@ def _test() -> None: spark = SparkSession(sc) # type: ignore[name-defined] # noqa: F821 globs["spark"] = spark - globs["sqlContext"] = SQLContext.getOrCreate(spark.sparkContext) - globs["sdf"] = spark.readStream.format("text").load("python/test_support/sql/streaming") (failure_count, test_count) = doctest.testmod( pyspark.sql.streaming.query, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org