[ https://issues.apache.org/jira/browse/SPARK-49829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun closed SPARK-49829.
---------------------------------

> Issues with optimization on adding input to state store in stream-stream join 
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-49829
>                 URL: https://issues.apache.org/jira/browse/SPARK-49829
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.4.2, 3.4.0, 3.4.1, 3.5.0, 3.5.1, 3.5.2, 3.4.3, 3.5.3, 4.0.0
>            Reporter: Andrzej Zera
>            Assignee: Jungtaek Lim
>            Priority: Blocker
>              Labels: correctness, pull-request-available
>             Fix For: 3.4.4, 3.5.4, 4.0.0
>
>
> NOTE: This ticket describes the actual issue found after root-cause analysis
> (RCA) of the tests the reporter provided, but I still want to keep [~azera]
> as the reporter to give proper credit. The tests point directly at the edge
> cases, and the simple reproducers made it easy to spot the root cause.
> Thanks [~azera]!
> Report link (user@): 
> [https://lists.apache.org/thread/kmq9vlgdbsoytwlcch147n3mlxy56llf]
> [https://github.com/apache/spark/blob/039fd13eacb1cef835045e3a60cebf958589e1a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L671-L677]
> {code:java}
>         val isLeftSemiWithMatch =
>           joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
>         // Add to state store only if both removal predicates do not match,
>         // and the row is not matched for left side of left semi join.
>         val shouldAddToState =
>           !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
>           !isLeftSemiWithMatch {code}
> The above condition determines whether the stream-stream join adds the
> input row to the state store. The criterion `both removal predicates do not
> match` checks whether the input is going to be evicted in this batch. Before
> Spark introduced multiple stateful operators, the watermark for late records
> and the watermark for eviction were the same, so an input that survived the
> late-record filter could never match the removal predicates. (I'm not sure
> which edge case this condition was meant to handle.)
> Since the introduction of multiple stateful operators
> (https://issues.apache.org/jira/browse/SPARK-40925), the watermark for late
> records and the watermark for eviction are no longer the same. As a result,
> an input row can be classified as not late and still be evicted in the same
> batch. The above condition should have been updated to reflect this change,
> but that was missed, which led to the correctness issues in the report.
> SPARK-45637 is one such case; I'll mark it as a duplicate.
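The reasoning above can be illustrated with a small Scala model of the `shouldAddToState` check. This is a sketch under stated assumptions, not Spark's implementation: the names `Watermarks`, `isLate`, `matchesRemovalPredicate`, and `shouldAddToState` are hypothetical, the key and value removal predicates are collapsed into a single event-time check, the left-semi branch is omitted, and a row is assumed to be late when its event time is at or below the late-record watermark and evictable when it is at or below the eviction watermark.

```scala
// Hypothetical model of the "add input row to state store?" decision.
// Not Spark's API: names and the single event-time predicate are assumptions.
object JoinStateSketch {

  // Two watermarks; before SPARK-40925 they were effectively equal.
  final case class Watermarks(forLateEvents: Long, forEviction: Long)

  // Assumption: rows at or below the late-record watermark are dropped before the join.
  def isLate(eventTime: Long, wm: Watermarks): Boolean =
    eventTime <= wm.forLateEvents

  // Stand-in for both removal predicates (key and value) from the quoted code.
  def matchesRemovalPredicate(eventTime: Long, wm: Watermarks): Boolean =
    eventTime <= wm.forEviction

  // The optimization: skip the state store if the row would be evicted this batch.
  // The left-semi-with-match branch is omitted.
  def shouldAddToState(eventTime: Long, wm: Watermarks): Boolean =
    !matchesRemovalPredicate(eventTime, wm)
}

import JoinStateSketch._

// One watermark for both purposes: every row that survives the late filter
// is added to the state store.
val single = Watermarks(forLateEvents = 100L, forEviction = 100L)
val survivors = (0L to 200L).filterNot(isLate(_, single))
println(survivors.forall(shouldAddToState(_, single)))  // true

// Separate watermarks: the eviction watermark is ahead of the late-record one.
val split = Watermarks(forLateEvents = 100L, forEviction = 150L)
val row = 120L
println(isLate(row, split))            // false: accepted as not late
println(shouldAddToState(row, split))  // false: yet never stored
```

With a single watermark, the check never fires for rows that pass the late filter. Once the eviction watermark runs ahead of the late-record watermark, a row such as event time 120 is accepted as on time but skipped by the state store, which is the gap the ticket describes.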



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
