[
https://issues.apache.org/jira/browse/SPARK-35896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jungtaek Lim resolved SPARK-35896.
----------------------------------
Fix Version/s: 3.2.0
Resolution: Fixed
Issue resolved by pull request 33091
[https://github.com/apache/spark/pull/33091]
> [SS] Include more granular metrics for stateful operators in
> StreamingQueryProgress
> -----------------------------------------------------------------------------------
>
> Key: SPARK-35896
> URL: https://issues.apache.org/jira/browse/SPARK-35896
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.1.2
> Reporter: Venki Korukanti
> Assignee: Venki Korukanti
> Priority: Major
> Fix For: 3.2.0
>
>
> Currently the streaming progress is missing a few important stateful operator
> metrics in {{StateOperatorProgress}}. Each stateful operator consists of
> multiple steps. Ex: {{flatMapGroupsWithState}} has two major steps: 1)
> processing the input and 2) timeout processing to remove entries from the
> state which have expired. The main motivation is to track the time taken by
> each individual step (such as timeout processing, watermark processing, etc.)
> and how much data each step processes, so that we can pinpoint bottlenecks
> and reason about why some microbatches are slower than others in the same
> job.
> Below are the final metrics common to all stateful operators (the ones in
> _*bold-italic*_ are newly proposed). These metrics are in
> {{StateOperatorProgress}} which is part of {{StreamingQueryProgress}}.
> * _*operatorName*_ - State operator name. Can help us identify any operator
> specific slowness and state store usage patterns. Ex. "dedupe" (derived using
> {{StateStoreWriter.shortName}})
> * _numRowsTotal_ - number of rows in the state store across all tasks in a
> stage where the operator has executed.
> * _numRowsUpdated_ - number of rows added to or updated in the store
> * _*allUpdatesTimeMs*_ - time taken to add new rows or update existing state
> store rows across all tasks in a stage where the operator has executed.
> * _*numRowsRemoved*_ - number of rows deleted from state store as part of
> the state cleanup mechanism across all tasks in a stage where the operator
> has executed. This number helps measure the state store deletions and impact
> on checkpoint commit and other latencies.
> * _*allRemovalsTimeMs*_ - time taken to remove rows from the state store
> as part of state cleanup (this also includes iterating through the entire
> state store to find which rows to delete) across all tasks in a stage where
> the operator has executed. If we see jobs spending significant time here, it
> may justify a better layout in the state store so that only the required
> rows are read, rather than the entire state store as is done currently.
> * _*commitTimeMs*_ - time taken to commit the state store changes to
> external storage for checkpointing. This is cumulative across all tasks in a
> stage where this operator has executed.
> * _*numShufflePartitions*_ - number of shuffle partitions this state
> operator is part of. Currently, time metrics are aggregated across all tasks
> in a stage where the operator has executed. Having the number of shuffle
> partitions (which corresponds to the number of tasks) helps us find the
> average task contribution to each metric.
> * _*numStateStores*_ - number of state stores in the operator across all
> tasks in the stage. Some stateful operators have more than one state store
> (e.g. stream-stream join). Tracking this number helps us find correlations
> between the number of state store instances and microbatch latency.
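
The per-task averaging described under numShufflePartitions could be sketched roughly as follows. This is an illustration only, not code from the pull request: it assumes the progress is available as JSON with a "stateOperators" array whose objects use the metric names listed above. The helper name per_task_averages and all sample values are hypothetical.

```python
import json

# Hypothetical progress payload. Field names follow the metric list above;
# the values are invented for illustration.
SAMPLE_PROGRESS = """
{
  "batchId": 7,
  "stateOperators": [
    {
      "operatorName": "dedupe",
      "numRowsTotal": 1000,
      "numRowsUpdated": 200,
      "allUpdatesTimeMs": 800,
      "numRowsRemoved": 50,
      "allRemovalsTimeMs": 400,
      "commitTimeMs": 1200,
      "numShufflePartitions": 4,
      "numStateStores": 4
    }
  ]
}
"""

# Time metrics that are summed across all tasks of the stage.
TIME_METRICS = ("allUpdatesTimeMs", "allRemovalsTimeMs", "commitTimeMs")


def per_task_averages(progress_json):
    """Return a list of (operatorName, {metric: average ms per task})."""
    progress = json.loads(progress_json)
    averages = []
    for op in progress.get("stateOperators", []):
        partitions = op.get("numShufflePartitions", 0)
        if partitions <= 0:
            # Without a partition count there is nothing to divide by.
            continue
        per_task = {m: op[m] / partitions for m in TIME_METRICS if m in op}
        averages.append((op.get("operatorName", "unknown"), per_task))
    return averages


if __name__ == "__main__":
    for name, per_task in per_task_averages(SAMPLE_PROGRESS):
        print(name, per_task)
```

Comparing these per-task averages across microbatches of the same job is one way to see whether a slow batch spent its extra time in updates, removals, or commits.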