This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ae6288bdf8b [SPARK-43328][SS] Add latest timestamp on no-execution 
trigger for Idle event in streaming query listener
ae6288bdf8b is described below

commit ae6288bdf8b08e43c0b20a823f23750636a985f4
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Tue May 2 14:51:27 2023 +0900

    [SPARK-43328][SS] Add latest timestamp on no-execution trigger for Idle 
event in streaming query listener
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to add the timestamp of the latest no-execution trigger 
to QueryIdleEvent.
    
    ### Why are the changes needed?
    
    This adds a "human readable" timestamp, which is useful for user-side 
verification as well as being consistent with other events.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but QueryIdleEvent is not released yet.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #41001 from HeartSaVioR/SPARK-43328.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 python/pyspark/sql/streaming/listener.py                          | 8 ++++++++
 .../apache/spark/sql/execution/streaming/ProgressReporter.scala   | 5 +++--
 .../org/apache/spark/sql/streaming/StreamingQueryListener.scala   | 7 ++++++-
 3 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/streaming/listener.py 
b/python/pyspark/sql/streaming/listener.py
index 83d866dcd49..5fdcab3dfaf 100644
--- a/python/pyspark/sql/streaming/listener.py
+++ b/python/pyspark/sql/streaming/listener.py
@@ -228,6 +228,7 @@ class QueryIdleEvent:
     def __init__(self, jevent: JavaObject) -> None:
         self._id: uuid.UUID = uuid.UUID(jevent.id().toString())
         self._runId: uuid.UUID = uuid.UUID(jevent.runId().toString())
+        self._timestamp: str = jevent.timestamp()
 
     @property
     def id(self) -> uuid.UUID:
@@ -245,6 +246,13 @@ class QueryIdleEvent:
         """
         return self._runId
 
+    @property
+    def timestamp(self) -> str:
+        """
+        The timestamp when the latest no-batch trigger happened.
+        """
+        return self._timestamp
+
 
 class QueryTerminatedEvent:
     """
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index aad4a29d85f..6dbecd186dc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -154,8 +154,9 @@ trait ProgressReporter extends Logging {
   }
 
   private def postIdleness(): Unit = {
-    postEvent(new QueryIdleEvent(id, runId))
-    logInfo("Streaming query has been idle and waiting for new data.")
+    postEvent(new QueryIdleEvent(id, runId, 
formatTimestamp(currentTriggerStartTimestamp)))
+    logInfo(s"Streaming query has been idle and waiting for new data more than 
" +
+      s"${noDataProgressEventInterval} ms.")
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 7ffeee60cc8..aec5afe24c9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -135,12 +135,17 @@ object StreamingQueryListener {
 
   /**
    * Event representing that query is idle and waiting for new data to process.
+   *
+   * @param id    A unique query id that persists across restarts. See 
`StreamingQuery.id()`.
+   * @param runId A query id that is unique for every start/restart. See 
`StreamingQuery.runId()`.
+   * @param timestamp The timestamp when the latest no-batch trigger happened.
    * @since 3.5.0
    */
   @Evolving
   class QueryIdleEvent private[sql](
       val id: UUID,
-      val runId: UUID) extends Event
+      val runId: UUID,
+      val timestamp: String) extends Event
 
   /**
    * Event representing that termination of a query.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to