This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a309a0570279 [SPARK-54390][SS] Fix JSON Deserialize in 
StreamingQueryListenerBus
a309a0570279 is described below

commit a309a05702795df575c1e86639ccb8d6948f6c5f
Author: Dylan Wong <[email protected]>
AuthorDate: Fri Jan 2 11:53:33 2026 -0800

    [SPARK-54390][SS] Fix JSON Deserialize in StreamingQueryListenerBus
    
    ### What changes were proposed in this pull request?
    
    The PR changes a few things:
    1. Convert JSON objects and arrays to strings when deserializing using 
`ObjectToStringDeserializer`. (main fix)
    2. Set the default values for `inputRowsPerSecond` and 
`processedRowsPerSecond` to `Double.NaN`. This fixes another silent 
deserialization issue. (fix makes testing easier)
    3. When getting `jsonValue` for `"observedMetrics"` in 
`StreamingQueryProgress`, return `JNothing` if `row.jsonValue` throws. (fix makes 
testing easier, but is also another issue that needs to be addressed in the 
future)
    
    ### Why are the changes needed?
    
    When using Spark Connect, JSON-based offsets are not deserialized 
properly.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this corrects the issues with the StreamingQueryListener with Spark 
Connect.
    
    ### How was this patch tested?
    
    Added new test progress fixtures to the unit tests that contain object- and 
array-based source offsets.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53102 from dylanwong250/SPARK-54390.
    
    Authored-by: Dylan Wong <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../org/apache/spark/sql/streaming/progress.scala  |  41 +++-
 .../streaming/StreamingQueryListenerSuite.scala    |  22 ++
 .../StreamingQueryStatusAndProgressSuite.scala     | 253 ++++++++++++++++++++-
 3 files changed, 302 insertions(+), 14 deletions(-)

diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 2f5b9475a750..5652545ea567 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
 import scala.math.BigDecimal.RoundingMode
 import scala.util.control.NonFatal
 
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.databind.{DeserializationContext, 
DeserializationFeature, JsonDeserializer, JsonNode, ObjectMapper}
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.module.scala.{ClassTagExtensions, 
DefaultScalaModule}
 import org.json4s._
@@ -191,7 +192,19 @@ class StreamingQueryProgress private[spark] (
       ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
       ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
       ("sink" -> sink.jsonValue) ~
-      ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, (_, row) => 
row.jsonValue))
+      ("observedMetrics" -> {
+        // TODO: SPARK-54391
+        // In Spark connect, the observedMetrics is serialized but is not 
deserialized properly when
+        // being sent back to the client and the schema is null. So calling 
row.jsonValue will throw
+        // an exception so we need to catch the exception and return JNothing.
+        // This is because the Row.jsonValue method is a one way method and 
there is no reverse
+        // method to convert the JSON back to a Row.
+        try {
+          safeMapToJValue[Row](observedMetrics, (_, row) => row.jsonValue)
+        } catch {
+          case NonFatal(e) => JNothing
+        }
+      })
   }
 }
 
@@ -210,6 +223,19 @@ private[spark] object StreamingQueryProgress {
     mapper.readValue[StreamingQueryProgress](json)
 }
 
