This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d067fc6c1635 Revert "[SPARK-48567][SS] StreamingQuery.lastProgress should return the actual StreamingQueryProgress"
d067fc6c1635 is described below
commit d067fc6c1635dfe7730223021e912e78637bb791
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Wed Jun 19 12:15:02 2024 +0900
Revert "[SPARK-48567][SS] StreamingQuery.lastProgress should return the actual StreamingQueryProgress"
This reverts commit 042804ad545c88afe69c149b25baea00fc213708.
---
python/pyspark/sql/connect/streaming/query.py | 9 +-
python/pyspark/sql/streaming/listener.py | 228 ++++++++-------------
python/pyspark/sql/streaming/query.py | 13 +-
.../pyspark/sql/tests/streaming/test_streaming.py | 44 +---
.../sql/tests/streaming/test_streaming_listener.py | 32 +--
5 files changed, 99 insertions(+), 227 deletions(-)
diff --git a/python/pyspark/sql/connect/streaming/query.py
b/python/pyspark/sql/connect/streaming/query.py
index cc1e2e220188..98ecdc4966c7 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -33,7 +33,6 @@ from pyspark.sql.streaming.listener import (
QueryProgressEvent,
QueryIdleEvent,
QueryTerminatedEvent,
- StreamingQueryProgress,
)
from pyspark.sql.streaming.query import (
StreamingQuery as PySparkStreamingQuery,
@@ -111,21 +110,21 @@ class StreamingQuery:
status.__doc__ = PySparkStreamingQuery.status.__doc__
@property
- def recentProgress(self) -> List[StreamingQueryProgress]:
+ def recentProgress(self) -> List[Dict[str, Any]]:
cmd = pb2.StreamingQueryCommand()
cmd.recent_progress = True
progress =
self._execute_streaming_query_cmd(cmd).recent_progress.recent_progress_json
- return [StreamingQueryProgress.fromJson(json.loads(p)) for p in
progress]
+ return [json.loads(p) for p in progress]
recentProgress.__doc__ = PySparkStreamingQuery.recentProgress.__doc__
@property
- def lastProgress(self) -> Optional[StreamingQueryProgress]:
+ def lastProgress(self) -> Optional[Dict[str, Any]]:
cmd = pb2.StreamingQueryCommand()
cmd.last_progress = True
progress =
self._execute_streaming_query_cmd(cmd).recent_progress.recent_progress_json
if len(progress) > 0:
- return StreamingQueryProgress.fromJson(json.loads(progress[-1]))
+ return json.loads(progress[-1])
else:
return None
diff --git a/python/pyspark/sql/streaming/listener.py
b/python/pyspark/sql/streaming/listener.py
index 6cc2cc3fa2b8..2aa63cdb91ab 100644
--- a/python/pyspark/sql/streaming/listener.py
+++ b/python/pyspark/sql/streaming/listener.py
@@ -397,13 +397,10 @@ class QueryTerminatedEvent:
return self._errorClassOnException
-class StreamingQueryProgress(dict):
+class StreamingQueryProgress:
"""
.. versionadded:: 3.4.0
- .. versionchanged:: 4.0.0
- Becomes a subclass of dict
-
Notes
-----
This API is evolving.
@@ -429,25 +426,23 @@ class StreamingQueryProgress(dict):
jprogress: Optional["JavaObject"] = None,
jdict: Optional[Dict[str, Any]] = None,
):
- super().__init__(
- id=id,
- runId=runId,
- name=name,
- timestamp=timestamp,
- batchId=batchId,
- batchDuration=batchDuration,
- durationMs=durationMs,
- eventTime=eventTime,
- stateOperators=stateOperators,
- sources=sources,
- sink=sink,
- numInputRows=numInputRows,
- inputRowsPerSecond=inputRowsPerSecond,
- processedRowsPerSecond=processedRowsPerSecond,
- observedMetrics=observedMetrics,
- )
self._jprogress: Optional["JavaObject"] = jprogress
self._jdict: Optional[Dict[str, Any]] = jdict
+ self._id: uuid.UUID = id
+ self._runId: uuid.UUID = runId
+ self._name: Optional[str] = name
+ self._timestamp: str = timestamp
+ self._batchId: int = batchId
+ self._batchDuration: int = batchDuration
+ self._durationMs: Dict[str, int] = durationMs
+ self._eventTime: Dict[str, str] = eventTime
+ self._stateOperators: List[StateOperatorProgress] = stateOperators
+ self._sources: List[SourceProgress] = sources
+ self._sink: SinkProgress = sink
+ self._numInputRows: int = numInputRows
+ self._inputRowsPerSecond: float = inputRowsPerSecond
+ self._processedRowsPerSecond: float = processedRowsPerSecond
+ self._observedMetrics: Dict[str, Row] = observedMetrics
@classmethod
def fromJObject(cls, jprogress: "JavaObject") -> "StreamingQueryProgress":
@@ -494,11 +489,9 @@ class StreamingQueryProgress(dict):
stateOperators=[StateOperatorProgress.fromJson(s) for s in
j["stateOperators"]],
sources=[SourceProgress.fromJson(s) for s in j["sources"]],
sink=SinkProgress.fromJson(j["sink"]),
- numInputRows=j["numInputRows"] if "numInputRows" in j else None,
- inputRowsPerSecond=j["inputRowsPerSecond"] if "inputRowsPerSecond"
in j else None,
- processedRowsPerSecond=j["processedRowsPerSecond"]
- if "processedRowsPerSecond" in j
- else None,
+ numInputRows=j["numInputRows"],
+ inputRowsPerSecond=j["inputRowsPerSecond"],
+ processedRowsPerSecond=j["processedRowsPerSecond"],
observedMetrics={
k: Row(*row_dict.keys())(*row_dict.values()) # Assume no
nested rows
for k, row_dict in j["observedMetrics"].items()
@@ -513,10 +506,7 @@ class StreamingQueryProgress(dict):
A unique query id that persists across restarts. See
py:meth:`~pyspark.sql.streaming.StreamingQuery.id`.
"""
- # Before Spark 4.0, StreamingQuery.lastProgress returns a dict, which
casts id and runId
- # to string. But here they are UUID.
- # To prevent breaking change, do not cast them to string when accessed
with attribute.
- return super().__getitem__("id")
+ return self._id
@property
def runId(self) -> uuid.UUID:
@@ -524,24 +514,21 @@ class StreamingQueryProgress(dict):
A query id that is unique for every start/restart. See
py:meth:`~pyspark.sql.streaming.StreamingQuery.runId`.
"""
- # Before Spark 4.0, StreamingQuery.lastProgress returns a dict, which
casts id and runId
- # to string. But here they are UUID.
- # To prevent breaking change, do not cast them to string when accessed
with attribute.
- return super().__getitem__("runId")
+ return self._runId
@property
def name(self) -> Optional[str]:
"""
User-specified name of the query, `None` if not specified.
"""
- return self["name"]
+ return self._name
@property
def timestamp(self) -> str:
"""
The timestamp to start a query.
"""
- return self["timestamp"]
+ return self._timestamp
@property
def batchId(self) -> int:
@@ -551,21 +538,21 @@ class StreamingQueryProgress(dict):
Similarly, when there is no data to be processed, the batchId will not
be
incremented.
"""
- return self["batchId"]
+ return self._batchId
@property
def batchDuration(self) -> int:
"""
The process duration of each batch.
"""
- return self["batchDuration"]
+ return self._batchDuration
@property
def durationMs(self) -> Dict[str, int]:
"""
The amount of time taken to perform various operations in milliseconds.
"""
- return self["durationMs"]
+ return self._durationMs
@property
def eventTime(self) -> Dict[str, str]:
@@ -583,21 +570,21 @@ class StreamingQueryProgress(dict):
All timestamps are in ISO8601 format, i.e. UTC timestamps.
"""
- return self["eventTime"]
+ return self._eventTime
@property
def stateOperators(self) -> List["StateOperatorProgress"]:
"""
Information about operators in the query that store state.
"""
- return self["stateOperators"]
+ return self._stateOperators
@property
def sources(self) -> List["SourceProgress"]:
"""
detailed statistics on data being read from each of the streaming
sources.
"""
- return self["sources"]
+ return self._sources
@property
def sink(self) -> "SinkProgress":
@@ -605,41 +592,32 @@ class StreamingQueryProgress(dict):
A unique query id that persists across restarts. See
py:meth:`~pyspark.sql.streaming.StreamingQuery.id`.
"""
- return self["sink"]
+ return self._sink
@property
def observedMetrics(self) -> Dict[str, Row]:
- return self["observedMetrics"]
+ return self._observedMetrics
@property
def numInputRows(self) -> int:
"""
The aggregate (across all sources) number of records processed in a
trigger.
"""
- if self["numInputRows"] is not None:
- return self["numInputRows"]
- else:
- return sum(s.numInputRows for s in self.sources)
+ return self._numInputRows
@property
def inputRowsPerSecond(self) -> float:
"""
The aggregate (across all sources) rate of data arriving.
"""
- if self["inputRowsPerSecond"] is not None:
- return self["inputRowsPerSecond"]
- else:
- return sum(s.inputRowsPerSecond for s in self.sources)
+ return self._inputRowsPerSecond
@property
def processedRowsPerSecond(self) -> float:
"""
The aggregate (across all sources) rate at which Spark is processing
data.
"""
- if self["processedRowsPerSecond"] is not None:
- return self["processedRowsPerSecond"]
- else:
- return sum(s.processedRowsPerSecond for s in self.sources)
+ return self._processedRowsPerSecond
@property
def json(self) -> str:
@@ -663,29 +641,14 @@ class StreamingQueryProgress(dict):
else:
return json.dumps(self._jdict, indent=4)
- def __getitem__(self, key: str) -> Any:
- # Before Spark 4.0, StreamingQuery.lastProgress returns a dict, which
casts id and runId
- # to string. But here they are UUID.
- # To prevent breaking change, also cast them to string when accessed
with __getitem__.
- if key == "id" or key == "runId":
- return str(super().__getitem__(key))
- else:
- return super().__getitem__(key)
-
def __str__(self) -> str:
return self.prettyJson
- def __repr__(self) -> str:
- return self.prettyJson
-
-class StateOperatorProgress(dict):
+class StateOperatorProgress:
"""
.. versionadded:: 3.4.0
- .. versionchanged:: 4.0.0
- Becomes a subclass of dict
-
Notes
-----
This API is evolving.
@@ -708,22 +671,20 @@ class StateOperatorProgress(dict):
jprogress: Optional["JavaObject"] = None,
jdict: Optional[Dict[str, Any]] = None,
):
- super().__init__(
- operatorName=operatorName,
- numRowsTotal=numRowsTotal,
- numRowsUpdated=numRowsUpdated,
- numRowsRemoved=numRowsRemoved,
- allUpdatesTimeMs=allUpdatesTimeMs,
- allRemovalsTimeMs=allRemovalsTimeMs,
- commitTimeMs=commitTimeMs,
- memoryUsedBytes=memoryUsedBytes,
- numRowsDroppedByWatermark=numRowsDroppedByWatermark,
- numShufflePartitions=numShufflePartitions,
- numStateStoreInstances=numStateStoreInstances,
- customMetrics=customMetrics,
- )
self._jprogress: Optional["JavaObject"] = jprogress
self._jdict: Optional[Dict[str, Any]] = jdict
+ self._operatorName: str = operatorName
+ self._numRowsTotal: int = numRowsTotal
+ self._numRowsUpdated: int = numRowsUpdated
+ self._numRowsRemoved: int = numRowsRemoved
+ self._allUpdatesTimeMs: int = allUpdatesTimeMs
+ self._allRemovalsTimeMs: int = allRemovalsTimeMs
+ self._commitTimeMs: int = commitTimeMs
+ self._memoryUsedBytes: int = memoryUsedBytes
+ self._numRowsDroppedByWatermark: int = numRowsDroppedByWatermark
+ self._numShufflePartitions: int = numShufflePartitions
+ self._numStateStoreInstances: int = numStateStoreInstances
+ self._customMetrics: Dict[str, int] = customMetrics
@classmethod
def fromJObject(cls, jprogress: "JavaObject") -> "StateOperatorProgress":
@@ -763,51 +724,51 @@ class StateOperatorProgress(dict):
@property
def operatorName(self) -> str:
- return self["operatorName"]
+ return self._operatorName
@property
def numRowsTotal(self) -> int:
- return self["numRowsTotal"]
+ return self._numRowsTotal
@property
def numRowsUpdated(self) -> int:
- return self["numRowsUpdated"]
+ return self._numRowsUpdated
@property
def allUpdatesTimeMs(self) -> int:
- return self["allUpdatesTimeMs"]
+ return self._allUpdatesTimeMs
@property
def numRowsRemoved(self) -> int:
- return self["numRowsRemoved"]
+ return self._numRowsRemoved
@property
def allRemovalsTimeMs(self) -> int:
- return self["allRemovalsTimeMs"]
+ return self._allRemovalsTimeMs
@property
def commitTimeMs(self) -> int:
- return self["commitTimeMs"]
+ return self._commitTimeMs
@property
def memoryUsedBytes(self) -> int:
- return self["memoryUsedBytes"]
+ return self._memoryUsedBytes
@property
def numRowsDroppedByWatermark(self) -> int:
- return self["numRowsDroppedByWatermark"]
+ return self._numRowsDroppedByWatermark
@property
def numShufflePartitions(self) -> int:
- return self["numShufflePartitions"]
+ return self._numShufflePartitions
@property
def numStateStoreInstances(self) -> int:
- return self["numStateStoreInstances"]
+ return self._numStateStoreInstances
@property
- def customMetrics(self) -> dict:
- return self["customMetrics"]
+ def customMetrics(self) -> Dict[str, int]:
+ return self._customMetrics
@property
def json(self) -> str:
@@ -834,17 +795,11 @@ class StateOperatorProgress(dict):
def __str__(self) -> str:
return self.prettyJson
- def __repr__(self) -> str:
- return self.prettyJson
-
-class SourceProgress(dict):
+class SourceProgress:
"""
.. versionadded:: 3.4.0
- .. versionchanged:: 4.0.0
- Becomes a subclass of dict
-
Notes
-----
This API is evolving.
@@ -863,18 +818,16 @@ class SourceProgress(dict):
jprogress: Optional["JavaObject"] = None,
jdict: Optional[Dict[str, Any]] = None,
) -> None:
- super().__init__(
- description=description,
- startOffset=startOffset,
- endOffset=endOffset,
- latestOffset=latestOffset,
- numInputRows=numInputRows,
- inputRowsPerSecond=inputRowsPerSecond,
- processedRowsPerSecond=processedRowsPerSecond,
- metrics=metrics,
- )
self._jprogress: Optional["JavaObject"] = jprogress
self._jdict: Optional[Dict[str, Any]] = jdict
+ self._description: str = description
+ self._startOffset: str = startOffset
+ self._endOffset: str = endOffset
+ self._latestOffset: str = latestOffset
+ self._numInputRows: int = numInputRows
+ self._inputRowsPerSecond: float = inputRowsPerSecond
+ self._processedRowsPerSecond: float = processedRowsPerSecond
+ self._metrics: Dict[str, str] = metrics
@classmethod
def fromJObject(cls, jprogress: "JavaObject") -> "SourceProgress":
@@ -909,53 +862,53 @@ class SourceProgress(dict):
"""
Description of the source.
"""
- return self["description"]
+ return self._description
@property
def startOffset(self) -> str:
"""
The starting offset for data being read.
"""
- return self["startOffset"]
+ return self._startOffset
@property
def endOffset(self) -> str:
"""
The ending offset for data being read.
"""
- return self["endOffset"]
+ return self._endOffset
@property
def latestOffset(self) -> str:
"""
The latest offset from this source.
"""
- return self["latestOffset"]
+ return self._latestOffset
@property
def numInputRows(self) -> int:
"""
The number of records read from this source.
"""
- return self["numInputRows"]
+ return self._numInputRows
@property
def inputRowsPerSecond(self) -> float:
"""
The rate at which data is arriving from this source.
"""
- return self["inputRowsPerSecond"]
+ return self._inputRowsPerSecond
@property
def processedRowsPerSecond(self) -> float:
"""
The rate at which data from this source is being processed by Spark.
"""
- return self["processedRowsPerSecond"]
+ return self._processedRowsPerSecond
@property
- def metrics(self) -> dict:
- return self["metrics"]
+ def metrics(self) -> Dict[str, str]:
+ return self._metrics
@property
def json(self) -> str:
@@ -982,17 +935,11 @@ class SourceProgress(dict):
def __str__(self) -> str:
return self.prettyJson
- def __repr__(self) -> str:
- return self.prettyJson
-
-class SinkProgress(dict):
+class SinkProgress:
"""
.. versionadded:: 3.4.0
- .. versionchanged:: 4.0.0
- Becomes a subclass of dict
-
Notes
-----
This API is evolving.
@@ -1006,13 +953,11 @@ class SinkProgress(dict):
jprogress: Optional["JavaObject"] = None,
jdict: Optional[Dict[str, Any]] = None,
) -> None:
- super().__init__(
- description=description,
- numOutputRows=numOutputRows,
- metrics=metrics,
- )
self._jprogress: Optional["JavaObject"] = jprogress
self._jdict: Optional[Dict[str, Any]] = jdict
+ self._description: str = description
+ self._numOutputRows: int = numOutputRows
+ self._metrics: Dict[str, str] = metrics
@classmethod
def fromJObject(cls, jprogress: "JavaObject") -> "SinkProgress":
@@ -1037,7 +982,7 @@ class SinkProgress(dict):
"""
Description of the source.
"""
- return self["description"]
+ return self._description
@property
def numOutputRows(self) -> int:
@@ -1045,11 +990,11 @@ class SinkProgress(dict):
Number of rows written to the sink or -1 for Continuous Mode
(temporarily)
or Sink V1 (until decommissioned).
"""
- return self["numOutputRows"]
+ return self._numOutputRows
@property
def metrics(self) -> Dict[str, str]:
- return self["metrics"]
+ return self._metrics
@property
def json(self) -> str:
@@ -1076,9 +1021,6 @@ class SinkProgress(dict):
def __str__(self) -> str:
return self.prettyJson
- def __repr__(self) -> str:
- return self.prettyJson
-
def _test() -> None:
import sys
diff --git a/python/pyspark/sql/streaming/query.py
b/python/pyspark/sql/streaming/query.py
index 916f96a5b2c2..d3d58da3562b 100644
--- a/python/pyspark/sql/streaming/query.py
+++ b/python/pyspark/sql/streaming/query.py
@@ -22,10 +22,7 @@ from pyspark.errors import StreamingQueryException,
PySparkValueError
from pyspark.errors.exceptions.captured import (
StreamingQueryException as CapturedStreamingQueryException,
)
-from pyspark.sql.streaming.listener import (
- StreamingQueryListener,
- StreamingQueryProgress,
-)
+from pyspark.sql.streaming.listener import StreamingQueryListener
if TYPE_CHECKING:
from py4j.java_gateway import JavaObject
@@ -254,7 +251,7 @@ class StreamingQuery:
return json.loads(self._jsq.status().json())
@property
- def recentProgress(self) -> List[StreamingQueryProgress]:
+ def recentProgress(self) -> List[Dict[str, Any]]:
"""
Returns an array of the most recent [[StreamingQueryProgress]] updates
for this query.
The number of progress updates retained for each stream is configured
by Spark session
@@ -283,10 +280,10 @@ class StreamingQuery:
>>> sq.stop()
"""
- return [StreamingQueryProgress.fromJObject(p) for p in
self._jsq.recentProgress()]
+ return [json.loads(p.json()) for p in self._jsq.recentProgress()]
@property
- def lastProgress(self) -> Optional[StreamingQueryProgress]:
+ def lastProgress(self) -> Optional[Dict[str, Any]]:
"""
Returns the most recent :class:`StreamingQueryProgress` update of this
streaming query or
None if there were no progress updates
@@ -314,7 +311,7 @@ class StreamingQuery:
"""
lastProgress = self._jsq.lastProgress()
if lastProgress:
- return StreamingQueryProgress.fromJObject(lastProgress)
+ return json.loads(lastProgress.json())
else:
return None
diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py
b/python/pyspark/sql/tests/streaming/test_streaming.py
index 00d1fbf53885..e284d052d9ae 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming.py
@@ -29,7 +29,7 @@ from pyspark.errors import PySparkValueError
class StreamingTestsMixin:
def test_streaming_query_functions_basic(self):
- df =
self.spark.readStream.format("text").load("python/test_support/sql/streaming")
+ df = self.spark.readStream.format("rate").option("rowsPerSecond",
10).load()
query = (
df.writeStream.format("memory")
.queryName("test_streaming_query_functions_basic")
@@ -43,8 +43,8 @@ class StreamingTestsMixin:
self.assertEqual(query.exception(), None)
self.assertFalse(query.awaitTermination(1))
query.processAllAvailable()
- lastProgress = query.lastProgress
recentProgress = query.recentProgress
+ lastProgress = query.lastProgress
self.assertEqual(lastProgress["name"], query.name)
self.assertEqual(lastProgress["id"], query.id)
self.assertTrue(any(p == lastProgress for p in recentProgress))
@@ -59,46 +59,6 @@ class StreamingTestsMixin:
finally:
query.stop()
- def test_streaming_progress(self):
- """
- Should be able to access fields using attributes in lastProgress /
recentProgress
- e.g. q.lastProgress.id
- """
- df =
self.spark.readStream.format("text").load("python/test_support/sql/streaming")
- query = df.writeStream.format("noop").start()
- try:
- query.processAllAvailable()
- lastProgress = query.lastProgress
- recentProgress = query.recentProgress
- self.assertEqual(lastProgress["name"], query.name)
- # Return str when accessed using dict get.
- self.assertEqual(lastProgress["id"], query.id)
- # SPARK-48567 Use attribute to access fields in q.lastProgress
- self.assertEqual(lastProgress.name, query.name)
- # Return uuid when accessed using attribute.
- self.assertEqual(str(lastProgress.id), query.id)
- self.assertTrue(any(p == lastProgress for p in recentProgress))
- self.assertTrue(lastProgress.numInputRows > 0)
- # Also access source / sink progress with attributes
- self.assertTrue(len(lastProgress.sources) > 0)
- self.assertTrue(lastProgress.sources[0].numInputRows > 0)
- self.assertTrue(lastProgress["sources"][0]["numInputRows"] > 0)
- self.assertTrue(lastProgress.sink.numOutputRows > 0)
- self.assertTrue(lastProgress["sink"]["numOutputRows"] > 0)
- # In Python, for historical reasons, changing field value
- # in StreamingQueryProgress is allowed.
- new_name = "myNewQuery"
- lastProgress["name"] = new_name
- self.assertEqual(lastProgress.name, new_name)
-
- except Exception as e:
- self.fail(
- "Streaming query functions sanity check shouldn't throw any
error. "
- "Error message: " + str(e)
- )
- finally:
- query.stop()
-
def test_streaming_query_name_edge_case(self):
# Query name should be None when not specified
q1 =
self.spark.readStream.format("rate").load().writeStream.format("noop").start()
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py
b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
index 0f13450849c5..762fc335b56a 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
@@ -227,9 +227,9 @@ class StreamingListenerTestsMixin:
"my_event", count(lit(1)).alias("rc"),
count(col("error")).alias("erc")
)
- q = observed_ds.writeStream.format("noop").start()
+ q = observed_ds.writeStream.format("console").start()
- while q.lastProgress is None or q.lastProgress.batchId == 0:
+ while q.lastProgress is None or q.lastProgress["batchId"] == 0:
q.awaitTermination(0.5)
time.sleep(5)
@@ -241,32 +241,6 @@ class StreamingListenerTestsMixin:
q.stop()
self.spark.streams.removeListener(error_listener)
- def test_streaming_progress(self):
- try:
- # Test a fancier query with stateful operation and observed metrics
- df = self.spark.readStream.format("rate").option("rowsPerSecond",
10).load()
- df_observe = df.observe("my_event", count(lit(1)).alias("rc"))
- df_stateful = df_observe.groupBy().count() # make query stateful
- q = (
- df_stateful.writeStream.format("noop")
- .queryName("test")
- .outputMode("update")
- .trigger(processingTime="5 seconds")
- .start()
- )
-
- while q.lastProgress is None or q.lastProgress.batchId == 0:
- q.awaitTermination(0.5)
-
- q.stop()
-
- self.check_streaming_query_progress(q.lastProgress, True)
- for p in q.recentProgress:
- self.check_streaming_query_progress(p, True)
-
- finally:
- q.stop()
-
class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase):
def test_number_of_public_methods(self):
@@ -381,7 +355,7 @@ class StreamingListenerTests(StreamingListenerTestsMixin,
ReusedSQLTestCase):
.start()
)
self.assertTrue(q.isActive)
- q.awaitTermination(10)
+ time.sleep(10)
q.stop()
# Make sure all events are empty
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]