This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f5d8d424644 [SPARK-50270][SS][PYTHON] Added custom state metrics for TransformWithStateInPandas
8f5d8d424644 is described below

commit 8f5d8d424644a62c326b0c430bd311c834236e2b
Author: bogao007 <[email protected]>
AuthorDate: Sat Nov 9 17:22:07 2024 +0900

    [SPARK-50270][SS][PYTHON] Added custom state metrics for TransformWithStateInPandas
    
    ### What changes were proposed in this pull request?
    
    - Added custom state metrics for TransformWithStateInPandas.
    - Clean up TTL properly.
    
    ### Why are the changes needed?
    
    Bring parity with Scala TransformWithState.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Python unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48808 from bogao007/state-metrics.
    
    Authored-by: bogao007 <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../pandas/test_pandas_transform_with_state.py     |  4 ++++
 .../python/TransformWithStateInPandasExec.scala    | 26 +++++++++++++++++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
index 46aad4b6bc60..384920f03f1a 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
@@ -213,6 +213,10 @@ class TransformWithStateInPandasTestsMixin:
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)
 
+        # Verify custom metrics.
+        
self.assertTrue(q.lastProgress.stateOperators[0].customMetrics["numValueStateVars"]
 > 0)
+        
self.assertTrue(q.lastProgress.stateOperators[0].customMetrics["numDeletedStateVars"]
 > 0)
+
         q.stop()
 
         self._prepare_test_resource2(input_path)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
index fda3d27e5eaa..7dd4d4647eeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.{BinaryExecNode, CoGroupedIterator, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.python.PandasGroupUtils.{executePython, groupAndProject, resolveArgOffsets}
-import org.apache.spark.sql.execution.streaming.{StatefulOperatorPartitioning, StatefulOperatorStateInfo, StatefulProcessorHandleImpl, StateStoreWriter, WatermarkSupport}
+import org.apache.spark.sql.execution.streaming.{StatefulOperatorCustomMetric, StatefulOperatorCustomSumMetric, StatefulOperatorPartitioning, StatefulOperatorStateInfo, StatefulProcessorHandleImpl, StateStoreWriter, WatermarkSupport}
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.StateStoreAwareZipPartitionsHelper
 import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateSchemaValidationResult, StateStore, StateStoreConf, StateStoreId, StateStoreOps, StateStoreProviderId}
 import org.apache.spark.sql.streaming.{OutputMode, TimeMode}
@@ -126,6 +126,29 @@ case class TransformWithStateInPandasExec(
     List.empty
   }
 
+  override def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] = {
+    Seq(
+      // metrics around state variables
+      StatefulOperatorCustomSumMetric("numValueStateVars", "Number of value state variables"),
+      StatefulOperatorCustomSumMetric("numListStateVars", "Number of list state variables"),
+      StatefulOperatorCustomSumMetric("numMapStateVars", "Number of map state variables"),
+      StatefulOperatorCustomSumMetric("numDeletedStateVars", "Number of deleted state variables"),
+      // metrics around timers
+      StatefulOperatorCustomSumMetric("numRegisteredTimers", "Number of registered timers"),
+      StatefulOperatorCustomSumMetric("numDeletedTimers", "Number of deleted timers"),
+      StatefulOperatorCustomSumMetric("numExpiredTimers", "Number of expired timers"),
+      // metrics around TTL
+      StatefulOperatorCustomSumMetric("numValueStateWithTTLVars",
+        "Number of value state variables with TTL"),
+      StatefulOperatorCustomSumMetric("numListStateWithTTLVars",
+        "Number of list state variables with TTL"),
+      StatefulOperatorCustomSumMetric("numMapStateWithTTLVars",
+        "Number of map state variables with TTL"),
+      StatefulOperatorCustomSumMetric("numValuesRemovedDueToTTLExpiry",
+        "Number of values removed due to TTL expiry")
+    )
+  }
+
   /**
    * Produces the result of the query as an `RDD[InternalRow]`
    */
@@ -247,6 +270,7 @@ case class TransformWithStateInPandasExec(
       // by the upstream (consumer) operators in addition to the processing in this operator.
       allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs)
       commitTimeMs += timeTakenMs {
+        processorHandle.doTtlCleanup()
         store.commit()
       }
       setStoreMetrics(store)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to