Jungtaek Lim created SPARK-49829:
------------------------------------

             Summary: Issues with optimization on adding input to state store 
in stream-stream join 
                 Key: SPARK-49829
                 URL: https://issues.apache.org/jira/browse/SPARK-49829
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.5.3, 3.5.2, 3.5.1, 3.5.0, 4.0.0
            Reporter: Andrzej Zera


NOTE: This ticket describes the actual issue found after root-cause analysis 
(RCA) of the tests the reporter shared, but I still want to set the reporter to 
[~azera] to give proper credit. The tests point directly at the edge cases, and 
I could easily spot the root cause from the simple reproducers. Thanks [~azera]!

Report link (user@): 
[https://lists.apache.org/thread/kmq9vlgdbsoytwlcch147n3mlxy56llf]

[https://github.com/apache/spark/blob/039fd13eacb1cef835045e3a60cebf958589e1a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L671-L677]
{code:java}
        val isLeftSemiWithMatch =
          joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
        // Add to state store only if both removal predicates do not match,
        // and the row is not matched for left side of left semi join.
        val shouldAddToState =
          !stateKeyWatermarkPredicateFunc(key) &&
          !stateValueWatermarkPredicateFunc(thisRow) &&
          !isLeftSemiWithMatch
{code}
The condition above determines whether the stream-stream join adds the input 
to the state store. The criterion `both removal predicates do not match` skips 
inputs that would be evicted in this same batch. Before Spark introduced 
multiple stateful operators, the watermark for late records and the watermark 
for eviction were the same, so no input could match a removal predicate after 
late records had been filtered out. (I'm not sure which edge case this 
condition was dealing with.)

With multiple stateful operators, the watermark for late records and the 
watermark for eviction are no longer the same. That is, an input can be judged 
not late and still be evicted in the same batch. The condition above had to 
reflect this change, but that was missed, which causes the correctness issues 
in the report.
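
To make the gap concrete, here is a minimal sketch of the situation. It is a 
simplified model, not the code in StreamingSymmetricHashJoinExec: the names 
`Input`, `isLate`, `matchesRemovalPredicate` and the timestamps are 
hypothetical, a single event-time comparison stands in for both removal 
predicates, and the `<=` comparison is an assumption of the sketch.

```scala
// Hypothetical, simplified model of the two watermarks. None of these names
// are Spark internals; they only illustrate the reasoning in this ticket.
object WatermarkGapSketch {
  final case class Input(key: String, eventTimeMs: Long)

  // Assumption: a record is late when its event time is at or below the
  // watermark used for late-record filtering.
  def isLate(in: Input, lateWatermarkMs: Long): Boolean =
    in.eventTimeMs <= lateWatermarkMs

  // Assumption: one event-time check stands in for both removal predicates
  // (state key and state value), evaluated against the eviction watermark.
  def matchesRemovalPredicate(in: Input, evictionWatermarkMs: Long): Boolean =
    in.eventTimeMs <= evictionWatermarkMs

  // Mirrors the shape of shouldAddToState, leaving out the left semi part.
  def shouldAddToState(in: Input, evictionWatermarkMs: Long): Boolean =
    !matchesRemovalPredicate(in, evictionWatermarkMs)

  def main(args: Array[String]): Unit = {
    val in = Input("k", eventTimeMs = 15000L)

    // Same watermark for both purposes: a non-late input is always added.
    val single = 10000L
    println(s"single watermark: late=${isLate(in, single)}, " +
      s"add=${shouldAddToState(in, single)}")

    // Different watermarks: the input passes the late filter but is skipped.
    val lateWm = 10000L
    val evictionWm = 20000L
    println(s"split watermarks: late=${isLate(in, lateWm)}, " +
      s"add=${shouldAddToState(in, evictionWm)}")
  }
}
```

In this model, whenever the two watermarks are equal, every input that passes 
the late filter is also added to state. Once the eviction watermark is ahead 
of the late-record watermark, an input whose event time falls between the two 
is accepted as not late and then skipped by the condition.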

SPARK-45637 is one such case, which I'll mark as a duplicate.


