[
https://issues.apache.org/jira/browse/SPARK-38204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jungtaek Lim updated SPARK-38204:
---------------------------------
Labels: correctness releasenotes (was: correctness)
> All state operators are at a risk of inconsistency between state partitioning
> and operator partitioning
> -------------------------------------------------------------------------------------------------------
>
> Key: SPARK-38204
> URL: https://issues.apache.org/jira/browse/SPARK-38204
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.3, 2.3.4, 2.4.8, 3.0.3, 3.1.2, 3.2.1, 3.3.0
> Reporter: Jungtaek Lim
> Assignee: Jungtaek Lim
> Priority: Blocker
> Labels: correctness, releasenotes
> Fix For: 3.3.0
>
>
> Except for stream-stream join, all stateful operators use ClusteredDistribution
> as the requirement for their child distribution.
> ClusteredDistribution is a very relaxed requirement: any output partitioning
> satisfies the distribution as long as it ensures that all tuples having the
> same grouping keys are placed in the same partition.
> To illustrate, suppose we run a streaming aggregation like the code below:
> {code:java}
> df
> .withWatermark("timestamp", "30 minutes")
> .groupBy("group1", "group2", window("timestamp", "10 minutes"))
> .agg(count("*")) {code}
> In this code, a streaming aggregation operator will appear in the physical
> plan, which requires ClusteredDistribution("group1", "group2", "window").
> The problem is that various output partitionings can satisfy this distribution:
> * RangePartitioning
> ** This accepts the exact grouping keys or any subset of them, in any order of
> keys (combination), with any sort order (asc/desc)
> * HashPartitioning
> ** This accepts the exact grouping keys or any subset of them, in any order of
> keys (combination)
> * (upcoming Spark 3.3.0+) DataSourcePartitioning
> ** An output partitioning provided by a data source will also be able to
> satisfy ClusteredDistribution, which makes things worse (assuming a data
> source can provide different output partitionings relatively easily)
> E.g. even if we only consider HashPartitioning, all of
> HashPartitioning("group1"), HashPartitioning("group2"),
> HashPartitioning("group1", "group2"), HashPartitioning("group2", "group1"),
> HashPartitioning("group1", "group2", "window"), etc. satisfy the distribution.
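As a rough illustration of why this flexibility matters, the sketch below assigns partition IDs by hashing different subsets of the grouping keys. The toy additive hash, the partition count, and the column values are stand-ins for illustration only (Spark actually uses Murmur3-based HashPartitioning); the point is just that partitionings which all satisfy the same ClusteredDistribution can place the same row in different partitions:

```python
# Toy stand-in for hash partitioning: NOT Spark's Murmur3-based
# HashPartitioning, just a deterministic hash for illustration.
def partition_id(row, keys, num_partitions=10):
    # Hash only the listed grouping-key columns of the row.
    joined = "|".join(str(row[k]) for k in keys)
    return sum(joined.encode()) % num_partitions

row = {"group1": "a", "group2": "b", "window": "10:00"}

# All three partitionings satisfy
# ClusteredDistribution("group1", "group2", "window") ...
pid_g1 = partition_id(row, ["group1"])
pid_g2 = partition_id(row, ["group2"])
pid_all = partition_id(row, ["group1", "group2", "window"])

# ... yet they generally place the same row in different partitions.
print(pid_g1, pid_g2, pid_all)
```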
> The requirement on state partitioning is much stricter, since we must not
> change the partitioning once the state has been partitioned and built. *It
> must ensure that all tuples having the same grouping keys are placed in the
> same partition (same partition ID) across the query lifetime.*
> *The mismatch between the ClusteredDistribution requirement and the state
> partitioning requirement leads to correctness issues silently.*
> For example, let's assume we have a streaming query like below:
> {code:java}
> df
> .withWatermark("timestamp", "30 minutes")
> .repartition("group2")
> .groupBy("group1", "group2", window("timestamp", "10 minutes"))
> .agg(count("*")) {code}
> repartition("group2") satisfies ClusteredDistribution("group1", "group2",
> "window"), so Spark won't introduce an additional shuffle there, and the
> state partitioning will be HashPartitioning("group2").
> Suppose we run this query for a while, stop it, and change the manual
> partitioning as below:
> {code:java}
> df
> .withWatermark("timestamp", "30 minutes")
> .repartition("group1")
> .groupBy("group1", "group2", window("timestamp", "10 minutes"))
> .agg(count("*")) {code}
> repartition("group1") also satisfies ClusteredDistribution("group1",
> "group2", "window"), so Spark won't introduce an additional shuffle there.
> That said, the child output partitioning of the streaming aggregation
> operator is now HashPartitioning("group1"), whereas the state partitioning
> remains HashPartitioning("group2").
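To make the silent breakage concrete, here is a hedged sketch of what happens to a running count when the satisfying partitioning changes between runs. The toy hash and the in-memory dict are stand-ins for Spark's Murmur3 partitioner and checkpointed state store, and the column values are invented for illustration:

```python
# Toy stand-ins: NOT Spark's actual Murmur3-based HashPartitioning
# or checkpointed state store.
def partition_id(row, keys, num_partitions=10):
    joined = "|".join(str(row[k]) for k in keys)
    return sum(joined.encode()) % num_partitions

state = {}  # maps (partition ID, full grouping key) -> running count

def aggregate(row, part_keys):
    """Route the row by hashing part_keys, then update the count stored
    under (partition ID, full grouping key)."""
    pid = partition_id(row, part_keys)
    key = (pid, (row["group1"], row["group2"], row["window"]))
    state[key] = state.get(key, 0) + 1
    return state[key]

row = {"group1": "a", "group2": "b", "window": "10:00"}

# First run: repartition("group2"), so state is built under that partitioning.
count1 = aggregate(row, ["group2"])
# After restart with repartition("group1"): same logical key, but the row is
# routed to a different partition, so the prior state is silently never found.
count2 = aggregate(row, ["group1"])
```

A correct query would report a count of 2 for the second arrival of this key; here the count silently restarts at 1 because the old state sits in a partition the row no longer reaches.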
> [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-after-changes-in-a-streaming-query]
> In the SS guide doc we enumerate the unsupported modifications of a query
> during the lifetime of a streaming query, but there is no mention of this one.
> Making this worse, Spark doesn't store any information on state partitioning
> (that is, there is no way to validate it), so *Spark simply allows this change
> and introduces a correctness issue while the streaming query runs as if there
> were no problem at all.* The only way to notice the incorrectness is from the
> results of the query.
> We have no idea whether end users already suffer from this in their queries
> or not. *The only way to find out is to list all state rows, apply the hash
> function with the expected grouping keys, and confirm that every row maps to
> the exact partition ID where it resides.* If it turns out to be broken, we
> will have to build a tool to “re”partition the state correctly, or in the
> worst case, have to ask users to throw out the checkpoint and reprocess.
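The validation idea above can be sketched as follows. The toy hash, the in-memory state layout, and the column names are assumptions for illustration, not Spark's actual state store format or partitioner:

```python
# Toy stand-in for hash partitioning; NOT Spark's Murmur3-based version.
def partition_id(row, keys, num_partitions=10):
    joined = "|".join(str(row[k]) for k in keys)
    return sum(joined.encode()) % num_partitions

def find_misplaced(state, expected_keys, num_partitions=10):
    """List state entries whose stored partition ID no longer matches the
    partition ID recomputed from the expected grouping keys."""
    misplaced = []
    for (pid, key), count in state.items():
        row = dict(zip(["group1", "group2", "window"], key))
        recomputed = partition_id(row, expected_keys, num_partitions)
        if recomputed != pid:
            misplaced.append((key, pid, recomputed))
    return misplaced

# State written while partitioned by "group2" (partition ID 8 for this row),
# checked against the expected full grouping-key partitioning (ID 4).
state = {(8, ("a", "b", "10:00")): 2}
bad = find_misplaced(state, ["group1", "group2", "window"])
```

An empty result would indicate the state is consistently partitioned; any entry returned points at state that the running query can no longer reach.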
> {*}This issue has existed since the introduction of stateful operators
> (Spark 2.2+){*}: HashClusteredDistribution (the strict requirement) was
> introduced in Spark 2.3, but stateful operators were never changed to use
> that distribution. Stream-stream join has used HashClusteredDistribution
> since Spark 2.3, so it appears to be safe.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]