[ 
https://issues.apache.org/jira/browse/SPARK-38204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-38204:
------------------------------------

    Assignee: Apache Spark

> All state operators are at a risk of inconsistency between state partitioning 
> and operator partitioning
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38204
>                 URL: https://issues.apache.org/jira/browse/SPARK-38204
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.3, 2.3.4, 2.4.8, 3.0.3, 3.1.2, 3.2.1, 3.3.0
>            Reporter: Jungtaek Lim
>            Assignee: Apache Spark
>            Priority: Blocker
>              Labels: correctness
>
> Except for stream-stream join, all stateful operators use ClusteredDistribution
> as the required child distribution.
> ClusteredDistribution is a very relaxed requirement: any output partitioning
> satisfies it as long as the partitioning ensures that all tuples with the
> same grouping keys are placed in the same partition.
> To illustrate, suppose we do a streaming aggregation like the code below:
> {code:java}
> df
>   .withWatermark("timestamp", "30 minutes")
>   .groupBy("group1", "group2", window("timestamp", "10 minutes"))
>   .agg(count("*")) {code}
> In this code, the physical plan will include a streaming aggregation
> operator, which requires ClusteredDistribution("group1", "group2", "window").
> The problem is that various output partitionings can satisfy this distribution:
>  * RangePartitioning
>  ** This accepts the exact grouping keys or any subset of them, in any order
> of keys (combination), with any sort order (asc/desc)
>  * HashPartitioning
>  ** This accepts the exact grouping keys or any subset of them, in any order
> of keys (combination)
>  * (upcoming Spark 3.3.0+) DataSourcePartitioning
>  ** The output partitioning provided by a data source will be able to satisfy
> ClusteredDistribution, which will make things worse (assuming a data source
> can provide a different output partitioning relatively easily)
> For example, even if we only consider HashPartitioning, all of
> HashPartitioning("group1"), HashPartitioning("group2"),
> HashPartitioning("group1", "group2"), HashPartitioning("group2", "group1"),
> HashPartitioning("group1", "group2", "window"), etc. satisfy it.
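A small enumeration helps show how many candidates there are. This is a simplified model, not Spark code: it assumes a HashPartitioning satisfies ClusteredDistribution whenever it uses a non-empty set of keys drawn from the clustering keys (roughly what Spark's check does), and it ignores RangePartitioning, DataSourcePartitioning and the number of partitions. The function name `candidate_hash_partitionings` is hypothetical.

```python
from itertools import combinations, permutations


def candidate_hash_partitionings(clustering_keys):
    """Enumerate hash partitionings (as ordered key tuples) that a simplified
    ClusteredDistribution check would accept: every non-empty subset of the
    clustering keys, in every order. Key order matters because the hash is
    computed over the keys in sequence, so ("group1", "group2") and
    ("group2", "group1") generally send rows to different partitions."""
    candidates = []
    for size in range(1, len(clustering_keys) + 1):
        for subset in combinations(clustering_keys, size):
            candidates.extend(permutations(subset))
    return candidates


options = candidate_hash_partitionings(("group1", "group2", "window"))
print(len(options))  # 15 = 3 single keys + 6 ordered pairs + 6 orderings of all three
```

Each of these would be accepted without an extra shuffle, yet each one assigns grouping keys to partition IDs differently.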
> The requirement for state partitioning is much stricter, since we must not
> change the partitioning once the state has been partitioned and built. *It
> must ensure that all tuples with the same grouping keys are placed in the
> same partition (same partition ID) across the query lifetime.*
> *The impedance mismatch between the ClusteredDistribution requirement and
> state partitioning silently leads to correctness issues.*
> For example, let's assume we have a streaming query like the one below:
> {code:java}
> df
>   .withWatermark("timestamp", "30 minutes")
>   .repartition("group2")
>   .groupBy("group1", "group2", window("timestamp", "10 minutes"))
>   .agg(count("*")) {code}
> repartition("group2") satisfies ClusteredDistribution("group1", "group2",
> "window"), so Spark won't introduce an additional shuffle there, and the
> state partitioning would be HashPartitioning("group2").
> Suppose we run this query for a while, stop it, and change the manual
> partitioning like below:
> {code:java}
> df
>   .withWatermark("timestamp", "30 minutes")
>   .repartition("group1")
>   .groupBy("group1", "group2", window("timestamp", "10 minutes"))
>   .agg(count("*")) {code}
> repartition("group1") also satisfies ClusteredDistribution("group1",
> "group2", "window"), so Spark won't introduce an additional shuffle there
> either. As a result, the child output partitioning of the streaming
> aggregation operator would be HashPartitioning("group1"), whereas the state
> partitioning is HashPartitioning("group2").
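The effect of that mismatch can be illustrated with a toy simulation (not Spark code). It assumes a hypothetical `toy_partition_id` hash in place of Spark's Murmur3-based HashPartitioning, 4 partitions, integer grouping values, and a single window (so the window column is omitted); each partition keeps its own dict as a stand-in for its state store.

```python
NUM_PARTITIONS = 4  # hypothetical; a real query uses spark.sql.shuffle.partitions


def toy_partition_id(values, num_partitions=NUM_PARTITIONS):
    # Toy deterministic hash standing in for Spark's Murmur3-based HashPartitioning.
    h = 17
    for v in values:
        h = h * 31 + v
    return h % num_partitions


def run_batch(rows, partition_keys, state_stores):
    """Route each row by partition_keys, then update a count in that
    partition's local store, as a streaming aggregation does with its state."""
    for row in rows:
        pid = toy_partition_id([row[k] for k in partition_keys])
        store = state_stores.setdefault(pid, {})
        key = (row["group1"], row["group2"])  # grouping key (window omitted)
        store[key] = store.get(key, 0) + 1


rows = [{"group1": i, "group2": i % 3} for i in range(12)]
stores = {}
run_batch(rows, ["group2"], stores)  # first run: .repartition("group2")
run_batch(rows, ["group1"], stores)  # after restart: .repartition("group1")

# Each key was seen twice, so each should have one state entry with count 2.
locations = {}
for pid, store in stores.items():
    for key, cnt in store.items():
        locations.setdefault(key, []).append((pid, cnt))
broken = {key: locs for key, locs in locations.items() if len(locs) > 1}
print(f"{len(broken)} of {len(rows)} keys have state split across partitions")
# → 9 of 12 keys have state split across partitions
```

Nothing fails here: the keys whose state was not found in the new partition simply start over from zero, which is exactly the kind of silent wrong count described above.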
> [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-after-changes-in-a-streaming-query]
> The Structured Streaming guide enumerates the query modifications that are
> not supported during the lifetime of a streaming query, but it does not
> mention this one.
> Making this worse, Spark doesn't store any information about state
> partitioning (so there is no way to validate it), so *Spark simply allows
> this change and introduces a correctness issue while the streaming query
> keeps running as if nothing were wrong.* The only indication of the problem
> is the result of the query itself.
> We have no idea whether end users are already affected by this in their
> queries. *The only way to find out is to list all state rows, apply the hash
> function to the expected grouping keys, and confirm that every row maps to
> the exact partition ID it is stored in.* If the state turns out to be
> broken, we will need a tool to “re”partition the state correctly, or in the
> worst case, have to ask users to throw out the checkpoint and reprocess.
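The check described above could look roughly like the sketch below. It assumes the state rows have already been listed per stored partition ID and that the caller supplies the expected partitioning function. `find_misplaced_keys` and `toy_partition_id` are hypothetical; a real check would need to reproduce Spark's own HashPartitioning (a Murmur3 hash over the grouping keys in the operator's key order, then pmod by the number of partitions) rather than this toy hash.

```python
from typing import Callable, Dict, Iterable, List, Tuple

Key = Tuple[int, ...]


def toy_partition_id(key: Key, num_partitions: int = 4) -> int:
    # Hypothetical stand-in for Spark's pmod(murmur3hash(keys), numPartitions);
    # any deterministic hash over the grouping keys in a fixed order works here.
    h = 17
    for value in key:
        h = h * 31 + value
    return h % num_partitions


def find_misplaced_keys(
    state_by_partition: Dict[int, Iterable[Key]],
    expected_partition_id: Callable[[Key], int],
) -> List[Tuple[Key, int, int]]:
    """Return (key, stored_partition, expected_partition) for every state row
    that is stored in a partition other than the one the expected
    partitioning assigns to its grouping key."""
    misplaced = []
    for stored_pid, keys in state_by_partition.items():
        for key in keys:
            expected = expected_partition_id(key)
            if expected != stored_pid:
                misplaced.append((key, stored_pid, expected))
    return misplaced


# Toy state dump: grouping keys (group1, group2) listed per stored partition ID.
state = {1: [(0, 0), (1, 1)], 0: [(2, 0)]}
print(find_misplaced_keys(state, toy_partition_id))  # → [((2, 0), 0, 3)]
```

An empty result would mean the state is consistent with the expected partitioning; any entry identifies a row a repartitioning tool would have to move.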
> {*}This issue has existed since the introduction of stateful operators
> (Spark 2.2+){*}: HashClusteredDistribution (the strict requirement) was
> introduced in Spark 2.3, but we didn't change stateful operators to use this
> distribution. Stream-stream join fortunately has used
> HashClusteredDistribution since Spark 2.3, so it seems to be safe.
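The difference between the relaxed and the strict requirement can be modeled with two predicates. This is a simplified sketch of the idea, not Spark's implementation: it only compares key lists and ignores expression semantics and the number of partitions, and both function names are hypothetical.

```python
def satisfies_clustered(partition_keys, required_keys):
    # Relaxed, ClusteredDistribution-style: any non-empty subset of the
    # required keys, in any order, keeps equal grouping keys together.
    return len(partition_keys) > 0 and set(partition_keys) <= set(required_keys)


def satisfies_hash_clustered(partition_keys, required_keys):
    # Strict, HashClusteredDistribution-style: exactly the required keys in
    # exactly the required order, so each grouping key always maps to the
    # same partition ID (given the same number of partitions).
    return list(partition_keys) == list(required_keys)


required = ["group1", "group2", "window"]
for keys in (["group2"], ["group1"], ["group2", "group1", "window"], required):
    print(keys, satisfies_clustered(keys, required),
          satisfies_hash_clustered(keys, required))
```

In this model both repartition("group2") and repartition("group1") pass the relaxed check but fail the strict one, so under the strict requirement a shuffle on the full grouping key would be added and the partition assignment would not depend on the user's manual repartitioning.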



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
