[ 
https://issues.apache.org/jira/browse/SPARK-21977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Burak Yavuz updated SPARK-21977:
--------------------------------
    Description: 
This is a bit hard to explain because there are several issues here, but I'll 
try my best. Here are the conditions needed to reproduce them:

  1. A Structured Streaming Source that can generate empty RDDs with 0 partitions
  2. A Structured Streaming query that uses the above source, performs a 
stateful aggregation (mapGroupsWithState, groupBy.count, ...), and calls 
`coalesce(1)`

The crux of the problem is that when a dataset has a `coalesce(1)` call, it 
receives a `SinglePartition` partitioning scheme. This scheme satisfies most of 
the required child distributions of aggregation operators such as 
HashAggregateExec, so no Exchange is planned before them. This causes a world 
of problems:
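To make the crux concrete, here is a deliberately simplified Python model, not Spark's planner. The names `SinglePartitionClaim`, `UnknownPartitioningClaim` and `plan_stateful_child` are hypothetical, and the distribution classes only mirror Spark's concepts. The model separates what a partitioning claims from how many physical partitions the child RDD actually has: when the claim is accepted, no Exchange is planned and the stateful operator inherits the actual count, which may be 0.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class ClusteredDistribution:
    """Rows with equal values of `keys` must land in the same partition."""
    keys: Tuple[str, ...]


@dataclass(frozen=True)
class AllTuples:
    """Every row must land in one single partition."""


class SinglePartitionClaim:
    """What a coalesce(1) output advertises: 'all my rows are in one partition'."""

    def satisfies(self, required) -> bool:
        # One partition co-locates everything, so both requirements look satisfied.
        return isinstance(required, (ClusteredDistribution, AllTuples))


class UnknownPartitioningClaim:
    """A child that makes no guarantee about how its rows are distributed."""

    def satisfies(self, required) -> bool:
        return False


def plan_stateful_child(claim, actual_partitions, required, shuffle_partitions):
    """Return (exchange_added, partitions_seen_by_the_stateful_operator)."""
    if claim.satisfies(required):
        # No Exchange: the operator inherits the child's physical partitions,
        # which can be 0 when the source produced an empty RDD.
        return False, actual_partitions
    if isinstance(required, AllTuples):
        # An Exchange gathers all rows into a single partition.
        return True, 1
    # An Exchange hash-partitions into spark.sql.shuffle.partitions partitions.
    return True, shuffle_partitions
```

For example, `plan_stateful_child(SinglePartitionClaim(), 0, ClusteredDistribution(("key",)), 200)` returns `(False, 0)`: no Exchange is added, and zero partitions reach the stateful operator.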

 Symptom 1. If the input RDD has 0 partitions, the whole lineage will have 0 
partitions: nothing is executed, and the state store does not create any delta 
files. When this happens, the next trigger fails, because the StateStore fails 
to load the delta file for the previous trigger.
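A toy simulation of Symptom 1. It assumes, for illustration only, that each trigger N commits state version N after loading version N - 1, and that version 0 is an empty initial state needing no file. `StateStoreDir` and `run_trigger` are hypothetical stand-ins, not Spark's `StateStore` API.

```python
class StateStoreDir:
    """Hypothetical stand-in for one state-store partition's checkpoint directory."""

    def __init__(self):
        # Assumption: version 0 is the empty initial state and needs no file.
        self.versions = {0}

    def load(self, version):
        if version not in self.versions:
            raise FileNotFoundError(f"delta file for version {version} not found")

    def commit(self, version):
        self.versions.add(version)


def run_trigger(store, trigger, input_partitions):
    """One trigger of a coalesce(1) stateful query with no Exchange in the plan."""
    # Per the issue, a 0-partition input leaves the whole lineage at 0 partitions.
    tasks = min(1, input_partitions)
    for _ in range(tasks):
        store.load(trigger - 1)  # restore the state written by the previous trigger
        store.commit(trigger)    # write this trigger's delta file


store = StateStoreDir()
run_trigger(store, 1, input_partitions=4)  # writes version 1
run_trigger(store, 2, input_partitions=0)  # no tasks run, so version 2 is never written
try:
    run_trigger(store, 3, input_partitions=4)
except FileNotFoundError as e:
    print(e)  # delta file for version 2 not found
```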

 Symptom 2. Now suppose that there was data. If you stop your stream, replace 
`coalesce(1)` with `coalesce(2)`, and restart it, the stream will fail, because 
`spark.sql.shuffle.partitions - 1` StateStores will fail to find their delta 
files.
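The arithmetic behind Symptom 2, as a small hypothetical helper. It assumes, as the `- 1` in the description implies, that after the switch to `coalesce(2)` an Exchange is planned and the stateful operator runs with `spark.sql.shuffle.partitions` partitions (200 by default), while under `coalesce(1)` only partition 0 ever wrote state.

```python
def missing_state_partitions(partitions_with_state, num_partitions):
    """Hypothetical helper: partition ids whose state-store directory has no delta files."""
    return [p for p in range(num_partitions) if p not in partitions_with_state]


# Before the restart, coalesce(1) meant only partition 0 ever wrote state.
written = {0}
# After the restart, assume the stateful operator runs with
# spark.sql.shuffle.partitions partitions (default 200).
shuffle_partitions = 200
missing = missing_state_partitions(written, shuffle_partitions)
print(len(missing))  # 199, i.e. shuffle_partitions - 1
```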

To fix the issues above, we must check that the partitioning of the child of a 
`StatefulOperator` satisfies the following, regardless of whether `coalesce(1)` 
exists in the plan and whether the input RDD for the trigger has any data:
  If the grouping expressions are empty:
    a) AllTuples distribution
    b) a single physical partition
  If the grouping expressions are non-empty:
    a) Clustered distribution
    b) `spark.sql.shuffle.partitions` physical partitions
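The rule above, written as a hypothetical predicate. This is a sketch of the condition, not the actual fix in Spark: distributions are plain string labels here, and `num_partitions` is the physical partition count actually produced for the trigger rather than the count a partitioning claims.

```python
from typing import Optional, Sequence


def satisfies_stateful_requirements(
    grouping_keys: Sequence[str],
    distribution: str,
    clustered_by: Optional[Sequence[str]],
    num_partitions: int,
    shuffle_partitions: int,
) -> bool:
    """Hypothetical check of a StatefulOperator child's partitioning.

    `distribution` labels what the child guarantees ("AllTuples" or "Clustered").
    """
    if not grouping_keys:
        # Global aggregation: AllTuples, in exactly one physical partition.
        return distribution == "AllTuples" and num_partitions == 1
    # Keyed aggregation: rows clustered on a non-empty subset of the grouping
    # keys, and exactly spark.sql.shuffle.partitions physical partitions.
    return (
        distribution == "Clustered"
        and bool(clustered_by)
        and set(clustered_by) <= set(grouping_keys)
        and num_partitions == shuffle_partitions
    )
```

Under this check, a keyed `coalesce(1)` child (1 partition) fails whenever `spark.sql.shuffle.partitions` is not 1, and an empty trigger with 0 partitions fails the global case, so both situations would call for an Exchange.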

Once you fix the above problem by adding an Exchange to the plan, you come 
across the following bug:

If you call `coalesce(1).groupBy().count()` on a streaming DataFrame and have a 
trigger with no data, `StateStoreRestoreExec` doesn't return the prior state. 
However, for this specific aggregation, the `HashAggregateExec` after the 
restore still returns a (0, 0) row, since we're performing a count and there is 
no data. This row then gets stored by `StateStoreSaveExec`, overwriting the 
previous counts, which are lost.
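A minimal model of the lost-count bug for a keyless streaming count. The function `run_count_trigger` and the dict-based store are hypothetical. The only behaviors taken from the description are that restore returns nothing on an empty trigger, that a count with no grouping keys still emits a row, and that save overwrites the stored state.

```python
def run_count_trigger(state_store, new_rows):
    """Hypothetical model of one micro-batch of a keyless streaming count."""
    key = ()  # a global aggregation has an empty grouping key
    # Partial count of this trigger's data; an empty trigger contributes no rows.
    partial = [len(new_rows)] if new_rows else []
    # Restore: prior state is emitted only alongside input rows for the same
    # key, so an empty trigger restores nothing.
    restored = [state_store[key]] if partial and key in state_store else []
    # The final aggregate with no grouping keys emits one row even with no input.
    total = sum(partial) + sum(restored)
    # Save overwrites the stored state with whatever the aggregate emitted.
    state_store[key] = total
    return total


store = {}
print(run_count_trigger(store, ["a", "b", "c"]))  # 3
print(run_count_trigger(store, ["d"]))            # 4
print(run_count_trigger(store, []))               # 0: the running count of 4 is lost
```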
  

  was:This is a bit hard to explain as there are several issues here


> SinglePartition optimizations break certain Streaming Stateful Aggregation 
> requirements
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-21977
>                 URL: https://issues.apache.org/jira/browse/SPARK-21977
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Burak Yavuz
>            Assignee: Burak Yavuz



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
