[
https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Genmao Yu updated SPARK-29438:
------------------------------
Description:
Currently, Spark uses the `TaskPartitionId` to determine the StateStore path.
{code:java}
TaskPartitionId \
StateStoreVersion --> StoreProviderId -> StateStore
StateStoreName /
{code}
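To make the dependency in the diagram above concrete, here is a minimal sketch of a path derived from the partition ID. `StoreId`, `storePath`, the `operatorId` field, and the directory layout are illustrative assumptions rather than Spark's actual classes, and the store version is left out for brevity.

```scala
object StorePathSketch {
  // Hypothetical, simplified stand-in for the identifiers in the diagram above.
  final case class StoreId(
      checkpointRoot: String,
      operatorId: Long,
      partitionId: Int,
      storeName: String)

  // Assumed layout: <checkpointRoot>/<operatorId>/<partitionId>/<storeName>.
  def storePath(id: StoreId): String =
    s"${id.checkpointRoot}/${id.operatorId}/${id.partitionId}/${id.storeName}"

  def main(args: Array[String]): Unit = {
    // Same operator and store name, but the task got a different partition ID.
    val previousBatch = StoreId("/checkpoint/state", 0L, 200, "join-store")
    val currentBatch = previousBatch.copy(partitionId = 1)
    println(storePath(previousBatch))
    println(storePath(currentBatch))
  }
}
```

The two printed paths differ only in the partition component, so a task whose partition ID shifts between batches ends up looking in a different directory.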
Within a Spark stage, the task partition ID is determined by the number of
tasks. As noted above, the StateStore file path depends on the task partition
ID. So if the partition IDs of the stream-stream join tasks differ from those in
the previous batch, the join reads the wrong StateStore data or fails because
the StateStore data does not exist. This happens in some corner cases, such as:
{code:java}
val df3 = streamDf1.join(streamDf2)
val df5 = streamDf3.join(batchDf4)
val df = df3.union(df5)
df.writeStream...start()
{code}
A simplified DAG looks like this:
{code:java}
DataSourceV2Scan   Scan Relation     DataSourceV2Scan   DataSourceV2Scan
   (streamDf3)           |              (streamDf1)        (streamDf2)
        |                |                   |                  |
  Exchange(200)    Exchange(200)       Exchange(200)      Exchange(200)
        |                |                   |                  |
      Sort             Sort                  |                  |
         \              /                     \                /
          \            /                       \              /
          SortMergeJoin                   StreamingSymmetricHashJoin
                        \                     /
                           \               /
                              \         /
                                 Union
{code}
The stream-stream join task IDs range from 200 to 399, because the join runs in
the same stage as the `SortMergeJoin`. But when `streamDf3` has no new incoming
data in some batch, it produces an empty LocalRelation, and the SortMergeJoin is
replaced with a BroadcastHashJoin. In this case, the stream-stream join task IDs
range from 1 to 200. As a result, the wrong StateStore path is derived from the
TaskPartitionId, and the query fails with an error reading the state store
delta file.
{code:java}
 LocalTableScan    Scan Relation     DataSourceV2Scan   DataSourceV2Scan
        |                |                   |                  |
BroadcastExchange        |             Exchange(200)      Exchange(200)
        |                |                   |                  |
        |                |                   |                  |
         \              /                     \                /
          \            /                       \              /
        BroadcastHashJoin                 StreamingSymmetricHashJoin
                        \                     /
                           \               /
                              \         /
                                 Union
{code}
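The shift in task IDs can be reproduced with plain arithmetic. The sketch below assumes that a union numbers its partitions by concatenating its children in order, and that the broadcast-join branch runs with a single partition. Both are assumptions made for illustration, and `ranges` is a hypothetical helper, not a Spark API.

```scala
object UnionPartitionOffsets {
  // For each child, the range of partition IDs it occupies when the children
  // are concatenated in order (assumed behaviour of the union).
  def ranges(childPartitionCounts: Seq[Int]): Seq[Range] = {
    val offsets = childPartitionCounts.scanLeft(0)(_ + _)
    offsets.zip(childPartitionCounts).map { case (start, n) => start until (start + n) }
  }

  def main(args: Array[String]): Unit = {
    // Batch with data: SortMergeJoin branch (200) + stream-stream join (200).
    val withSortMergeJoin = ranges(Seq(200, 200))
    // Empty batch: BroadcastHashJoin branch (assumed 1 partition) + stream-stream join (200).
    val withBroadcastJoin = ranges(Seq(1, 200))

    val before = withSortMergeJoin(1)
    val after = withBroadcastJoin(1)
    println(s"stream-stream join IDs with SortMergeJoin: ${before.head} to ${before.last}")
    println(s"stream-stream join IDs with BroadcastHashJoin: ${after.head} to ${after.last}")
  }
}
```

Under these assumptions the second child moves from IDs 200 to 399 down to IDs 1 to 200, which matches the mismatch described above.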
In my job, I disabled automatic broadcast joins (set
spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should
make the StateStore path deterministic rather than dependent on the
TaskPartitionId.
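A minimal sketch of applying that workaround when building the session. The application name is a placeholder, and the master URL is assumed to be supplied by spark-submit.

```scala
import org.apache.spark.sql.SparkSession

object DisableAutoBroadcastJoin {
  def main(args: Array[String]): Unit = {
    // Setting the threshold to -1 disables automatic broadcast joins, so the
    // planner should not swap the SortMergeJoin for a BroadcastHashJoin.
    val spark = SparkSession.builder()
      .appName("stream-stream-join-workaround") // placeholder name
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()

    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
    spark.stop()
  }
}
```

This keeps the physical plan shape stable between batches, but it also gives up broadcast joins for every query in the session.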
was:
Currently, Spark uses the `TaskPartitionId` to determine the StateStore path.
{code:java}
TaskPartitionId \
StateStoreVersion --> StoreProviderId -> StateStore
StateStoreName /
{code}
Within a Spark stage, the task partition ID is determined by the number of
tasks. As noted above, the StateStore file path depends on the task partition
ID. So if the partition IDs of the stream-stream join tasks differ from those in
the previous batch, the join reads the wrong StateStore data or fails because
the StateStore data does not exist. This happens in some corner cases, such as:
{code:java}
val df3 = streamDf1.join(streamDf2)
val df5 = streamDf3.join(batchDf4)
val df = df3.union(df5)
df.writeStream...start()
{code}
A simplified DAG looks like this:
{code:java}
DataSourceV2Scan   Scan Relation     DataSourceV2Scan   DataSourceV2Scan
   (streamDf3)           |              (streamDf1)        (streamDf2)
        |                |                   |                  |
  Exchange(200)    Exchange(200)       Exchange(200)      Exchange(200)
        |                |                   |                  |
      Sort             Sort                  |                  |
         \              /                     \                /
          \            /                       \              /
          SortMergeJoin                   StreamingSymmetricHashJoin
                        \                     /
                           \               /
                              \         /
                                 Union
{code}
The stream-stream join task IDs range from 200 to 399, because the join runs in
the same stage as the `SortMergeJoin`. But when `streamDf3` has no new incoming
data in some batch, it produces an empty LocalRelation, and the SortMergeJoin is
replaced with a BroadcastHashJoin. In this case, the stream-stream join task IDs
range from 1 to 200. As a result, the wrong StateStore path is derived from the
TaskPartitionId, and the query fails with an error reading the state store
delta file.
{code:java}
 LocalTableScan    Scan Relation     DataSourceV2Scan   DataSourceV2Scan
        |                |                   |                  |
BroadcastExchange        |             Exchange(200)      Exchange(200)
        |                |                   |                  |
        |                |                   |                  |
         \              /                     \                /
          \            /                       \              /
        BroadcastHashJoin                 StreamingSymmetricHashJoin
                        \                     /
                           \               /
                              \         /
                                 Union
{code}
In my job, I disabled automatic broadcast joins (set
spark.sql.autoBroadcastJoinThreshold=false) to work around this bug. We should
make the StateStore path deterministic rather than dependent on the
TaskPartitionId.
> Failed to get state store in stream-stream join
> -----------------------------------------------
>
> Key: SPARK-29438
> URL: https://issues.apache.org/jira/browse/SPARK-29438
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.4
> Reporter: Genmao Yu
> Priority: Critical
>