This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ae6288bdf8b [SPARK-43328][SS] Add latest timestamp on no-execution
trigger for Idle event in streaming query listener
ae6288bdf8b is described below
commit ae6288bdf8b08e43c0b20a823f23750636a985f4
Author: Jungtaek Lim <[email protected]>
AuthorDate: Tue May 2 14:51:27 2023 +0900
[SPARK-43328][SS] Add latest timestamp on no-execution trigger for Idle
event in streaming query listener
### What changes were proposed in this pull request?
This PR proposes adding the timestamp of the latest no-execution trigger to
QueryIdleEvent.
### Why are the changes needed?
This adds a human-readable timestamp, which is useful for verification on the
user side and also keeps QueryIdleEvent consistent with the other events.
### Does this PR introduce _any_ user-facing change?
Yes, but QueryIdleEvent has not been released yet.
### How was this patch tested?
Existing tests.
Closes #41001 from HeartSaVioR/SPARK-43328.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
python/pyspark/sql/streaming/listener.py | 8 ++++++++
.../apache/spark/sql/execution/streaming/ProgressReporter.scala | 5 +++--
.../org/apache/spark/sql/streaming/StreamingQueryListener.scala | 7 ++++++-
3 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/sql/streaming/listener.py b/python/pyspark/sql/streaming/listener.py
index 83d866dcd49..5fdcab3dfaf 100644
--- a/python/pyspark/sql/streaming/listener.py
+++ b/python/pyspark/sql/streaming/listener.py
@@ -228,6 +228,7 @@ class QueryIdleEvent:
def __init__(self, jevent: JavaObject) -> None:
self._id: uuid.UUID = uuid.UUID(jevent.id().toString())
self._runId: uuid.UUID = uuid.UUID(jevent.runId().toString())
+ self._timestamp: str = jevent.timestamp()
@property
def id(self) -> uuid.UUID:
@@ -245,6 +246,13 @@ class QueryIdleEvent:
"""
return self._runId
+ @property
+ def timestamp(self) -> str:
+ """
+ The timestamp when the latest no-batch trigger happened.
+ """
+ return self._timestamp
+
class QueryTerminatedEvent:
"""
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index aad4a29d85f..6dbecd186dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -154,8 +154,9 @@ trait ProgressReporter extends Logging {
}
private def postIdleness(): Unit = {
- postEvent(new QueryIdleEvent(id, runId))
- logInfo("Streaming query has been idle and waiting for new data.")
+ postEvent(new QueryIdleEvent(id, runId,
formatTimestamp(currentTriggerStartTimestamp)))
+ logInfo(s"Streaming query has been idle and waiting for new data more than
" +
+ s"${noDataProgressEventInterval} ms.")
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 7ffeee60cc8..aec5afe24c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -135,12 +135,17 @@ object StreamingQueryListener {
/**
* Event representing that query is idle and waiting for new data to process.
+ *
+ * @param id A unique query id that persists across restarts. See
`StreamingQuery.id()`.
+ * @param runId A query id that is unique for every start/restart. See
`StreamingQuery.runId()`.
+ * @param timestamp The timestamp when the latest no-batch trigger happened.
* @since 3.5.0
*/
@Evolving
class QueryIdleEvent private[sql](
val id: UUID,
- val runId: UUID) extends Event
+ val runId: UUID,
+ val timestamp: String) extends Event
/**
* Event representing that termination of a query.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]