This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 72fc87b46734 [SPARK-53690][SS] Fix exponential formatting of avgOffsetsBehindLatest in Kafka sources object in progress json
72fc87b46734 is described below
commit 72fc87b46734d41e5174debf72978d0a22196b19
Author: jayant.sharma <[email protected]>
AuthorDate: Sat Oct 11 10:12:21 2025 +0900
[SPARK-53690][SS] Fix exponential formatting of avgOffsetsBehindLatest in Kafka sources object in progress json
### What changes were proposed in this pull request?
This PR fixes an issue where the `avgOffsetsBehindLatest` metric in the Kafka sources object of the streaming progress JSON was displayed in scientific notation (e.g., `2.70941269E8`). The fix uses a safe `BigDecimal` conversion to ensure values are displayed in a human-readable fixed-point format.
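For illustration, here is a minimal, self-contained sketch of the conversion. The `AvgOffsetsFormattingDemo` object and its `formatMetric` helper are hypothetical stand-ins; only the `BigDecimal(...).setScale(1, RoundingMode.HALF_UP)` round-trip mirrors the actual patch below.
```
import scala.math.BigDecimal.RoundingMode

// Hypothetical standalone demo of the conversion applied in this patch.
object AvgOffsetsFormattingDemo {
  // Render avgOffsetsBehindLatest as a fixed-point decimal string instead of
  // scientific notation; all other metric values pass through unchanged.
  def formatMetric(name: String, value: String): String =
    if (name == "avgOffsetsBehindLatest") {
      BigDecimal(value).setScale(1, RoundingMode.HALF_UP).toString
    } else {
      value
    }

  def main(args: Array[String]): Unit = {
    // Double.toString switches to scientific notation for large magnitudes:
    println(2.70941269e8.toString)                                  // 2.70941269E8
    // The BigDecimal round-trip keeps the value fixed-point:
    println(formatMetric("avgOffsetsBehindLatest", "2.70941269E8")) // 270941269.0
    println(formatMetric("maxOffsetsBehindLatest", "270941269"))    // 270941269
  }
}
```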
Before change:
```
{
  "id" : "19815c70-c0f1-4e89-8360-2ef444a35b76",
  "runId" : "6c243067-fff6-47ef-99d8-41db0e278949",
  "name" : "KafkaMetricsTest",
  "timestamp" : "2025-09-23T06:00:00.000Z",
  "batchId" : 1250,
  "batchDuration" : 111255,
  "numInputRows" : 800000,
  "inputRowsPerSecond" : 75291.2831516931,
  "processedRowsPerSecond" : 71906.88058963642,
  "durationMs" : {
    "addBatch" : 110481,
    "commitBatch" : 410,
    "commitOffsets" : 107,
    "getBatch" : 0,
    "latestOffset" : 2,
    "queryPlanning" : 179,
    "triggerExecution" : 111255,
    "walCommit" : 74
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[bigdata_omsTrade_cdc]]",
    "startOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18424809459
      }
    },
    "endOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18432809459
      }
    },
    "latestOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18703750728
      }
    },
    "numInputRows" : 800000,
    "inputRowsPerSecond" : 75291.2831516931,
    "processedRowsPerSecond" : 71906.88058963642,
    "metrics" : {
      "avgOffsetsBehindLatest" : "2.70941269E8",
      "maxOffsetsBehindLatest" : "270941269",
      "minOffsetsBehindLatest" : "270941269"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[s3://<masked-storage>/__unitystorage/schemas/75bc9f38-9af3-4af9-852b-7077489f93d3/tables/c963dc05-f685-4fe8-a744-500cb40ce28a]",
    "numOutputRows" : -1
  }
}
```
After change:
```
{
  "id" : "19815c70-c0f1-4e89-8360-2ef444a35b76",
  "runId" : "6c243067-fff6-47ef-99d8-41db0e278949",
  "name" : "KafkaMetricsTest",
  "timestamp" : "2025-09-23T06:00:00.000Z",
  "batchId" : 1250,
  "batchDuration" : 111255,
  "numInputRows" : 800000,
  "inputRowsPerSecond" : 75291.3,
  "processedRowsPerSecond" : 71906.9,
  "durationMs" : {
    "addBatch" : 110481,
    "commitBatch" : 410,
    "commitOffsets" : 107,
    "getBatch" : 0,
    "latestOffset" : 2,
    "queryPlanning" : 179,
    "triggerExecution" : 111255,
    "walCommit" : 74
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[bigdata_omsTrade_cdc]]",
    "startOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18424809459
      }
    },
    "endOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18432809459
      }
    },
    "latestOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18703750728
      }
    },
    "numInputRows" : 800000,
    "inputRowsPerSecond" : 75291.3,
    "processedRowsPerSecond" : 71906.9,
    "metrics" : {
      "avgOffsetsBehindLatest" : "270941269.0",
      "maxOffsetsBehindLatest" : "270941269",
      "minOffsetsBehindLatest" : "270941269"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[s3://<masked-storage>/__unitystorage/schemas/75bc9f38-9af3-4af9-852b-7077489f93d3/tables/c963dc05-f685-4fe8-a744-500cb40ce28a]",
    "numOutputRows" : -1
  }
}
```
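Under the hood, the patch threads the metric key through the map serializer so that individual metrics can be special-cased. The following is a simplified, self-contained sketch of that key-aware pattern (json4s, as in the real code); `KeyAwareMapSerializer` and its `main` are hypothetical stand-ins for `SafeJsonSerializer`, which appears in the diff below.
```
import java.{util => ju}
import scala.jdk.CollectionConverters._
import scala.math.BigDecimal.RoundingMode
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Hypothetical stand-in for SafeJsonSerializer.safeMapToJValue after this
// patch: the converter now receives the key, enabling per-metric formatting.
object KeyAwareMapSerializer {
  def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: (String, T) => JValue): JValue = {
    if (map == null || map.isEmpty) return JNothing
    val keys = map.asScala.keySet.toSeq.sorted // deterministic key order
    keys.map { k => k -> valueToJValue(k, map.get(k)): JObject }.reduce(_ ~ _)
  }

  def main(args: Array[String]): Unit = {
    val metrics = new ju.HashMap[String, String]()
    metrics.put("avgOffsetsBehindLatest", "2.70941269E8")
    metrics.put("maxOffsetsBehindLatest", "270941269")
    // Only the average is rewritten; other metrics pass through untouched.
    val json = safeMapToJValue[String](metrics, (name, s) =>
      JString(if (name == "avgOffsetsBehindLatest") {
        BigDecimal(s).setScale(1, RoundingMode.HALF_UP).toString
      } else s))
    println(pretty(render(json)))
  }
}
```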
### Why are the changes needed?
The current formatting is not user-friendly: a reader can easily take `2.70941269E8` for `2.7` rather than `270,941,269`, because the `E` is easy to miss. This fix improves the readability of the Spark Structured Streaming progress JSON.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Run this Maven test:
```
./build/mvn -pl sql/core,sql/api -am test \
  -DwildcardSuites=org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite \
  -DwildcardTestName="SPARK-53690"
```
Results:
```
Run completed in 8 seconds, 917 milliseconds.
Total number of tests run: 13
Suites: completed 2, aborted 0
Tests: succeeded 13, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Spark Project Parent POM 4.1.0-SNAPSHOT:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [  1.134 s]
[INFO] Spark Project Tags ................................. SUCCESS [  1.429 s]
[INFO] Spark Project Sketch ............................... SUCCESS [  1.427 s]
[INFO] Spark Project Common Java Utils .................... SUCCESS [  1.689 s]
[INFO] Spark Project Common Utils ......................... SUCCESS [  3.001 s]
[INFO] Spark Project Local DB ............................. SUCCESS [  4.117 s]
[INFO] Spark Project Networking ........................... SUCCESS [ 52.545 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  5.946 s]
[INFO] Spark Project Variant .............................. SUCCESS [  0.956 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [  3.161 s]
[INFO] Spark Project Connect Shims ........................ SUCCESS [  0.750 s]
[INFO] Spark Project Launcher ............................. SUCCESS [  3.660 s]
[INFO] Spark Project Core ................................. SUCCESS [ 27.959 s]
[INFO] Spark Project SQL API .............................. SUCCESS [  1.985 s]
[INFO] Spark Project Catalyst ............................. SUCCESS [  6.554 s]
[INFO] Spark Project SQL .................................. SUCCESS [ 42.743 s]
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:39 min
[INFO] Finished at: 2025-10-06T10:11:39+05:30
[INFO]
[INFO] ------------------------------------------------------------------------
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52445 from jayantdb/SPARK-53690-fix-formatting.
Authored-by: jayant.sharma <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../org/apache/spark/sql/streaming/progress.scala | 48 ++++++--
.../StreamingQueryStatusAndProgressSuite.scala | 123 +++++++++++++++++++++
2 files changed, 161 insertions(+), 10 deletions(-)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 1c6be6c2b1f0..2f5b9475a750 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -186,12 +186,12 @@ class StreamingQueryProgress private[spark] (
       ("numInputRows" -> JInt(numInputRows)) ~
       ("inputRowsPerSecond" -> safeDecimalToJValue(inputRowsPerSecond)) ~
       ("processedRowsPerSecond" -> safeDecimalToJValue(processedRowsPerSecond)) ~
-      ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
-      ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
+      ("durationMs" -> safeMapToJValue[JLong](durationMs, (_, v) => JInt(v.toLong))) ~
+      ("eventTime" -> safeMapToJValue[String](eventTime, (_, s) => JString(s))) ~
       ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
       ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
       ("sink" -> sink.jsonValue) ~
-      ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, row => row.jsonValue))
+      ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, (_, row) => row.jsonValue))
   }
 }
@@ -258,13 +258,26 @@ class SourceProgress protected[spark] (
       ("numInputRows" -> JInt(numInputRows)) ~
       ("inputRowsPerSecond" -> safeDecimalToJValue(inputRowsPerSecond)) ~
       ("processedRowsPerSecond" -> safeDecimalToJValue(processedRowsPerSecond)) ~
-      ("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
+      ("metrics" -> safeMapToJValue[String](
+        metrics,
+        (metricsName, s) =>
+          // SPARK-53690:
+          // Convert the metric value to a formatted decimal string to avoid exponentials.
+          // This ensures that large numbers are represented as fixed-point decimals
+          // instead of scientific notation, improving readability.
+          // Any metric that is generated as a double needs to be added to this if
+          // condition so it is rendered as a fixed-point decimal.
+          JString(if (metricsName == "avgOffsetsBehindLatest") {
+            BigDecimal(s).setScale(1, RoundingMode.HALF_UP).toString
+          } else {
+            s
+          })))
   }
 
-  private def tryParse(json: String) = try {
+  private def tryParse(json: String): JValue = try {
     parse(json)
   } catch {
-    case NonFatal(e) => JString(json)
+    case NonFatal(_) => JString(json)
   }
 }
@@ -302,7 +315,7 @@ class SinkProgress protected[spark] (
   private[sql] def jsonValue: JValue = {
     ("description" -> JString(description)) ~
       ("numOutputRows" -> JInt(numOutputRows)) ~
-      ("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
+      ("metrics" -> safeMapToJValue[String](metrics, (_, s) => JString(s)))
   }
 }
@@ -323,11 +336,26 @@ private object SafeJsonSerializer {
     if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
   }
 
-  /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
-  def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
+  /**
+   * Convert a map to a JValue while handling empty maps. Also sorts the keys. The function is
+   * generic in T to handle a variety of value types.
+   *   - Returns `JNothing` if the map is null or empty.
+   *   - Sorts the keys alphabetically to ensure deterministic JSON output.
+   *   - Converts each map entry to a JValue using the provided function, which also receives
+   *     the key.
+   *   - Combines all entries into a single `JObject`.
+   *
+   * @param map
+   *   A Java Map[String, T] to convert
+   * @param valueToJValue
+   *   Function that takes a key and value and returns a corresponding JValue
+   * @return
+   *   A JObject representing the map, or JNothing if the map is null or empty
+   */
+  def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: (String, T) => JValue): JValue = {
     if (map == null || map.isEmpty) return JNothing
     val keys = map.asScala.keySet.toSeq.sorted
-    keys.map { k => k -> valueToJValue(map.get(k)): JObject }.reduce(_ ~ _)
+    keys.map { k => k -> valueToJValue(k, map.get(k)): JObject }.reduce(_ ~ _)
   }
 
   /** Convert BigDecimal to JValue while handling empty or infinite values */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index ccda24be297c..9c1e16460879 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -436,6 +436,89 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi
     processedRowsPerSecondJSON shouldBe processedRowsPerSecondExpected +- epsilon
   }
 
+  test("SPARK-53690: avgOffsetsBehindLatest should never be in scientific notation") {
+    val progress = testProgress5.jsonValue
+    val progressPretty = testProgress5.prettyJson
+
+    // Actual values
+    val avgOffsetsBehindLatest: Double = 2.70941269E8
+
+    // Get values from progress metrics JSON and cast back to Double
+    // for numeric comparison
+    val metricsJSON = (progress \ "sources")(0) \ "metrics"
+    val avgOffsetsBehindLatestJSON = (metricsJSON \ "avgOffsetsBehindLatest")
+      .values.toString
+
+    // Get expected values after type casting
+    val avgOffsetsBehindLatestExpected = BigDecimal(avgOffsetsBehindLatest)
+      .setScale(1, RoundingMode.HALF_UP)
+
+    // This should fail if avgOffsetsBehindLatest contains E notation
+    avgOffsetsBehindLatestJSON should not include "E"
+
+    // Value in progress metrics should be equal to the Decimal conversion of the same
+    // Using epsilon to compare floating-point values
+    val epsilon = 1e-6
+    avgOffsetsBehindLatestJSON.toDouble shouldBe avgOffsetsBehindLatestExpected.toDouble +- epsilon
+
+    // Validating that the pretty JSON of metrics reported is the same as defined
+    progressPretty shouldBe
+      s"""
+         |{
+         |  "id" : "${testProgress5.id.toString}",
+         |  "runId" : "${testProgress5.runId.toString}",
+         |  "name" : "KafkaMetricsTest",
+         |  "timestamp" : "2025-09-23T06:00:00.000Z",
+         |  "batchId" : 1250,
+         |  "batchDuration" : 111255,
+         |  "numInputRows" : 800000,
+         |  "inputRowsPerSecond" : 75291.3,
+         |  "processedRowsPerSecond" : 71906.9,
+         |  "durationMs" : {
+         |    "addBatch" : 110481,
+         |    "commitBatch" : 410,
+         |    "commitOffsets" : 107,
+         |    "getBatch" : 0,
+         |    "latestOffset" : 2,
+         |    "queryPlanning" : 179,
+         |    "triggerExecution" : 111255,
+         |    "walCommit" : 74
+         |  },
+         |  "stateOperators" : [ ],
+         |  "sources" : [ {
+         |    "description" : "KafkaV2[Subscribe[bigdata_omsTrade_cdc]]",
+         |    "startOffset" : {
+         |      "bigdata_omsTrade_cdc" : {
+         |        "0" : 18424809459
+         |      }
+         |    },
+         |    "endOffset" : {
+         |      "bigdata_omsTrade_cdc" : {
+         |        "0" : 18432809459
+         |      }
+         |    },
+         |    "latestOffset" : {
+         |      "bigdata_omsTrade_cdc" : {
+         |        "0" : 18703750728
+         |      }
+         |    },
+         |    "numInputRows" : 800000,
+         |    "inputRowsPerSecond" : 75291.3,
+         |    "processedRowsPerSecond" : 71906.9,
+         |    "metrics" : {
+         |      "avgOffsetsBehindLatest" : "270941269.0",
+         |      "maxOffsetsBehindLatest" : "270941269",
+         |      "minOffsetsBehindLatest" : "270941269"
+         |    }
+         |  } ],
+         |  "sink" : {
+         |    "description" : "DeltaSink[s3://<masked-storage>/delta-source]",
+         |    "numOutputRows" : -1
+         |  }
+         |}
+      """.stripMargin.trim
+  }
+
   def waitUntilBatchProcessed: AssertOnQuery = Execute { q =>
     eventually(Timeout(streamingTimeout)) {
       if (q.exception.isEmpty) {
@@ -596,6 +679,46 @@ object StreamingQueryStatusAndProgressSuite {
       "event2" -> row(schema2, 1L, "hello", "world")).asJava)
   )
 
+  val testProgress5 = new StreamingQueryProgress(
+    id = UUID.randomUUID,
+    runId = UUID.randomUUID,
+    name = "KafkaMetricsTest",
+    timestamp = "2025-09-23T06:00:00.000Z",
+    batchId = 1250,
+    batchDuration = 111255,
+    durationMs = new java.util.HashMap(
+      Map(
+        "addBatch" -> 110481,
+        "commitBatch" -> 410,
+        "commitOffsets" -> 107,
+        "getBatch" -> 0,
+        "latestOffset" -> 2,
+        "queryPlanning" -> 179,
+        "triggerExecution" -> 111255,
+        "walCommit" -> 74
+      ).transform((_, v) => long2Long(v)).asJava
+    ),
+    eventTime = new java.util.HashMap(),
+    stateOperators = Array.empty,
+    sources = Array(
+      new SourceProgress(
+        description = "KafkaV2[Subscribe[bigdata_omsTrade_cdc]]",
+        startOffset = "{\n \"bigdata_omsTrade_cdc\" : {\n \"0\" : 18424809459\n }\n}",
+        endOffset = "{\n \"bigdata_omsTrade_cdc\" : {\n \"0\" : 18432809459\n }\n}",
+        latestOffset = "{\n \"bigdata_omsTrade_cdc\" : {\n \"0\" : 18703750728\n }\n}",
+        numInputRows = 800000L,
+        inputRowsPerSecond = 75291.2831516931,
+        processedRowsPerSecond = 71906.88058963642,
+        metrics = new java.util.HashMap(Map(
+          "avgOffsetsBehindLatest" -> "2.70941269E8",
+          "minOffsetsBehindLatest" -> "270941269",
+          "maxOffsetsBehindLatest" -> "270941269").asJava))),
+    sink = SinkProgress(
+      "DeltaSink[s3://<masked-storage>/delta-source]",
+      None
+    ),
+    observedMetrics = new java.util.HashMap())
+
   val testStatus = new StreamingQueryStatus("active", true, false)
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]