[ 
https://issues.apache.org/jira/browse/SPARK-49829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885771#comment-17885771
 ] 

Jungtaek Lim commented on SPARK-49829:
--------------------------------------

I have a fix and will submit it soon.

> Issues with optimization on adding input to state store in stream-stream join 
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-49829
>                 URL: https://issues.apache.org/jira/browse/SPARK-49829
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.0, 4.0.0, 3.5.1, 3.5.2, 3.5.3
>            Reporter: Andrzej Zera
>            Priority: Blocker
>              Labels: correctness
>
> NOTE: This ticket describes the actual issue found through RCA of the tests 
> the reporter provided, but I still want to keep [~azera] as the reporter to 
> give proper credit. The tests point directly at the edge cases, and I could 
> easily spot the root cause from the simple reproducers. Thanks [~azera]!
> Report link (user@): 
> [https://lists.apache.org/thread/kmq9vlgdbsoytwlcch147n3mlxy56llf]
> [https://github.com/apache/spark/blob/039fd13eacb1cef835045e3a60cebf958589e1a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L671-L677]
> {code:java}
>         val isLeftSemiWithMatch =
>           joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
>         // Add to state store only if both removal predicates do not match,
>         // and the row is not matched for left side of left semi join.
>         val shouldAddToState =
>           !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
>           !isLeftSemiWithMatch
> {code}
> The above condition determines whether the stream-stream join adds the 
> input to the state store. The criterion that `both removal predicates do 
> not match` excludes input that would be evicted in this same batch. Before 
> Spark introduced multiple stateful operators, the watermark for late records 
> and the watermark for eviction were the same, hence no input could match the 
> removal predicates once late records had been filtered out. (Not sure which 
> edge case this condition was dealing with.)
> With multiple stateful operators, the watermark for late records and the 
> watermark for eviction are no longer the same. That is, an input can be 
> judged not late and still be evicted in the same batch. The above condition 
> had to reflect this change, but that was missed, hence the correctness 
> issues in the report.
> SPARK-45637 is one such case, which I'll mark as a duplicate.
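
To make the divergence described above concrete, here is a minimal sketch in plain Scala. It is not Spark's implementation: `Row`, `isLate`, `matchesRemovalPredicate`, and `notLateButSkipped` are hypothetical names, event times and watermarks are plain millisecond values, and both checks are assumed to be a simple `eventTime <= watermark` comparison. The left-semi clause of the quoted condition is left out.

```scala
// Simplified model for illustration only; none of these names exist in Spark.
object WatermarkSketch {
  final case class Row(key: String, eventTimeMs: Long)

  // Assumed stand-in for the late-record filter applied before the join sees a row.
  def isLate(row: Row, lateWatermarkMs: Long): Boolean =
    row.eventTimeMs <= lateWatermarkMs

  // Assumed stand-in for the state removal predicates in the quoted condition.
  def matchesRemovalPredicate(row: Row, evictionWatermarkMs: Long): Boolean =
    row.eventTimeMs <= evictionWatermarkMs

  // Same shape as `shouldAddToState`, without the left-semi clause.
  def shouldAddToState(row: Row, evictionWatermarkMs: Long): Boolean =
    !matchesRemovalPredicate(row, evictionWatermarkMs)

  // Rows that pass the late filter but are still skipped by the state-store condition.
  def notLateButSkipped(
      rows: Seq[Row],
      lateWatermarkMs: Long,
      evictionWatermarkMs: Long): Seq[Row] =
    rows
      .filterNot(isLate(_, lateWatermarkMs))
      .filterNot(shouldAddToState(_, evictionWatermarkMs))

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row("a", 8L), Row("b", 15L), Row("c", 25L))
    // Same watermark for both checks: no non-late row matches the removal predicate.
    println(notLateButSkipped(rows, lateWatermarkMs = 10L, evictionWatermarkMs = 10L))
    // Eviction watermark ahead of the late-record watermark: row "b" is not late
    // but is skipped from state in the same batch.
    println(notLateButSkipped(rows, lateWatermarkMs = 10L, evictionWatermarkMs = 20L))
  }
}
```

With equal watermarks the result is empty, which matches the observation that the condition could not be hit after late records were filtered out. Once the eviction watermark runs ahead, the row at 15 falls into the gap between the two watermarks; that is the case the condition did not account for.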



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
