This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 49b081187fb5 [SPARK-50253][SS] Stream-Stream Join should not fetch checkpoint ID if not supported
49b081187fb5 is described below
commit 49b081187fb581a1a1e9a232ef05d4600efc4912
Author: Siying Dong <[email protected]>
AuthorDate: Thu Nov 7 14:33:31 2024 +0900
[SPARK-50253][SS] Stream-Stream Join should not fetch checkpoint ID if not supported
### What changes were proposed in this pull request?
In stream-stream join, only call getLatestCheckpointInfo() when the state store
checkpoint format version indicates that checkpoint IDs are supported.
The other call site already applies this check:
https://github.com/apache/spark/blob/07301ddb889bdf361499f65e1708b5fdcab7e539/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L350-L362
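A minimal Scala sketch of the gating pattern this patch applies, under simplifying assumptions. `CkptInfo`, `SideJoiner` and `GuardedCommit` are hypothetical stand-ins, not Spark classes, and a plain Boolean takes the place of `StatefulOperatorStateInfo.enableStateStoreCheckpointIds(conf)`. It shows that when the feature is off, neither join side is asked for checkpoint info.

```scala
// Hypothetical, simplified stand-ins; these are NOT Spark's real classes.
final case class CkptInfo(side: String, checkpointId: Option[String])

final class SideJoiner(val name: String) {
  // Counts how often checkpoint info was requested from this side.
  private var fetches = 0
  def fetchCount: Int = fetches

  def getLatestCheckpointInfo(): CkptInfo = {
    fetches += 1
    CkptInfo(name, Some(s"$name-ckpt"))
  }
}

object GuardedCommit {
  // Mirrors the shape of the patch: fetch and combine both sides' info only
  // when checkpoint IDs are enabled (a Boolean here instead of the SQL conf).
  def commit(
      left: SideJoiner,
      right: SideJoiner,
      checkpointIdsEnabled: Boolean): Option[Seq[CkptInfo]] = {
    if (checkpointIdsEnabled) {
      Some(Seq(left.getLatestCheckpointInfo(), right.getLatestCheckpointInfo()))
    } else {
      None // skip the fetch entirely when the feature is off
    }
  }
}
```

Returning `None` on the disabled path keeps the caller from recording any checkpoint info, which is roughly what skipping `setStateStoreCheckpointInfo` achieves in the diff.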
### Why are the changes needed?
This code is unnecessary and wasteful when the feature is not supported. In
addition, the function contains an assertion, and it is unclear whether that
assertion holds when the function is called on a state store that was not
created with the checkpoint ID feature enabled.
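To illustrate the concern, here is a hypothetical sketch (not Spark's actual state store code) of how an assertion inside a checkpoint-info getter could trip when the store was never set up to assign checkpoint IDs. `SketchStateStore`, its constructor flag and its methods are invented for illustration only.

```scala
// Hypothetical stand-in, NOT Spark's API: a store that records a checkpoint
// ID on commit only when it was created with the checkpoint-ID feature on.
final class SketchStateStore(checkpointIdsEnabled: Boolean) {
  private var lastCheckpointId: Option[String] = None

  def commit(version: Long): Unit =
    if (checkpointIdsEnabled) lastCheckpointId = Some(s"ckpt-v$version")

  // Assumes an ID was assigned, similar in spirit to the assertion the
  // description mentions; calling it unguarded on a store without the
  // feature throws an AssertionError.
  def getLatestCheckpointInfo(): String = {
    assert(lastCheckpointId.isDefined, "checkpoint ID requested but never assigned")
    lastCheckpointId.get
  }
}
```

Guarding the call site, as the patch does, means this getter is never reached on the unsupported path, so whether the assertion would hold there no longer matters.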
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Make sure existing CI passes.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48783 from siying/idjoinfix.
Authored-by: Siying Dong <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../streaming/StreamingSymmetricHashJoinExec.scala | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 4bd531b618e6..c90c87899c73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -510,11 +510,15 @@ case class StreamingSymmetricHashJoinExec(
val rightSideMetrics = rightSideJoiner.commitStateAndGetMetrics()
val combinedMetrics = StateStoreMetrics.combine(Seq(leftSideMetrics,
rightSideMetrics))
- val checkpointInfo = SymmetricHashJoinStateManager.mergeStateStoreCheckpointInfo(
- JoinStateStoreCkptInfo(
- leftSideJoiner.getLatestCheckpointInfo(),
- rightSideJoiner.getLatestCheckpointInfo()))
- setStateStoreCheckpointInfo(checkpointInfo)
+ if (StatefulOperatorStateInfo.enableStateStoreCheckpointIds(conf)) {
+ val checkpointInfo = SymmetricHashJoinStateManager.mergeStateStoreCheckpointInfo(
+ JoinStateStoreCkptInfo(
+ leftSideJoiner.getLatestCheckpointInfo(),
+ rightSideJoiner.getLatestCheckpointInfo()
+ )
+ )
+ setStateStoreCheckpointInfo(checkpointInfo)
+ }
// Update SQL metrics
numUpdatedStateRows +=