This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d3119fac0a09 [SPARK-50378][SS] Add custom metric for tracking time spent
processing initial state in transformWithState
d3119fac0a09 is described below

commit d3119fac0a09a2c6290762c9ba573378ccf30dfc
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Fri Nov 22 12:05:22 2024 +0900

    [SPARK-50378][SS] Add custom metric for tracking time spent processing
initial state in transformWithState
    
    ### What changes were proposed in this pull request?
    Add a custom metric for tracking the time spent processing initial state in
transformWithState
    
    ### Why are the changes needed?
    Adds tracking for time spent in populating initial state
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests
    
    ```
    [info] Run completed in 2 minutes, 38 seconds.
    [info] Total number of tests run: 22
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 22, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #48913 from anishshri-db/task/SPARK-50378.
    
    Authored-by: Anish Shrigondekar <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../spark/sql/execution/streaming/TransformWithStateExec.scala    | 6 ++++++
 .../spark/sql/streaming/TransformWithStateInitialStateSuite.scala | 8 ++++++++
 2 files changed, 14 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
index 2b26d18019d1..107f98b09f85 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
@@ -410,6 +410,9 @@ case class TransformWithStateExec(
   // operator specific metrics
   override def customStatefulOperatorMetrics: 
Seq[StatefulOperatorCustomMetric] = {
     Seq(
+      // metrics around initial state
+      StatefulOperatorCustomSumMetric("initialStateProcessingTimeMs",
+        "Number of milliseconds taken to process all initial state"),
       // metrics around state variables
       StatefulOperatorCustomSumMetric("numValueStateVars", "Number of value 
state variables"),
       StatefulOperatorCustomSumMetric("numListStateVars", "Number of list 
state variables"),
@@ -655,6 +658,8 @@ case class TransformWithStateExec(
     statefulProcessor.init(outputMode, timeMode)
     processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
 
+    val initialStateProcTimeMs = longMetric("initialStateProcessingTimeMs")
+    val initialStateStartTimeNs = System.nanoTime
     // Check if is first batch
     // Only process initial states for first batch
     if (processorHandle.getQueryInfo().getBatchId == 0) {
@@ -667,6 +672,7 @@ case class TransformWithStateExec(
           processInitialStateRows(keyRow.asInstanceOf[UnsafeRow], valueRowIter)
       }
     }
+    initialStateProcTimeMs += NANOSECONDS.toMillis(System.nanoTime - 
initialStateStartTimeNs)
 
     processDataWithPartition(childDataIterator, store, processorHandle)
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
index 360656a76f35..806d2f19f6f5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
@@ -395,6 +395,10 @@ class TransformWithStateInitialStateSuite extends 
StateStoreMetricsTest
         AddData(inputData, InitInputRow("k2", "update", 40.0)),
         AddData(inputData, InitInputRow("non-exist", "getOption", -1.0)),
         CheckNewAnswer(("non-exist", "getOption", -1.0)),
+        Execute { q =>
+          assert(q.lastProgress
+            
.stateOperators(0).customMetrics.get("initialStateProcessingTimeMs") > 0)
+        },
         AddData(inputData, InitInputRow("k1", "appendList", 37.0)),
         AddData(inputData, InitInputRow("k2", "appendList", 40.0)),
         AddData(inputData, InitInputRow("non-exist", "getList", -1.0)),
@@ -514,6 +518,10 @@ class TransformWithStateInitialStateSuite extends 
StateStoreMetricsTest
         AdvanceManualClock(1 * 1000),
         // registered timer for "a" and "b" is 6000, first batch is processed 
at ts = 1000
         CheckNewAnswer(("c", "1")),
+        Execute { q =>
+          assert(q.lastProgress
+            
.stateOperators(0).customMetrics.get("initialStateProcessingTimeMs") > 0)
+        },
 
         AddData(inputData, "c"),
         AdvanceManualClock(6 * 1000), // ts = 7000, "a" expires


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to