This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7606ad32d61 [SPARK-44484][SS] Add batchDuration to
StreamingQueryProgress json method
7606ad32d61 is described below
commit 7606ad32d6163d0219bec62176d185815de4eebc
Author: Wei Liu <[email protected]>
AuthorDate: Fri Jul 21 13:49:03 2023 +0900
[SPARK-44484][SS] Add batchDuration to StreamingQueryProgress json method
### What changes were proposed in this pull request?
Add the missing field `batchDuration` to the JSON output of `StreamingQueryProgress`.
Also update the affected tests accordingly.
### Why are the changes needed?
`batchDuration` is a field of `StreamingQueryProgress`, but it was missing from the JSON representation.
### Does this PR introduce _any_ user-facing change?
Probably yes - the new field will show up when users call `query.lastProgress` or
`query.recentProgress`, and inside listeners.
### How was this patch tested?
Existing unit tests, with the expected JSON updated to include the new field.
Closes #42077 from WweiL/SPARK-44484-missing-json-field-progress.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala | 1 +
.../org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala | 1 +
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala | 1 +
.../spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala | 2 ++
4 files changed, 5 insertions(+)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 123b3306f2a..8370a336abb 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -182,6 +182,7 @@ class StreamingQueryProgress private[spark] (
("name" -> JString(name)) ~
("timestamp" -> JString(timestamp)) ~
("batchId" -> JInt(batchId)) ~
+ ("batchDuration" -> JInt(batchDuration)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
~
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
index a6a44c1bd71..2911e4e016e 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
@@ -161,6 +161,7 @@ class StreamingQueryProgressSuite extends ConnectFunSuite {
| "name" : "myName",
| "timestamp" : "2016-12-05T20:54:20.827Z",
| "batchId" : 2,
+ | "batchDuration" : 0,
| "numInputRows" : 1467,
| "inputRowsPerSecond" : 22.0,
| "durationMs" : {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 5cce52b9c0f..385157a09c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -166,6 +166,7 @@ class StreamingQueryProgress private[spark](
("name" -> JString(name)) ~
("timestamp" -> JString(timestamp)) ~
("batchId" -> JInt(batchId)) ~
+ ("batchDuration" -> JInt(batchDuration)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index d016b334627..1b6005257c0 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -46,6 +46,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest
with Eventually {
| "name" : "myName",
| "timestamp" : "2016-12-05T20:54:20.827Z",
| "batchId" : 2,
+ | "batchDuration" : 0,
| "numInputRows" : 678,
| "inputRowsPerSecond" : 10.0,
| "durationMs" : {
@@ -112,6 +113,7 @@ class StreamingQueryStatusAndProgressSuite extends
StreamTest with Eventually {
| "name" : null,
| "timestamp" : "2016-12-05T20:54:20.827Z",
| "batchId" : 2,
+ | "batchDuration" : 0,
| "numInputRows" : 678,
| "durationMs" : {
| "total" : 0
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]