[
https://issues.apache.org/jira/browse/SPARK-49829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun closed SPARK-49829.
---------------------------------
> Issues with optimization on adding input to state store in stream-stream join
> ------------------------------------------------------------------------------
>
> Key: SPARK-49829
> URL: https://issues.apache.org/jira/browse/SPARK-49829
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.4.2, 3.4.0, 3.4.1, 3.5.0, 3.5.1, 3.5.2, 3.4.3, 3.5.3,
> 4.0.0
> Reporter: Andrzej Zera
> Assignee: Jungtaek Lim
> Priority: Blocker
> Labels: correctness, pull-request-available
> Fix For: 3.4.4, 3.5.4, 4.0.0
>
>
> NOTE: This ticket describes the actual issue found through RCA of the tests
> the reporter provided, but I still want to keep [~azera] as the reporter to
> give proper credit. The tests point directly at the edge cases, and I could
> easily spot the root cause from the simple reproducers. Thanks
> [~azera]!
> Report link (user@):
> [https://lists.apache.org/thread/kmq9vlgdbsoytwlcch147n3mlxy56llf]
> [https://github.com/apache/spark/blob/039fd13eacb1cef835045e3a60cebf958589e1a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L671-L677]
> {code:java}
> val isLeftSemiWithMatch =
>   joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
> // Add to state store only if both removal predicates do not match,
> // and the row is not matched for left side of left semi join.
> val shouldAddToState =
>   !stateKeyWatermarkPredicateFunc(key) &&
>   !stateValueWatermarkPredicateFunc(thisRow) &&
>   !isLeftSemiWithMatch
> {code}
> The above condition determines whether the stream-stream join adds the
> input to the state store. The criterion `both removal predicates do not
> match` excludes input that would be evicted in this same batch. Before Spark
> introduced multiple stateful operators, the watermark for late records and
> the watermark for eviction were the same, so input that survived late-record
> filtering could never match a removal predicate. (I'm not sure which edge
> case this condition was meant to handle.)
> After multiple stateful operators
> (https://issues.apache.org/jira/browse/SPARK-40925), the watermark for late
> records and the watermark for eviction are no longer the same. This means
> input can be judged not late and still be eligible for eviction in the same
> batch. The above condition had to reflect this change, but that was missed,
> which causes the correctness issues in the report.
> SPARK-45637 is one such case, which I'll mark as a duplicate.
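
To make the gap between the two watermarks concrete, here is a minimal Scala sketch. It is not Spark code: `InputRow`, `isBehind`, the simplified `shouldAddToState`, and the watermark values are all hypothetical, and both removal predicates are collapsed into one event-time comparison. It only illustrates how a row can pass the late-record filter and still match the removal check once the eviction watermark runs ahead.

```scala
object WatermarkGapSketch {
  // Hypothetical stand-in for an input row; not a Spark type.
  final case class InputRow(key: String, eventTimeMs: Long)

  // Simplified watermark check (assumption): a row is "behind" a watermark
  // when its event time is at or below that watermark.
  def isBehind(row: InputRow, watermarkMs: Long): Boolean =
    row.eventTimeMs <= watermarkMs

  // Mirrors the shape of the quoted condition, with both removal predicates
  // collapsed into a single event-time check against the eviction watermark.
  def shouldAddToState(
      row: InputRow,
      evictionWatermarkMs: Long,
      isLeftSemiWithMatch: Boolean): Boolean =
    !isBehind(row, evictionWatermarkMs) && !isLeftSemiWithMatch

  def main(args: Array[String]): Unit = {
    val row = InputRow("a", eventTimeMs = 150L)

    // One shared watermark: a row that passes the late-record filter
    // cannot match the removal check.
    val sharedWm = 100L
    val lateShared = isBehind(row, sharedWm)
    val addShared = shouldAddToState(row, sharedWm, isLeftSemiWithMatch = false)
    println(s"shared watermark: late=$lateShared, addToState=$addShared")

    // Two watermarks (assumed values): the eviction watermark is ahead of
    // the late-record watermark, so the same row is not late but is evictable.
    val lateRecordWm = 100L
    val evictionWm = 200L
    val lateSplit = isBehind(row, lateRecordWm)
    val addSplit = shouldAddToState(row, evictionWm, isLeftSemiWithMatch = false)
    println(s"split watermarks: late=$lateSplit, addToState=$addSplit")
  }
}
```

In the split case the row is accepted as on time yet skipped by the state-store check, which is the situation the ticket says the original condition did not account for.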