This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 72fc87b46734 [SPARK-53690][SS] Fix exponential formatting of avgOffsetsBehindLatest in Kafka sources object in progress json
72fc87b46734 is described below

commit 72fc87b46734d41e5174debf72978d0a22196b19
Author: jayant.sharma <[email protected]>
AuthorDate: Sat Oct 11 10:12:21 2025 +0900

    [SPARK-53690][SS] Fix exponential formatting of avgOffsetsBehindLatest in Kafka sources object in progress json
    
    ### What changes were proposed in this pull request?
    This PR fixes an issue where the `avgOffsetsBehindLatest` metric in the Kafka sources object of the streaming progress metrics JSON was displayed in scientific notation (e.g., 2.70941269E8). The fix uses safe Decimal casting to ensure values are displayed in a more human-readable format.
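
    For illustration only (this snippet is not part of the patch), a minimal sketch, assuming only the Scala standard library, of why the raw double renders in scientific notation and how the `BigDecimal` round-trip avoids it:
    ```
    import scala.math.BigDecimal.RoundingMode

    val behindLatest: Double = 2.70941269e8
    // java.lang.Double.toString switches to scientific notation for magnitudes >= 1e7,
    // which is how "2.70941269E8" leaks into the progress JSON.
    println(behindLatest.toString) // prints "2.70941269E8"
    // Re-rendering through BigDecimal keeps the value in fixed-point form.
    println(BigDecimal(behindLatest).setScale(1, RoundingMode.HALF_UP)) // prints "270941269.0"
    ```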
    
    Before change:
    ```
    {
      "id" : "19815c70-c0f1-4e89-8360-2ef444a35b76",
      "runId" : "6c243067-fff6-47ef-99d8-41db0e278949",
      "name" : "KafkaMetricsTest",
      "timestamp" : "2025-09-23T06:00:00.000Z",
      "batchId" : 1250,
      "batchDuration" : 111255,
      "numInputRows" : 800000,
      "inputRowsPerSecond" : 75291.2831516931
      "processedRowsPerSecond" : 71906.88058963642,
      "durationMs" : {
        "addBatch" : 110481,
        "commitBatch" : 410,
        "commitOffsets" : 107,
        "getBatch" : 0,
        "latestOffset" : 2,
        "queryPlanning" : 179,
        "triggerExecution" : 111255,
        "walCommit" : 74
      },
      "stateOperators" : [ ],
      "sources" : [ {
        "description" : "KafkaV2[Subscribe[bigdata_omsTrade_cdc]]",
        "startOffset" : {
          "bigdata_omsTrade_cdc" : {
            "0" : 18424809459
          }
        },
        "endOffset" : {
          "bigdata_omsTrade_cdc" : {
            "0" : 18432809459
          }
        },
        "latestOffset" : {
          "bigdata_omsTrade_cdc" : {
            "0" : 18703750728
          }
        },
        "numInputRows" : 800000,
        "inputRowsPerSecond" : 75291.2831516931,
        "processedRowsPerSecond" : 71906.88058963642,
        "metrics" : {
          "avgOffsetsBehindLatest" : "2.70941269E8",
          "maxOffsetsBehindLatest" : "270941269",
          "minOffsetsBehindLatest" : "270941269"
        }
      } ],
      "sink" : {
        "description" : 
"DeltaSink[s3://<masked-storage/__unitystorage/schemas/75bc9f38-9af3-4af9-852b-7077489f93d3/tables/c963dc05-f685-4fe8-a744-500cb40ce28a]",
        "numOutputRows" : -1
      }
    }
    ```
    After change:
    ```
    {
      "id" : "19815c70-c0f1-4e89-8360-2ef444a35b76",
      "runId" : "6c243067-fff6-47ef-99d8-41db0e278949",
      "name" : "KafkaMetricsTest",
      "timestamp" : "2025-09-23T06:00:00.000Z",
      "batchId" : 1250,
      "batchDuration" : 111255,
      "numInputRows" : 800000,
      "inputRowsPerSecond" : 75291.3,
      "processedRowsPerSecond" : 71906.9,
      "durationMs" : {
        "addBatch" : 110481,
        "commitBatch" : 410,
        "commitOffsets" : 107,
        "getBatch" : 0,
        "latestOffset" : 2,
        "queryPlanning" : 179,
        "triggerExecution" : 111255,
        "walCommit" : 74
      },
      "stateOperators" : [ ],
      "sources" : [ {
        "description" : "KafkaV2[Subscribe[bigdata_omsTrade_cdc]]",
        "startOffset" : {
          "bigdata_omsTrade_cdc" : {
            "0" : 18424809459
          }
        },
        "endOffset" : {
          "bigdata_omsTrade_cdc" : {
            "0" : 18432809459
          }
        },
        "latestOffset" : {
          "bigdata_omsTrade_cdc" : {
            "0" : 18703750728
          }
        },
        "numInputRows" : 800000,
        "inputRowsPerSecond" : 75291.3,
        "processedRowsPerSecond" : 71906.9,
        "metrics" : {
          "avgOffsetsBehindLatest" : "270941269.0",
          "maxOffsetsBehindLatest" : "270941269",
          "minOffsetsBehindLatest" : "270941269"
        }
      } ],
      "sink" : {
        "description" : 
"DeltaSink[s3://<masked-storage/__unitystorage/schemas/75bc9f38-9af3-4af9-852b-7077489f93d3/tables/c963dc05-f685-4fe8-a744-500cb40ce28a]",
        "numOutputRows" : -1
      }
    }
    ```
    
    ### Why are the changes needed?
    The current formatting is not user-friendly: a user can easily misread `2.70941269E8` as `2.7` instead of `270,941,269`, since the `E` is easy to miss. This fix improves the readability of the Spark Structured Streaming progress metrics JSON.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Run this Maven test:
    ```
    ./build/mvn -pl sql/core,sql/api -am test \
      -DwildcardSuites=org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite \
      -DwildcardTestName="SPARK-53690"
    ```
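    For reference, an equivalent sbt invocation (untested here) would be roughly `build/sbt "sql/testOnly org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite -- -z SPARK-53690"`, relying on ScalaTest's `-z` substring filter.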
    Results:
    ```
    Run completed in 8 seconds, 917 milliseconds.
    Total number of tests run: 13
    Suites: completed 2, aborted 0
    Tests: succeeded 13, failed 0, canceled 0, ignored 0, pending 0
    All tests passed.
    [INFO] ------------------------------------------------------------------------
    [INFO] Reactor Summary for Spark Project Parent POM 4.1.0-SNAPSHOT:
    [INFO]
    [INFO] Spark Project Parent POM ........................... SUCCESS [  1.134 s]
    [INFO] Spark Project Tags ................................. SUCCESS [  1.429 s]
    [INFO] Spark Project Sketch ............................... SUCCESS [  1.427 s]
    [INFO] Spark Project Common Java Utils .................... SUCCESS [  1.689 s]
    [INFO] Spark Project Common Utils ......................... SUCCESS [  3.001 s]
    [INFO] Spark Project Local DB ............................. SUCCESS [  4.117 s]
    [INFO] Spark Project Networking ........................... SUCCESS [ 52.545 s]
    [INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  5.946 s]
    [INFO] Spark Project Variant .............................. SUCCESS [  0.956 s]
    [INFO] Spark Project Unsafe ............................... SUCCESS [  3.161 s]
    [INFO] Spark Project Connect Shims ........................ SUCCESS [  0.750 s]
    [INFO] Spark Project Launcher ............................. SUCCESS [  3.660 s]
    [INFO] Spark Project Core ................................. SUCCESS [ 27.959 s]
    [INFO] Spark Project SQL API .............................. SUCCESS [  1.985 s]
    [INFO] Spark Project Catalyst ............................. SUCCESS [  6.554 s]
    [INFO] Spark Project SQL .................................. SUCCESS [ 42.743 s]
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  02:39 min
    [INFO] Finished at: 2025-10-06T10:11:39+05:30
    [INFO] ------------------------------------------------------------------------
    ```
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #52445 from jayantdb/SPARK-53690-fix-formatting.
    
    Authored-by: jayant.sharma <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../org/apache/spark/sql/streaming/progress.scala  |  48 ++++++--
 .../StreamingQueryStatusAndProgressSuite.scala     | 123 +++++++++++++++++++++
 2 files changed, 161 insertions(+), 10 deletions(-)

diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 1c6be6c2b1f0..2f5b9475a750 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -186,12 +186,12 @@ class StreamingQueryProgress private[spark] (
       ("numInputRows" -> JInt(numInputRows)) ~
       ("inputRowsPerSecond" -> safeDecimalToJValue(inputRowsPerSecond)) ~
       ("processedRowsPerSecond" -> 
safeDecimalToJValue(processedRowsPerSecond)) ~
-      ("durationMs" -> safeMapToJValue[JLong](durationMs, v => 
JInt(v.toLong))) ~
-      ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
+      ("durationMs" -> safeMapToJValue[JLong](durationMs, (_, v) => 
JInt(v.toLong))) ~
+      ("eventTime" -> safeMapToJValue[String](eventTime, (_, s) => 
JString(s))) ~
       ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
       ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
       ("sink" -> sink.jsonValue) ~
-      ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, row => 
row.jsonValue))
+      ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, (_, row) => 
row.jsonValue))
   }
 }
 
@@ -258,13 +258,26 @@ class SourceProgress protected[spark] (
       ("numInputRows" -> JInt(numInputRows)) ~
       ("inputRowsPerSecond" -> safeDecimalToJValue(inputRowsPerSecond)) ~
       ("processedRowsPerSecond" -> 
safeDecimalToJValue(processedRowsPerSecond)) ~
-      ("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
+      ("metrics" -> safeMapToJValue[String](
+        metrics,
+        (metricsName, s) =>
+          // SPARK-53690:
+          // Convert the metric value to a formatted decimal string to avoid 
exponentials
+          // This ensures that large numbers are represented as fixed-point 
decimals
+          // instead of scientific notation to improving readability.
+          // Any metrics which is generated as double needs to be added in if 
condition
+          // for converting it to fixed-point decimals in representation.
+          JString(if (metricsName == "avgOffsetsBehindLatest") {
+            BigDecimal(s).setScale(1, RoundingMode.HALF_UP).toString
+          } else {
+            s
+          })))
   }
 
-  private def tryParse(json: String) = try {
+  private def tryParse(json: String): JValue = try {
     parse(json)
   } catch {
-    case NonFatal(e) => JString(json)
+    case NonFatal(_) => JString(json)
   }
 }
 
@@ -302,7 +315,7 @@ class SinkProgress protected[spark] (
   private[sql] def jsonValue: JValue = {
     ("description" -> JString(description)) ~
       ("numOutputRows" -> JInt(numOutputRows)) ~
-      ("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
+      ("metrics" -> safeMapToJValue[String](metrics, (_, s) => JString(s)))
   }
 }
 
@@ -323,11 +336,26 @@ private object SafeJsonSerializer {
     if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
   }
 
-  /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
-  def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
+  /**
+   * Convert map to JValue while handling empty maps. Also, this sorts the keys. The function
+   * is generic in `T` to handle a variety of value types.
+   *   - Returns `JNothing` if the map is null or empty.
+   *   - Sorts the keys alphabetically to ensure deterministic JSON output.
+   *   - Converts each map entry to a JValue using the provided function, which also receives
+   *     the key.
+   *   - Combines all entries into a single `JObject`.
+   *
+   * @param map
+   *   A Java Map[String, T] to convert
+   * @param valueToJValue
+   *   Function that takes a key and value and returns a corresponding JValue
+   * @return
+   *   A JObject representing the map, or JNothing if the map is null or empty
+   */
+  def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: (String, T) => JValue): JValue = {
     if (map == null || map.isEmpty) return JNothing
     val keys = map.asScala.keySet.toSeq.sorted
-    keys.map { k => k -> valueToJValue(map.get(k)): JObject }.reduce(_ ~ _)
+    keys.map { k => k -> valueToJValue(k, map.get(k)): JObject }.reduce(_ ~ _)
   }
 
   /** Convert BigDecimal to JValue while handling empty or infinite values */
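
For illustration, the new key-aware callback behaves like this standalone sketch (it assumes json4s on the classpath and mirrors the patch, but is not the committed code):

```
import java.{util => ju}
import scala.jdk.CollectionConverters._
import scala.math.BigDecimal.RoundingMode
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object SafeJsonSketch {
  // Null/empty maps serialize to JNothing; keys are sorted for deterministic output.
  def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: (String, T) => JValue): JValue = {
    if (map == null || map.isEmpty) return JNothing
    val keys = map.asScala.keySet.toSeq.sorted
    keys.map { k => k -> valueToJValue(k, map.get(k)): JObject }.reduce(_ ~ _)
  }

  def main(args: Array[String]): Unit = {
    val metrics = new ju.HashMap[String, String]()
    metrics.put("avgOffsetsBehindLatest", "2.70941269E8")
    metrics.put("maxOffsetsBehindLatest", "270941269")
    // Only avgOffsetsBehindLatest is re-rendered as a fixed-point decimal;
    // every other metric string passes through unchanged.
    val json = safeMapToJValue[String](metrics, (name, s) =>
      JString(if (name == "avgOffsetsBehindLatest") {
        BigDecimal(s).setScale(1, RoundingMode.HALF_UP).toString
      } else s))
    println(pretty(render(json)))
  }
}
```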
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index ccda24be297c..9c1e16460879 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -436,6 +436,89 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi
     processedRowsPerSecondJSON shouldBe processedRowsPerSecondExpected +- epsilon
   }
 
+  test("SPARK-53690: avgOffsetsBehindLatest should never be in scientific notation") {
+    val progress = testProgress5.jsonValue
+    val progressPretty = testProgress5.prettyJson
+
+    // Actual values
+    val avgOffsetsBehindLatest: Double = 2.70941269E8
+
+    // Get values from progress metrics JSON and cast back to Double
+    // for numeric comparison
+    val metricsJSON = (progress \ "sources")(0) \ "metrics"
+    val avgOffsetsBehindLatestJSON = (metricsJSON \ "avgOffsetsBehindLatest")
+      .values.toString
+
+    // Get expected values after type casting
+    val avgOffsetsBehindLatestExpected = BigDecimal(avgOffsetsBehindLatest)
+      .setScale(1, RoundingMode.HALF_UP)
+
+    // This should fail if avgOffsetsBehindLatest contains E notation
+    avgOffsetsBehindLatestJSON should not include "E"
+
+    // Value in progress metrics should be equal to the Decimal conversion of the same.
+    // Using epsilon to compare floating-point values
+    val epsilon = 1e-6
+    avgOffsetsBehindLatestJSON.toDouble shouldBe avgOffsetsBehindLatestExpected.toDouble +- epsilon
+
+    // Validate that the pretty JSON of the reported metrics matches the expected output
+    progressPretty shouldBe
+      s"""
+         |{
+         |  "id" : "${testProgress5.id.toString}",
+         |  "runId" : "${testProgress5.runId.toString}",
+         |  "name" : "KafkaMetricsTest",
+         |  "timestamp" : "2025-09-23T06:00:00.000Z",
+         |  "batchId" : 1250,
+         |  "batchDuration" : 111255,
+         |  "numInputRows" : 800000,
+         |  "inputRowsPerSecond" : 75291.3,
+         |  "processedRowsPerSecond" : 71906.9,
+         |  "durationMs" : {
+         |    "addBatch" : 110481,
+         |    "commitBatch" : 410,
+         |    "commitOffsets" : 107,
+         |    "getBatch" : 0,
+         |    "latestOffset" : 2,
+         |    "queryPlanning" : 179,
+         |    "triggerExecution" : 111255,
+         |    "walCommit" : 74
+         |  },
+         |  "stateOperators" : [ ],
+         |  "sources" : [ {
+         |    "description" : "KafkaV2[Subscribe[bigdata_omsTrade_cdc]]",
+         |    "startOffset" : {
+         |      "bigdata_omsTrade_cdc" : {
+         |        "0" : 18424809459
+         |      }
+         |    },
+         |    "endOffset" : {
+         |      "bigdata_omsTrade_cdc" : {
+         |        "0" : 18432809459
+         |      }
+         |    },
+         |    "latestOffset" : {
+         |      "bigdata_omsTrade_cdc" : {
+         |        "0" : 18703750728
+         |      }
+         |    },
+         |    "numInputRows" : 800000,
+         |    "inputRowsPerSecond" : 75291.3,
+         |    "processedRowsPerSecond" : 71906.9,
+         |    "metrics" : {
+         |      "avgOffsetsBehindLatest" : "270941269.0",
+         |      "maxOffsetsBehindLatest" : "270941269",
+         |      "minOffsetsBehindLatest" : "270941269"
+         |    }
+         |  } ],
+         |  "sink" : {
+         |    "description" : "DeltaSink[s3://<masked-storage>/delta-source]",
+         |    "numOutputRows" : -1
+         |  }
+         |}
+  """.stripMargin.trim
+  }
+
   def waitUntilBatchProcessed: AssertOnQuery = Execute { q =>
     eventually(Timeout(streamingTimeout)) {
       if (q.exception.isEmpty) {
@@ -596,6 +679,46 @@ object StreamingQueryStatusAndProgressSuite {
       "event2" -> row(schema2, 1L, "hello", "world")).asJava)
   )
 
+  val testProgress5 = new StreamingQueryProgress(
+    id = UUID.randomUUID,
+    runId = UUID.randomUUID,
+    name = "KafkaMetricsTest",
+    timestamp = "2025-09-23T06:00:00.000Z",
+    batchId = 1250,
+    batchDuration = 111255,
+    durationMs = new java.util.HashMap(
+      Map(
+        "addBatch" -> 110481,
+        "commitBatch" -> 410,
+        "commitOffsets" -> 107,
+        "getBatch" -> 0,
+        "latestOffset" -> 2,
+        "queryPlanning" -> 179,
+        "triggerExecution" -> 111255,
+        "walCommit" -> 74
+      ).transform((_, v) => long2Long(v)).asJava
+    ),
+    eventTime = new java.util.HashMap(),
+    stateOperators = Array.empty,
+    sources = Array(
+      new SourceProgress(
+        description = "KafkaV2[Subscribe[bigdata_omsTrade_cdc]]",
+        startOffset = "{\n    \"bigdata_omsTrade_cdc\" : {\n        \"0\" : 18424809459\n    }\n}",
+        endOffset = "{\n    \"bigdata_omsTrade_cdc\" : {\n        \"0\" : 18432809459\n    }\n}",
+        latestOffset = "{\n    \"bigdata_omsTrade_cdc\" : {\n        \"0\" : 18703750728\n    }\n}",
+        numInputRows = 800000L,
+        inputRowsPerSecond = 75291.2831516931,
+        processedRowsPerSecond = 71906.88058963642,
+        metrics = new java.util.HashMap(Map(
+          "avgOffsetsBehindLatest" -> "2.70941269E8",
+          "minOffsetsBehindLatest" -> "270941269",
+          "maxOffsetsBehindLatest" -> "270941269").asJava))),
+    sink = SinkProgress(
+      "DeltaSink[s3://<masked-storage>/delta-source]",
+      None
+    ),
+    observedMetrics = new java.util.HashMap())
+
   val testStatus = new StreamingQueryStatus("active", true, false)
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
