[ https://issues.apache.org/jira/browse/SPARK-21977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16162482#comment-16162482 ]
Apache Spark commented on SPARK-21977:
--------------------------------------
User 'brkyvz' has created a pull request for this issue:
https://github.com/apache/spark/pull/19196
> SinglePartition optimizations break certain Streaming Stateful Aggregation
> requirements
> ---------------------------------------------------------------------------------------
>
> Key: SPARK-21977
> URL: https://issues.apache.org/jira/browse/SPARK-21977
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Burak Yavuz
> Assignee: Burak Yavuz
>
> This is a bit hard to explain because there are several issues here; I'll
> try my best. Here are the requirements:
> 1. A Structured Streaming Source that can generate empty RDDs with 0
> partitions
> 2. A Structured Streaming query that uses the above source, performs a
> stateful aggregation (mapGroupsWithState, groupBy.count, ...), and calls
> `coalesce(1)`
> The crux of the problem is that when a dataset has a `coalesce(1)` call, it
> receives a `SinglePartition` partitioning scheme. This scheme satisfies most
> of the required distributions used by aggregation operators such as
> `HashAggregateExec`, so no Exchange is planned before them. This causes a
> world of problems:
> Symptom 1. If the input RDD has 0 partitions, the whole lineage will have
> 0 partitions, so nothing is executed and the state store does not create any
> delta files. When this happens, the next trigger fails because the
> StateStore cannot load the delta file for the previous trigger.
> Symptom 2. Suppose there was data. In this case, if you stop your stream,
> replace `coalesce(1)` with `coalesce(2)`, and then restart it, the stream
> fails, because `spark.sql.shuffle.partitions - 1` StateStores cannot find
> their delta files.
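Both symptoms come down to which partition IDs wrote a delta file for the previous version. The following is a minimal Python sketch, not Spark code: it assumes, for illustration only, that each StateStore is identified by its partition ID and can load version `v` only if that same partition wrote a delta file for `v`. The helpers `write_trigger` and `missing_stores` are hypothetical, and 200 is used as the (default) value of `spark.sql.shuffle.partitions`.

```python
from typing import List, Set, Tuple

# Hypothetical model (not Spark's API): a checkpoint is a set of
# (partition_id, version) pairs, one per delta file that was written.
Checkpoint = Set[Tuple[int, int]]


def write_trigger(checkpoint: Checkpoint, version: int, num_partitions: int) -> None:
    """Record the delta files written by one trigger.

    Only partitions that actually run a task write a delta file, so a plan
    with 0 partitions writes nothing at all.
    """
    for pid in range(num_partitions):
        checkpoint.add((pid, version))


def missing_stores(checkpoint: Checkpoint, prev_version: int, num_partitions: int) -> List[int]:
    """Return the partition IDs whose store cannot load prev_version."""
    return [pid for pid in range(num_partitions) if (pid, prev_version) not in checkpoint]


# Symptom 1: the input RDD has 0 partitions, so no delta file is written,
# and the next trigger (1 partition after coalesce(1)) cannot load state.
empty_run: Checkpoint = set()
write_trigger(empty_run, version=1, num_partitions=0)
print(missing_stores(empty_run, prev_version=1, num_partitions=1))  # [0]

# Symptom 2: coalesce(1) wrote state for partition 0 only. Assumed here: after
# the restart the stateful operator runs with spark.sql.shuffle.partitions (200).
coalesced_run: Checkpoint = set()
write_trigger(coalesced_run, version=1, num_partitions=1)
print(len(missing_stores(coalesced_run, prev_version=1, num_partitions=200)))  # 199
```

In this toy model, the `- 1` in the report falls out directly: only partition 0 has history, so every other partition ID is missing its file.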
> To fix the issues above, we must check that the partitioning of the child of
> a `StatefulOperator` satisfies the following, regardless of whether
> `coalesce(1)` exists in the plan and whether the input RDD for the trigger
> has any data:
> If the grouping expressions are empty:
> a) AllTuples distribution
> b) a single physical partition
> If the grouping expressions are non-empty:
> a) Clustered distribution
> b) `spark.sql.shuffle.partitions` partitions
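The required check can be sketched as a predicate over a simplified partitioning model. This is a hypothetical Python illustration, not Spark's `Distribution`/`Partitioning` API: the `Partitioning` class, its `kind` values, its `num_partitions` field (assumed to hold the actual physical partition count), and `child_satisfies_stateful_requirement` are all assumptions made for the example. It encodes the point that a distribution match alone is not enough; the partition count must match too.

```python
from dataclasses import dataclass
from typing import Sequence, Tuple


@dataclass(frozen=True)
class Partitioning:
    """Hypothetical, simplified stand-in for a physical partitioning."""
    kind: str                 # "single", "hash", or "unknown" (e.g. after coalesce(2))
    keys: Tuple[str, ...]     # hash keys; empty unless kind == "hash"
    num_partitions: int       # actual number of physical partitions


def satisfies_clustered(p: Partitioning, grouping: Sequence[str]) -> bool:
    # A single partition trivially co-locates every group; a hash partitioning
    # does so if it hashes on a non-empty subset of the grouping keys.
    if p.kind == "single":
        return True
    return p.kind == "hash" and bool(p.keys) and set(p.keys) <= set(grouping)


def child_satisfies_stateful_requirement(
    p: Partitioning, grouping: Sequence[str], shuffle_partitions: int
) -> bool:
    if not grouping:
        # AllTuples distribution AND exactly one physical partition.
        return p.kind == "single" and p.num_partitions == 1
    # Clustered distribution AND exactly spark.sql.shuffle.partitions partitions.
    return satisfies_clustered(p, grouping) and p.num_partitions == shuffle_partitions


# Usage: coalesce(1) over a non-empty input reports a single partition.
coalesced = Partitioning("single", (), 1)
print(child_satisfies_stateful_requirement(coalesced, [], 200))       # True
print(child_satisfies_stateful_requirement(coalesced, ["key"], 200))  # False
```

Whenever such a predicate returns `False`, the remedy the report describes is to add an Exchange so the stateful operator always sees the expected layout; a `coalesce(1)` over an empty, 0-partition input (`Partitioning("single", (), 0)`) fails both branches.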
> Once you fix the above problem by adding an Exchange to the plan, you run
> into the following bug:
> If you call `coalesce(1).groupBy().count()` on a streaming DataFrame and a
> trigger has no data, `StateStoreRestoreExec` doesn't return the prior state.
> However, for this specific aggregation, the `HashAggregateExec` after the
> restore still returns a (0, 0) row, because we're performing a count and
> there is no data. This row is then stored by `StateStoreSaveExec`, so the
> previous counts are overwritten and lost.
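The lost-count bug can be reproduced with a toy restore/aggregate/save loop. This is a hypothetical Python simulation, not Spark's `StateStoreRestoreExec`/`HashAggregateExec`/`StateStoreSaveExec` operators: a dict stands in for the state store, and the `restore_on_empty` flag (an assumption of this sketch) models whether the restore step returns prior state on a trigger with no data.

```python
from typing import Dict, List

# The global aggregation groupBy().count() has a single, empty grouping key.
GLOBAL_KEY: tuple = ()


def run_trigger(store: Dict[tuple, int], batch: List[int], restore_on_empty: bool) -> int:
    """Simulate restore -> aggregate -> save for groupBy().count() on one trigger."""
    # Restore: the buggy path returns no prior state when the trigger has no data.
    if batch or restore_on_empty:
        prior = store.get(GLOBAL_KEY, 0)
    else:
        prior = 0
    # Aggregate: a global count always emits exactly one row, even for no input.
    count = prior + len(batch)
    # Save: the emitted row unconditionally overwrites the stored state.
    store[GLOBAL_KEY] = count
    return count


buggy: Dict[tuple, int] = {}
print(run_trigger(buggy, [1, 2, 3], restore_on_empty=False))  # 3
print(run_trigger(buggy, [], restore_on_empty=False))         # 0: previous count lost

fixed: Dict[tuple, int] = {}
print(run_trigger(fixed, [1, 2, 3], restore_on_empty=True))   # 3
print(run_trigger(fixed, [], restore_on_empty=True))          # 3
```

In this toy model, restoring the prior state on empty triggers is enough to avoid the overwrite; that is one possible remedy, not necessarily the one the linked pull request implements.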
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)