[ 
https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-29438:
------------------------------
    Description: 
Currently, Spark uses the `TaskPartitionId` to determine the StateStore path.
{code:java}
TaskPartitionId   \ 
StateStoreVersion  --> StoreProviderId -> StateStore
StateStoreName    /  
{code}
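As a rough illustration of the composition above, the sketch below models how 
a partition ID can end up inside a store path. It is not Spark's 
implementation: `ProviderId`, `storePath`, the `operatorId` field, the store 
name, and the `<root>/<operatorId>/<partitionId>/<storeName>` layout are all 
assumptions made for this example.
{code:scala}
object StorePathSketch {
  // Hypothetical model of a store provider ID, not Spark's actual class.
  final case class ProviderId(operatorId: Long, partitionId: Int, storeName: String)

  // Assumed layout: <root>/<operatorId>/<partitionId>/<storeName>
  def storePath(root: String, id: ProviderId): String =
    s"$root/${id.operatorId}/${id.partitionId}/${id.storeName}"

  // Same operator and store name, but different task partition IDs.
  val previousBatch: String =
    storePath("/checkpoint/state", ProviderId(0L, 200, "exampleStore"))
  val currentBatch: String =
    storePath("/checkpoint/state", ProviderId(0L, 1, "exampleStore"))
}
{code}
Under these assumptions the two batches resolve to different directories, so 
state written by one is not visible to the other.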
In a Spark stage, the task partition ID is determined by the number of tasks 
in that stage. Because the StateStore file path depends on the task partition 
ID, a stream-stream join whose task partition IDs differ from those of the 
previous batch will read the wrong StateStore data or fail because the 
StateStore data does not exist. This does happen in some corner cases. The 
following is sample pseudocode:
{code:java}
val df3 = streamDf1.join(streamDf2)
val df5 = streamDf3.join(batchDf4)
val df = df3.union(df5)
df.writeStream...start()
{code}
A simplified DAG looks like this:
{code:java}
DataSourceV2Scan   Scan Relation     DataSourceV2Scan   DataSourceV2Scan
 (streamDf3)            |               (streamDf1)        (streamDf2)
     |                  |                   |                 |
  Exchange(200)      Exchange(200)       Exchange(200)     Exchange(200)
     |                  |                   |                 | 
   Sort                Sort                 |                 |
     \                  /                   \                 /
      \                /                     \               /
        SortMergeJoin                    StreamingSymmetricHashJoin
                     \                 /
                       \             /
                         \         /
                            Union
{code}
The stream-stream join task IDs range from 200 to 399, because the join runs 
in the same stage as the `SortMergeJoin`. But when `streamDf3` has no new 
incoming data in some batch, it produces an empty LocalRelation, and the 
SortMergeJoin is replaced with a BroadcastHashJoin. In that case, the 
stream-stream join task IDs range from 1 to 200. As a result, the join derives 
the wrong StateStore path from the TaskPartitionId and fails with an error 
reading the state store delta file.
{code:java}
LocalTableScan   Scan Relation     DataSourceV2Scan   DataSourceV2Scan
     |                  |                   |                 |
BroadcastExchange       |              Exchange(200)     Exchange(200)
     |                  |                   |                 | 
     |                  |                   |                 |
      \                /                     \               /
       \             /                        \             /
      BroadcastHashJoin                 StreamingSymmetricHashJoin
                     \                 /
                       \             /
                         \         /
                            Union
{code}
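The shift in task IDs between the two plans can be sketched with plain 
arithmetic. This is a simplified model, assuming the union numbers its 
partitions by concatenating its children's partitions in order and that the 
BroadcastHashJoin branch contributes a single partition; `secondChildIds` is a 
hypothetical helper, not a Spark API.
{code:scala}
object UnionPartitionSketch {
  // Partition IDs seen by the second child of a union, assuming the union
  // concatenates the partitions of its children in order.
  def secondChildIds(firstChildPartitions: Int, secondChildPartitions: Int): Range =
    firstChildPartitions until (firstChildPartitions + secondChildPartitions)

  // First plan: the SortMergeJoin branch contributes 200 partitions.
  val withSortMergeJoin: Range = secondChildIds(200, 200) // IDs 200 to 399

  // Second plan: the BroadcastHashJoin branch is assumed to contribute 1 partition.
  val withBroadcastJoin: Range = secondChildIds(1, 200) // IDs 1 to 200
}
{code}
The stream-stream join does the same work in both plans, but the IDs it sees 
depend on how many partitions its sibling branch produces.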
In my job, I disabled automatic broadcast joins (set 
spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should 
make the StateStore path deterministic rather than dependent on the 
TaskPartitionId.
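One way to apply the workaround is to set the option when building the 
session. This is a minimal sketch that assumes Spark SQL is on the classpath; 
the application name is a placeholder.
{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("stream-stream-join-job") // placeholder name
  // -1 disables automatic broadcast joins for the whole session
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .getOrCreate()
{code}
Note that this setting applies to every join in the session, not only the one 
that shares a stage with the stream-stream join.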

> Failed to get state store in stream-stream join
> -----------------------------------------------
>
>                 Key: SPARK-29438
>                 URL: https://issues.apache.org/jira/browse/SPARK-29438
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.4
>            Reporter: Genmao Yu
>            Priority: Critical


