This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8f5d8d424644 [SPARK-50270][SS][PYTHON] Added custom state metrics for
TransformWithStateInPandas
8f5d8d424644 is described below
commit 8f5d8d424644a62c326b0c430bd311c834236e2b
Author: bogao007 <[email protected]>
AuthorDate: Sat Nov 9 17:22:07 2024 +0900
[SPARK-50270][SS][PYTHON] Added custom state metrics for
TransformWithStateInPandas
### What changes were proposed in this pull request?
- Added custom state metrics for TransformWithStateInPandas.
- Clean up TTL properly.
### Why are the changes needed?
Bring parity with Scala TransformWithState.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Python unit test.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48808 from bogao007/state-metrics.
Authored-by: bogao007 <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../pandas/test_pandas_transform_with_state.py | 4 ++++
.../python/TransformWithStateInPandasExec.scala | 26 +++++++++++++++++++++-
2 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
index 46aad4b6bc60..384920f03f1a 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
@@ -213,6 +213,10 @@ class TransformWithStateInPandasTestsMixin:
q.awaitTermination(10)
self.assertTrue(q.exception() is None)
+            # Verify custom metrics.
+            self.assertTrue(q.lastProgress.stateOperators[0].customMetrics["numValueStateVars"] > 0)
+            self.assertTrue(q.lastProgress.stateOperators[0].customMetrics["numDeletedStateVars"] > 0)
+
q.stop()
self._prepare_test_resource2(input_path)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
index fda3d27e5eaa..7dd4d4647eeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.{BinaryExecNode, CoGroupedIterator, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.python.PandasGroupUtils.{executePython, groupAndProject, resolveArgOffsets}
-import org.apache.spark.sql.execution.streaming.{StatefulOperatorPartitioning, StatefulOperatorStateInfo, StatefulProcessorHandleImpl, StateStoreWriter, WatermarkSupport}
+import org.apache.spark.sql.execution.streaming.{StatefulOperatorCustomMetric, StatefulOperatorCustomSumMetric, StatefulOperatorPartitioning, StatefulOperatorStateInfo, StatefulProcessorHandleImpl, StateStoreWriter, WatermarkSupport}
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.StateStoreAwareZipPartitionsHelper
 import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateSchemaValidationResult, StateStore, StateStoreConf, StateStoreId, StateStoreOps, StateStoreProviderId}
 import org.apache.spark.sql.streaming.{OutputMode, TimeMode}
@@ -126,6 +126,29 @@ case class TransformWithStateInPandasExec(
       List.empty
     }
+  override def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] = {
+    Seq(
+      // metrics around state variables
+      StatefulOperatorCustomSumMetric("numValueStateVars", "Number of value state variables"),
+      StatefulOperatorCustomSumMetric("numListStateVars", "Number of list state variables"),
+      StatefulOperatorCustomSumMetric("numMapStateVars", "Number of map state variables"),
+      StatefulOperatorCustomSumMetric("numDeletedStateVars", "Number of deleted state variables"),
+      // metrics around timers
+      StatefulOperatorCustomSumMetric("numRegisteredTimers", "Number of registered timers"),
+      StatefulOperatorCustomSumMetric("numDeletedTimers", "Number of deleted timers"),
+      StatefulOperatorCustomSumMetric("numExpiredTimers", "Number of expired timers"),
+      // metrics around TTL
+      StatefulOperatorCustomSumMetric("numValueStateWithTTLVars",
+        "Number of value state variables with TTL"),
+      StatefulOperatorCustomSumMetric("numListStateWithTTLVars",
+        "Number of list state variables with TTL"),
+      StatefulOperatorCustomSumMetric("numMapStateWithTTLVars",
+        "Number of map state variables with TTL"),
+      StatefulOperatorCustomSumMetric("numValuesRemovedDueToTTLExpiry",
+        "Number of values removed due to TTL expiry")
+    )
+  }
+
/**
* Produces the result of the query as an `RDD[InternalRow]`
*/
@@ -247,6 +270,7 @@ case class TransformWithStateInPandasExec(
         // by the upstream (consumer) operators in addition to the processing in this operator.
         allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs)
         commitTimeMs += timeTakenMs {
+          processorHandle.doTtlCleanup()
           store.commit()
         }
         setStoreMetrics(store)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]