+// SPARK-54390: Custom deserializer that converts JSON objects to strings for 
offset fields
+private class ObjectToStringDeserializer extends JsonDeserializer[String] {
+  override def deserialize(parser: JsonParser, context: 
DeserializationContext): String = {
+    val node: JsonNode = parser.readValueAsTree()
+    if (node.isTextual) {
+      node.asText()
+    } else {
+      // Convert JSON object/array to string representation
+      node.toString
+    }
+  }
+}
+
 /**
  * Information about progress made for a source in the execution of a 
[[StreamingQuery]] during a
  * trigger. See [[StreamingQueryProgress]] for more information.
@@ -233,12 +259,19 @@ private[spark] object StreamingQueryProgress {
 @Evolving
 class SourceProgress protected[spark] (
     val description: String,
+    // SPARK-54390: Use a custom deserializer to convert the JSON object to a 
string.
+    @JsonDeserialize(using = classOf[ObjectToStringDeserializer])
     val startOffset: String,
+    @JsonDeserialize(using = classOf[ObjectToStringDeserializer])
     val endOffset: String,
+    @JsonDeserialize(using = classOf[ObjectToStringDeserializer])
     val latestOffset: String,
     val numInputRows: Long,
-    val inputRowsPerSecond: Double,
-    val processedRowsPerSecond: Double,
+    // The NaN is used in deserialization to indicate the value was not set.
+    // The NaN is then used to not output this field in the JSON.
+    // In Spark connect, we need to ensure that the default value is 
Double.NaN instead of 0.0.
+    val inputRowsPerSecond: Double = Double.NaN,
+    val processedRowsPerSecond: Double = Double.NaN,
     val metrics: ju.Map[String, String] = Map[String, String]().asJava)
     extends Serializable {
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 4eabc82281e1..645dbef7abfd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -21,6 +21,7 @@ import java.util.UUID
 
 import scala.collection.mutable
 
+import org.json4s.jackson.JsonMethods.{compact, parse, render}
 import org.scalactic.{Equality, TolerantNumerics}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -286,6 +287,12 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
     )
   }
 
+  private def removeFieldFromJson(jsonString: String, fieldName: String): 
String = {
+    val jv = parse(jsonString, useBigDecimalForDouble = true)
+    val removed = jv.removeField { case (name, _) => name == fieldName }
+    compact(render(removed))
+  }
+
   test("QueryProgressEvent serialization") {
     def testSerialization(event: QueryProgressEvent): Unit = {
       import scala.jdk.CollectionConverters._
@@ -294,9 +301,24 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
       assert(newEvent.progress.json === event.progress.json)  // json as a 
proxy for equality
       assert(newEvent.progress.durationMs.asScala === 
event.progress.durationMs.asScala)
       assert(newEvent.progress.eventTime.asScala === 
event.progress.eventTime.asScala)
+
+      // Verify we can get the event back from the JSON string, this is 
important for Spark Connect
+      // and the StreamingQueryListenerBus. This is the method that is used to 
deserialize the event
+      // in StreamingQueryListenerBus.queryEventHandler
+      val eventFromNewEvent = QueryProgressEvent.fromJson(newEvent.json)
+      // TODO: Remove after SC-206585 is fixed
+      // We remove the observedMetrics field because it is not serialized 
properly when being
+      // removed from the listener bus, so this test is to verify that 
everything except the
+      // observedMetrics field is equal in the JSON string
+      val eventWithoutObservedMetrics = 
removeFieldFromJson(event.progress.json, "observedMetrics")
+      assert(eventFromNewEvent.progress.json === eventWithoutObservedMetrics)
     }
     testSerialization(new 
QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
     testSerialization(new 
QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))
+    testSerialization(new 
QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress3))
+    testSerialization(new 
QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress4))
+    testSerialization(new 
QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress5))
+    testSerialization(new 
QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress6))
   }
 
   test("QueryTerminatedEvent serialization") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 9c1e16460879..5b692e4c42c0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -169,21 +169,148 @@ class StreamingQueryStatusAndProgressSuite extends 
StreamTest with Eventually wi
          |}
       """.stripMargin.trim)
     assert(compact(parse(json2)) === testProgress2.json)
+
+    val json5 = testProgress5.prettyJson
+    assertJson(
+      json5,
+      s"""
+         |{
+         |  "id" : "${testProgress5.id.toString}",
+         |  "runId" : "${testProgress5.runId.toString}",
+         |  "name" : null,
+         |  "timestamp" : "2025-08-22T00:00:00.111Z",
+         |  "batchId" : 97,
+         |  "batchDuration" : 12,
+         |  "numInputRows" : 201,
+         |  "inputRowsPerSecond" : 20.1,
+         |  "processedRowsPerSecond" : 20.1,
+         |  "stateOperators" : [ ],
+         |  "sources" : [ {
+         |    "description" : "kafka",
+         |    "startOffset" : {
+         |      "topic" : {
+         |        "0" : 123
+         |      }
+         |    },
+         |    "endOffset" : {
+         |      "topic" : {
+         |        "0" : 456
+         |      }
+         |    },
+         |    "latestOffset" : {
+         |      "topic" : {
+         |        "0" : 789
+         |      }
+         |    },
+         |    "numInputRows" : 100,
+         |    "inputRowsPerSecond" : 10.0,
+         |    "processedRowsPerSecond" : 10.0
+         |  }, {
+         |    "description" : "kinesis",
+         |    "startOffset" : [ {
+         |      "shard" : {
+         |        "stream" : "stream1",
+         |        "shardId" : "shard1"
+         |      },
+         |      "firstSeqNum" : null,
+         |      "lastSeqNum" : "123",
+         |      "closed" : false,
+         |      "msBehindLatest" : null,
+         |      "lastRecordSeqNum" : null
+         |    } ],
+         |    "endOffset" : [ {
+         |      "shard" : {
+         |        "stream" : "stream1",
+         |        "shardId" : "shard1"
+         |      },
+         |      "firstSeqNum" : null,
+         |      "lastSeqNum" : "456",
+         |      "closed" : false,
+         |      "msBehindLatest" : null,
+         |      "lastRecordSeqNum" : null
+         |    } ],
+         |    "latestOffset" : [ {
+         |      "shard" : {
+         |        "stream" : "stream1",
+         |        "shardId" : "shard1"
+         |      },
+         |      "firstSeqNum" : null,
+         |      "lastSeqNum" : "789",
+         |      "closed" : false,
+         |      "msBehindLatest" : null,
+         |      "lastRecordSeqNum" : null
+         |    } ],
+         |    "numInputRows" : 101,
+         |    "inputRowsPerSecond" : 10.1,
+         |    "processedRowsPerSecond" : 10.1
+         |  } ],
+         |  "sink" : {
+         |    "description" : "sink",
+         |    "numOutputRows" : -1
+         |  }
+         |}
+      """.stripMargin.trim)
+    assert(compact(parse(json5)) === testProgress5.json)
+
+    val json6 = testProgress6.prettyJson
+    assertJson(
+      json6,
+      s"""
+         |{
+         |  "id" : "${testProgress6.id.toString}",
+         |  "runId" : "${testProgress6.runId.toString}",
+         |  "name" : "myName",
+         |  "timestamp" : "2025-09-19T00:00:00.111Z",
+         |  "batchId" : 97,
+         |  "batchDuration" : 12,
+         |  "numInputRows" : 1001,
+         |  "stateOperators" : [ ],
+         |  "sources" : [ {
+         |    "description" : "kafka",
+         |    "startOffset" : 1000,
+         |    "endOffset" : 2000,
+         |    "latestOffset" : 3000,
+         |    "numInputRows" : 1001
+         |  } ],
+         |  "sink" : {
+         |    "description" : "sink",
+         |    "numOutputRows" : -1
+         |  }
+         |}
+      """.stripMargin.trim)
+    assert(compact(parse(json6)) === testProgress6.json)
   }
 
   test("StreamingQueryProgress - json") {
     assert(compact(parse(testProgress1.json)) === testProgress1.json)
     assert(compact(parse(testProgress2.json)) === testProgress2.json)
     assert(compact(parse(testProgress3.json)) === testProgress3.json)
+    assert(compact(parse(testProgress4.json, useBigDecimalForDouble = true)) 
=== testProgress4.json)
+    assert(compact(parse(testProgress5.json)) === testProgress5.json)
+    assert(compact(parse(testProgress6.json)) === testProgress6.json)
+    assert(compact(parse(testProgress7.json)) === testProgress7.json)
   }
 
   test("StreamingQueryProgress - toString") {
     assert(testProgress1.toString === testProgress1.prettyJson)
     assert(testProgress2.toString === testProgress2.prettyJson)
+    assert(testProgress3.toString === testProgress3.prettyJson)
+    assert(testProgress4.toString === testProgress4.prettyJson)
+    assert(testProgress5.toString === testProgress5.prettyJson)
+    assert(testProgress6.toString === testProgress6.prettyJson)
+    assert(testProgress7.toString === testProgress7.prettyJson)
   }
 
   test("StreamingQueryProgress - jsonString and fromJson") {
-    Seq(testProgress1, testProgress2).foreach { input =>
+    Seq(
+      testProgress1,
+      testProgress2,
+      testProgress3,
+      testProgress4,
+      testProgress5,
+      testProgress6,
+      testProgress7
+    ).foreach { input =>
       val jsonString = StreamingQueryProgress.jsonString(input)
       val result = StreamingQueryProgress.fromJson(jsonString)
       assert(input.id == result.id)
@@ -221,7 +348,11 @@ class StreamingQueryStatusAndProgressSuite extends 
StreamTest with Eventually wi
         } else {
           assert(s1.inputRowsPerSecond == s2.inputRowsPerSecond)
         }
-        assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
+        if (s1.processedRowsPerSecond.isNaN) {
+          assert(s2.processedRowsPerSecond.isNaN)
+        } else {
+          assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
+        }
         assert(s1.metrics == s2.metrics)
       }
 
@@ -232,10 +363,14 @@ class StreamingQueryStatusAndProgressSuite extends 
StreamTest with Eventually wi
       }
 
       val resultObservedMetrics = result.observedMetrics
-      assert(input.observedMetrics.size() == resultObservedMetrics.size())
-      assert(input.observedMetrics.keySet() == resultObservedMetrics.keySet())
-      input.observedMetrics.entrySet().forEach { e =>
-        assert(e.getValue == resultObservedMetrics.get(e.getKey))
+      if (resultObservedMetrics != null) {
+        assert(input.observedMetrics.size() == resultObservedMetrics.size())
+        assert(input.observedMetrics.keySet() == 
resultObservedMetrics.keySet())
+        input.observedMetrics.entrySet().forEach { e =>
+          assert(e.getValue == resultObservedMetrics.get(e.getKey))
+        }
+      } else {
+        assert(input.observedMetrics == null)
       }
     }
   }
@@ -437,8 +572,8 @@ class StreamingQueryStatusAndProgressSuite extends 
StreamTest with Eventually wi
   }
 
   test("SPARK-53690: avgOffsetsBehindLatest should never be in scientific 
notation") {
-    val progress = testProgress5.jsonValue
-    val progressPretty = testProgress5.prettyJson
+    val progress = testProgress7.jsonValue
+    val progressPretty = testProgress7.prettyJson
 
     // Actual values
     val avgOffsetsBehindLatest: Double = 2.70941269E8
@@ -465,8 +600,8 @@ class StreamingQueryStatusAndProgressSuite extends 
StreamTest with Eventually wi
     progressPretty shouldBe
       s"""
          |{
-         |  "id" : "${testProgress5.id.toString}",
-         |  "runId" : "${testProgress5.runId.toString}",
+         |  "id" : "${testProgress7.id.toString}",
+         |  "runId" : "${testProgress7.runId.toString}",
          |  "name" : "KafkaMetricsTest",
          |  "timestamp" : "2025-09-23T06:00:00.000Z",
          |  "batchId" : 1250,
@@ -680,6 +815,104 @@ object StreamingQueryStatusAndProgressSuite {
   )
 
   val testProgress5 = new StreamingQueryProgress(
+    id = UUID.randomUUID,
+    runId = UUID.randomUUID,
+    name = null, // should not be present in the json
+    timestamp = "2025-08-22T00:00:00.111Z",
+    batchId = 97L,
+    batchDuration = 12L,
+    durationMs = null,
+    // empty maps should be handled correctly
+    eventTime = null,
+    stateOperators = Array(),
+    sources = Array(
+      new SourceProgress(
+        description = "kafka",
+        startOffset = """{"topic":{"0":123}}""",
+        endOffset = """{"topic":{"0":456}}""",
+        latestOffset = """{"topic":{"0":789}}""",
+        numInputRows = 100,
+        inputRowsPerSecond = 10.0,
+        processedRowsPerSecond = 10.0
+      ),
+      new SourceProgress(
+        description = "kinesis",
+        startOffset =
+          """
+            |[{
+            |  "shard": {
+            |    "stream": "stream1",
+            |    "shardId": "shard1"
+            |  },
+            |  "firstSeqNum": null,
+            |  "lastSeqNum": "123",
+            |  "closed": false,
+            |  "msBehindLatest": null,
+            |  "lastRecordSeqNum": null
+            |}]
+          """.stripMargin,
+        endOffset =
+          """
+            |[{
+            |  "shard": {
+            |    "stream": "stream1",
+            |    "shardId": "shard1"
+            |  },
+            |  "firstSeqNum": null,
+            |  "lastSeqNum": "456",
+            |  "closed": false,
+            |  "msBehindLatest": null,
+            |  "lastRecordSeqNum": null
+            |}]
+          """.stripMargin,
+        latestOffset =
+          """
+            |[{
+            |  "shard": {
+            |    "stream": "stream1",
+            |    "shardId": "shard1"
+            |  },
+            |  "firstSeqNum": null,
+            |  "lastSeqNum": "789",
+            |  "closed": false,
+            |  "msBehindLatest": null,
+            |  "lastRecordSeqNum": null
+            |}]
+          """.stripMargin,
+        numInputRows = 101,
+        inputRowsPerSecond = 10.1,
+        processedRowsPerSecond = 10.1
+      )
+    ),
+    sink = SinkProgress("sink", None),
+    observedMetrics = new java.util.HashMap(Map().asJava)
+  )
+
+  val testProgress6 = new StreamingQueryProgress(
+    id = UUID.randomUUID,
+    runId = UUID.randomUUID,
+    name = "myName",
+    timestamp = "2025-09-19T00:00:00.111Z",
+    batchId = 97L,
+    batchDuration = 12L,
+    durationMs = null,
+    eventTime = null,
+    stateOperators = Array(),
+    sources = Array(new SourceProgress(
+        description = "kafka",
+        startOffset = "1000",
+        endOffset = "2000",
+        latestOffset = "3000",
+        numInputRows = 1001
+        // inputRowsPerSecond and processedRowsPerSecond should be Double.NaN
+        // and not present in the json
+      )
+    ),
+    sink = SinkProgress("sink", None),
+    observedMetrics = new java.util.HashMap(Map().asJava)
+  )
+
+  val testProgress7 = new StreamingQueryProgress(
     id = UUID.randomUUID,
     runId = UUID.randomUUID,
     name = "KafkaMetricsTest",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to