This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7606ad32d61 [SPARK-44484][SS] Add batchDuration to StreamingQueryProgress json method
7606ad32d61 is described below

commit 7606ad32d6163d0219bec62176d185815de4eebc
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Fri Jul 21 13:49:03 2023 +0900

    [SPARK-44484][SS] Add batchDuration to StreamingQueryProgress json method
    
    ### What changes were proposed in this pull request?
    
    Add the missing field `batchDuration` to the `StreamingQueryProgress` json method. Also modify the tests accordingly.
    
    ### Why are the changes needed?
    
    The `batchDuration` field was missing from the JSON output of `StreamingQueryProgress`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Probably yes - the new field will show up in users' calls to `query.lastProgress` or `query.recentProgress`, and inside listeners.
    
    ### How was this patch tested?
    
    Existing unit tests
    
    Closes #42077 from WweiL/SPARK-44484-missing-json-field-progress.
    
    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala    | 1 +
 .../org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala    | 1 +
 sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala   | 1 +
 .../spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala      | 2 ++
 4 files changed, 5 insertions(+)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 123b3306f2a..8370a336abb 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -182,6 +182,7 @@ class StreamingQueryProgress private[spark] (
       ("name" -> JString(name)) ~
       ("timestamp" -> JString(timestamp)) ~
       ("batchId" -> JInt(batchId)) ~
+      ("batchDuration" -> JInt(batchDuration)) ~
       ("numInputRows" -> JInt(numInputRows)) ~
       ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
       ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
index a6a44c1bd71..2911e4e016e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
@@ -161,6 +161,7 @@ class StreamingQueryProgressSuite extends ConnectFunSuite {
         |  "name" : "myName",
         |  "timestamp" : "2016-12-05T20:54:20.827Z",
         |  "batchId" : 2,
+        |  "batchDuration" : 0,
         |  "numInputRows" : 1467,
         |  "inputRowsPerSecond" : 22.0,
         |  "durationMs" : {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 5cce52b9c0f..385157a09c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -166,6 +166,7 @@ class StreamingQueryProgress private[spark](
     ("name" -> JString(name)) ~
     ("timestamp" -> JString(timestamp)) ~
     ("batchId" -> JInt(batchId)) ~
+    ("batchDuration" -> JInt(batchDuration)) ~
     ("numInputRows" -> JInt(numInputRows)) ~
     ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
     ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index d016b334627..1b6005257c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -46,6 +46,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
         |  "name" : "myName",
         |  "timestamp" : "2016-12-05T20:54:20.827Z",
         |  "batchId" : 2,
+        |  "batchDuration" : 0,
         |  "numInputRows" : 678,
         |  "inputRowsPerSecond" : 10.0,
         |  "durationMs" : {
@@ -112,6 +113,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
          |  "name" : null,
          |  "timestamp" : "2016-12-05T20:54:20.827Z",
          |  "batchId" : 2,
+         |  "batchDuration" : 0,
          |  "numInputRows" : 678,
          |  "durationMs" : {
          |    "total" : 0


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to