Jungtaek Lim created SPARK-49829:
------------------------------------
Summary: Issues with optimization on adding input to state store
in stream-stream join
Key: SPARK-49829
URL: https://issues.apache.org/jira/browse/SPARK-49829
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 3.5.3, 3.5.2, 3.5.1, 3.5.0, 4.0.0
Reporter: Andrzej Zera
NOTE: This ticket describes the actual issue found by root cause analysis (RCA)
of the tests the reporter shared. I still want to set the reporter to [~azera]
to give proper credit: the tests point directly at the edge cases, and the
simple reproducers made the root cause easy to spot. Thanks [~azera]!
Report link (user@):
[https://lists.apache.org/thread/kmq9vlgdbsoytwlcch147n3mlxy56llf]
[https://github.com/apache/spark/blob/039fd13eacb1cef835045e3a60cebf958589e1a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L671-L677]
{code:java}
val isLeftSemiWithMatch =
  joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
// Add to state store only if both removal predicates do not match,
// and the row is not matched for left side of left semi join.
val shouldAddToState =
  !stateKeyWatermarkPredicateFunc(key) &&
  !stateValueWatermarkPredicateFunc(thisRow) &&
  !isLeftSemiWithMatch
{code}
The above condition determines whether the stream-stream join adds the input
row to the state store. The `both removal predicates do not match` criterion
skips any input that would be evicted in this same batch. Before Spark
introduced multiple stateful operators, the watermark for late records and the
watermark for eviction were the same, so an input that survived late-record
filtering could never match a removal predicate. (I am not sure which edge case
this condition was meant to handle.)
Since multiple stateful operators were introduced, the watermark for late
records and the watermark for eviction are no longer the same. This means an
input can be judged not late and still be eligible for eviction in the same
batch. The above condition should have been updated to reflect this change, but
that was missed, which causes the correctness issues in the report.
SPARK-45637 is one such case, which I'll mark as a duplicate.
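To make the gap concrete, here is a minimal sketch of the two situations. It is a simplified model, not Spark's code: `Row`, `isLate`, `wouldBeEvicted`, and the millisecond values are hypothetical names and numbers chosen for illustration, the comparison operators are simplified, and the left semi case is ignored. It also assumes the eviction watermark can be ahead of the late-record watermark within a batch.
{code:scala}
// Simplified, hypothetical model of the condition; not Spark's actual classes.
object WatermarkGapSketch {
  final case class Row(key: String, eventTimeMs: Long)

  // Late-record filter: the row is dropped before the join sees it.
  def isLate(row: Row, lateWatermarkMs: Long): Boolean =
    row.eventTimeMs < lateWatermarkMs

  // Stand-in for the two state removal predicates in the quoted code.
  def wouldBeEvicted(row: Row, evictionWatermarkMs: Long): Boolean =
    row.eventTimeMs < evictionWatermarkMs

  // Same shape as shouldAddToState, without the left semi term.
  def shouldAddToState(row: Row, evictionWatermarkMs: Long): Boolean =
    !wouldBeEvicted(row, evictionWatermarkMs)

  def main(args: Array[String]): Unit = {
    val row = Row("k", eventTimeMs = 15L)

    // Single watermark: a row that is not late can never be evictable.
    println(isLate(row, 10L))            // false
    println(shouldAddToState(row, 10L))  // true

    // Separate watermarks (late = 10, eviction = 20): the row is not late,
    // yet the removal predicate matches, so it is never added to state.
    println(isLate(row, 10L))            // false
    println(shouldAddToState(row, 20L))  // false
  }
}
{code}
In the second case the row passes the late-record filter but is skipped by the condition, which is the situation described above as the source of the correctness issues.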