This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 06f48855d7b9 [SPARK-45631][SS][PYSPARK] Remove @abstractmethod from
onQueryIdle in PySpark StreamingQueryListener
06f48855d7b9 is described below
commit 06f48855d7b9e0cd0b02b6f7884af39ce1a5f68c
Author: Jungtaek Lim <[email protected]>
AuthorDate: Mon Oct 23 21:12:01 2023 +0900
[SPARK-45631][SS][PYSPARK] Remove @abstractmethod from onQueryIdle in
PySpark StreamingQueryListener
### What changes were proposed in this pull request?
Credit to anish-db for the initial investigation and the fix.
This PR proposes to remove `abstractmethod` annotation from `onQueryIdle`
in PySpark StreamingQueryListener.
The function `onQueryIdle` was added with the annotation `abstractmethod`,
which does not pick up the default implementation and forces users to implement
the new method. This breaks all existing streaming query listener
implementations and forces them to add a dummy function implementation at
least.
This PR re-allows existing implementations to work properly without
explicitly adding a new function `onQueryIdle`.
### Why are the changes needed?
We broke backward compatibility in
[SPARK-43183](https://issues.apache.org/jira/browse/SPARK-43183) and we want to
fix it.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Modified tests. Now tests are verifying two different implementations
covering old interface vs new interface.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43483 from HeartSaVioR/SPARK-45631.
Lead-authored-by: Jungtaek Lim <[email protected]>
Co-authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 75bc5ac9f2b07bc894091b8b15682ee906a19356)
Signed-off-by: Jungtaek Lim <[email protected]>
---
python/pyspark/sql/streaming/listener.py | 4 +-
.../sql/tests/streaming/test_streaming_listener.py | 117 ++++++++++++++-------
2 files changed, 82 insertions(+), 39 deletions(-)
diff --git a/python/pyspark/sql/streaming/listener.py
b/python/pyspark/sql/streaming/listener.py
index 16f40396490c..3a0f30872dc8 100644
--- a/python/pyspark/sql/streaming/listener.py
+++ b/python/pyspark/sql/streaming/listener.py
@@ -107,7 +107,9 @@ class StreamingQueryListener(ABC):
"""
pass
- @abstractmethod
+ # NOTE: Do not mark this as abstract method, since we released this abstract class without
+ # this method in prior version and marking this as abstract method would break existing
+ # implementations.
def onQueryIdle(self, event: "QueryIdleEvent") -> None:
"""
Called when the query is idle and waiting for new data to process.
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py
b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
index 87d0dae00d8b..05c1ec71675c 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
@@ -251,7 +251,23 @@ class StreamingListenerTests(StreamingListenerTestsMixin,
ReusedSQLTestCase):
progress_event = None
terminated_event = None
- class TestListener(StreamingQueryListener):
+ # V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`,
+ # `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5.
+ class TestListenerV1(StreamingQueryListener):
+ def onQueryStarted(self, event):
+ nonlocal start_event
+ start_event = event
+
+ def onQueryProgress(self, event):
+ nonlocal progress_event
+ progress_event = event
+
+ def onQueryTerminated(self, event):
+ nonlocal terminated_event
+ terminated_event = event
+
+ # V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+.
+ class TestListenerV2(StreamingQueryListener):
def onQueryStarted(self, event):
nonlocal start_event
start_event = event
@@ -267,48 +283,71 @@ class StreamingListenerTests(StreamingListenerTestsMixin,
ReusedSQLTestCase):
nonlocal terminated_event
terminated_event = event
- test_listener = TestListener()
+ def verify(test_listener):
+ nonlocal start_event
+ nonlocal progress_event
+ nonlocal terminated_event
- try:
- self.spark.streams.addListener(test_listener)
+ start_event = None
+ progress_event = None
+ terminated_event = None
- df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load()
+ try:
+ self.spark.streams.addListener(test_listener)
- # check successful stateful query
- df_stateful = df.groupBy().count() # make query stateful
- q = (
- df_stateful.writeStream.format("noop")
- .queryName("test")
- .outputMode("complete")
- .start()
- )
- self.assertTrue(q.isActive)
- time.sleep(10)
- q.stop()
+ df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load()
- # Make sure all events are empty
- self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty()
+ # check successful stateful query
+ df_stateful = df.groupBy().count() # make query stateful
+ q = (
+ df_stateful.writeStream.format("noop")
+ .queryName("test")
+ .outputMode("complete")
+ .start()
+ )
+ self.assertTrue(q.isActive)
+ time.sleep(10)
+ q.stop()
- self.check_start_event(start_event)
- self.check_progress_event(progress_event)
- self.check_terminated_event(terminated_event)
+ # Make sure all events are empty
+ self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty()
- # Check query terminated with exception
- from pyspark.sql.functions import col, udf
+ self.check_start_event(start_event)
+ self.check_progress_event(progress_event)
+ self.check_terminated_event(terminated_event)
- bad_udf = udf(lambda x: 1 / 0)
- q = df.select(bad_udf(col("value"))).writeStream.format("noop").start()
- time.sleep(5)
- q.stop()
- self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty()
- self.check_terminated_event(terminated_event, "ZeroDivisionError")
+ # Check query terminated with exception
+ from pyspark.sql.functions import col, udf
- finally:
- self.spark.streams.removeListener(test_listener)
+ bad_udf = udf(lambda x: 1 / 0)
+ q = df.select(bad_udf(col("value"))).writeStream.format("noop").start()
+ time.sleep(5)
+ q.stop()
+ self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty()
+ self.check_terminated_event(terminated_event, "ZeroDivisionError")
+
+ finally:
+ self.spark.streams.removeListener(test_listener)
+
+ verify(TestListenerV1())
+ verify(TestListenerV2())
def test_remove_listener(self):
# SPARK-38804: Test StreamingQueryManager.removeListener
- class TestListener(StreamingQueryListener):
+ # V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`,
+ # `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5.
+ class TestListenerV1(StreamingQueryListener):
+ def onQueryStarted(self, event):
+ pass
+
+ def onQueryProgress(self, event):
+ pass
+
+ def onQueryTerminated(self, event):
+ pass
+
+ # V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+.
+ class TestListenerV2(StreamingQueryListener):
def onQueryStarted(self, event):
pass
@@ -321,13 +360,15 @@ class StreamingListenerTests(StreamingListenerTestsMixin,
ReusedSQLTestCase):
def onQueryTerminated(self, event):
pass
- test_listener = TestListener()
+ def verify(test_listener):
+ num_listeners = len(self.spark.streams._jsqm.listListeners())
+ self.spark.streams.addListener(test_listener)
+ self.assertEqual(num_listeners + 1, len(self.spark.streams._jsqm.listListeners()))
+ self.spark.streams.removeListener(test_listener)
+ self.assertEqual(num_listeners, len(self.spark.streams._jsqm.listListeners()))
- num_listeners = len(self.spark.streams._jsqm.listListeners())
- self.spark.streams.addListener(test_listener)
- self.assertEqual(num_listeners + 1, len(self.spark.streams._jsqm.listListeners()))
- self.spark.streams.removeListener(test_listener)
- self.assertEqual(num_listeners, len(self.spark.streams._jsqm.listListeners()))
+ verify(TestListenerV1())
+ verify(TestListenerV2())
def test_query_started_event_fromJson(self):
start_event = """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]