This is an automated email from the ASF dual-hosted git repository.
HeartSaVioR pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 3135e3cece17 [SPARK-56839][SS] Remove dev loop config for new state format of stream-stream join
3135e3cece17 is described below
commit 3135e3cece17521f98523d1273c264a96b436b8e
Author: Jungtaek Lim <[email protected]>
AuthorDate: Wed May 20 19:23:44 2026 +0900
[SPARK-56839][SS] Remove dev loop config for new state format of stream-stream join
### What changes were proposed in this pull request?
This PR proposes to remove the dev-loop config for the new state format of
stream-stream join.
### Why are the changes needed?
We completed the code for the new state format of stream-stream join in
Apache Spark 4.2.0, but forgot to remove the config that served as a gate to
prevent the feature from being released "in the middle" of development. Since
all code changes have landed in 4.2.0, the flag is no longer needed.
Users still need to opt in by explicitly setting the state format version, so
that setting effectively acts as a gate even after the gate config is removed.
Keeping the gate config would only add verbosity.
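To make the gating change concrete, here is a minimal pure-Scala model of the check this PR removes from `SymmetricHashJoinStateManager`. It is a sketch under stated assumptions: `JoinGateSketch`, `createBefore`, `createAfter`, `defaultV4Flag`, and the `Manager` stand-ins are hypothetical names, not Spark APIs, and the real factory takes many more parameters. It shows that, before this change, version 4 was rejected outside tests because the internal flag defaulted to `Utils.isTesting`, while afterwards the explicit version opt-in is the only gate.

```scala
// Hypothetical model of the stream-stream join state manager gate; not Spark code.
object JoinGateSketch {
  sealed trait Manager
  // Stand-in for whichever manager state format versions 1 to 3 use.
  case object EarlierManager extends Manager
  // Stand-in for SymmetricHashJoinStateManagerV4.
  case object ManagerV4 extends Manager

  // The removed internal flag defaulted to Utils.isTesting; modeled here as a parameter.
  def defaultV4Flag(isTesting: Boolean): Boolean = isTesting

  // Before SPARK-56839: version 4 additionally required the internal dev-loop flag.
  def createBefore(stateFormatVersion: Int, v4Enabled: Boolean): Manager =
    if (stateFormatVersion == 4) {
      require(v4Enabled, "State format version 4 is under development.")
      ManagerV4
    } else {
      EarlierManager
    }

  // After SPARK-56839: explicitly choosing version 4 is the only opt-in.
  def createAfter(stateFormatVersion: Int): Manager =
    if (stateFormatVersion == 4) ManagerV4 else EarlierManager
}
```

For example, in a non-test run `JoinGateSketch.createBefore(4, JoinGateSketch.defaultV4Flag(isTesting = false))` throws `IllegalArgumentException` from `require`, whereas `JoinGateSketch.createAfter(4)` returns `ManagerV4`. Queries that keep the default version 2 behave the same in both models.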
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs. (Effectively N/A, since the config already defaulted to `true` in UTs.)
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
Closes #55837 from HeartSaVioR/remove-dev-loop-config-for-stream-stream-join-new-state-format.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 55ff0269052b388531c67df28fbf77bb300e0bf7)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../scala/org/apache/spark/sql/internal/SQLConf.scala | 17 ++++-------------
.../stateful/join/SymmetricHashJoinStateManager.scala | 2 --
2 files changed, 4 insertions(+), 15 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 57f39ad72355..03d6fe96c33b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3274,24 +3274,15 @@ object SQLConf {
.doc("State format version used by streaming join operations in a streaming query. " +
"State between versions are tend to be incompatible, so state format version shouldn't " +
"be modified after running. Version 3 uses a single state store with virtual column " +
- "families instead of four stores and is only supported with RocksDB. NOTE: version " +
- "1 is DEPRECATED and should not be explicitly set by users. " +
- "Version 4 is under development and only available for testing.")
+ "families instead of four stores and is only supported with RocksDB. Version 4 optimizes " +
+ "watermark-based eviction, also using a single state store with virtual " +
+ "column families and only supported with RocksDB. NOTE: version 1 is DEPRECATED and " +
+ "should not be explicitly set by users.")
.version("3.0.0")
.intConf
.checkValue(v => Set(1, 2, 3, 4).contains(v), "Valid versions are 1, 2, 3, and 4")
.createWithDefault(2)
- val STREAMING_JOIN_STATE_FORMAT_V4_ENABLED =
- buildConf("spark.sql.streaming.join.stateFormatV4.enabled")
- .internal()
- .doc("When true, enables state format version 4 for stream-stream joins. " +
- "This config will be removed once V4 is complete.")
- .version("4.2.0")
- .withBindingPolicy(ConfigBindingPolicy.SESSION)
- .booleanConf
- .createWithDefaultFunction(() => Utils.isTesting)
-
val STREAMING_SESSION_WINDOW_MERGE_SESSIONS_IN_LOCAL_PARTITION =
buildConf("spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition")
.doc("When true, streaming session window sorts and merge sessions in local partition " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
index 02c9ef11df89..ea8fc861b104 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
@@ -2045,8 +2045,6 @@ object SymmetricHashJoinStateManager {
joinStoreGenerator: JoinStateManagerStoreGenerator,
joinKeyOrdinalForWatermark: Option[Int] = None): SymmetricHashJoinStateManager = {
if (stateFormatVersion == 4) {
- require(SQLConf.get.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_V4_ENABLED),
- "State format version 4 is under development.")
new SymmetricHashJoinStateManagerV4(
joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, hadoopConf,
partitionId, keyToNumValuesStateStoreCkptId, keyWithIndexToValueStateStoreCkptId,
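The updated doc string encodes a few constraints on the join state format version config (upstream, as far as we know, this is `spark.sql.streaming.join.stateFormatVersion`; the key itself is not visible in this hunk). The sketch below restates those documented rules as a pure-Scala check, assuming a boolean `usesRocksDB` input. `JoinStateFormatRules` and `check` are hypothetical names, and this diff does not show where Spark actually enforces the RocksDB requirement, so treat this as an illustration of the doc string rather than Spark's validation path.

```scala
// Hypothetical restatement of the rules in the config doc string; not Spark code.
object JoinStateFormatRules {
  // Mirrors .checkValue(v => Set(1, 2, 3, 4).contains(v), ...)
  val ValidVersions: Set[Int] = Set(1, 2, 3, 4)
  // Versions 3 and 4 use a single store with virtual column families (RocksDB only).
  val RocksDBOnlyVersions: Set[Int] = Set(3, 4)

  def check(version: Int, usesRocksDB: Boolean): Either[String, Int] =
    if (!ValidVersions.contains(version))
      Left("Valid versions are 1, 2, 3, and 4")
    else if (RocksDBOnlyVersions.contains(version) && !usesRocksDB)
      Left(s"State format version $version requires the RocksDB state store provider")
    else
      Right(version) // version 1 is accepted here but documented as DEPRECATED
}
```

Returning `Either` keeps the sketch free of exceptions; the real `checkValue` call in the diff instead raises an error carrying "Valid versions are 1, 2, 3, and 4" when an out-of-range value is set.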