This is an automated email from the ASF dual-hosted git repository.

HeartSaVioR pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 55ff0269052b [SPARK-56839][SS] Remove dev loop config for new state 
format of stream-stream join
55ff0269052b is described below

commit 55ff0269052b388531c67df28fbf77bb300e0bf7
Author: Jungtaek Lim <[email protected]>
AuthorDate: Wed May 20 19:23:44 2026 +0900

    [SPARK-56839][SS] Remove dev loop config for new state format of 
stream-stream join
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to remove the dev-loop config for the new state format of 
stream-stream join.
    
    ### Why are the changes needed?
    
    We completed the code for the new state format in stream-stream join in 
Apache Spark 4.2.0, but forgot to remove the config that acted as a gate to 
prevent the feature from being released "in the middle". Since all code changes 
have landed in 4.2.0, the flag is unnecessary.
    
    Users still need to opt in to the new state format version, so that 
effectively acts as a gate even after we remove the gate config. Keeping the 
gate config just adds verbosity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs. (Actually N/A since the config has been `true` for UTs)
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.7
    
    Closes #55837 from 
HeartSaVioR/remove-dev-loop-config-for-stream-stream-join-new-state-format.
    
    Authored-by: Jungtaek Lim <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../scala/org/apache/spark/sql/internal/SQLConf.scala   | 17 ++++-------------
 .../stateful/join/SymmetricHashJoinStateManager.scala   |  2 --
 2 files changed, 4 insertions(+), 15 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 498cf2a1cded..77ef8bb600f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3306,24 +3306,15 @@ object SQLConf {
       .doc("State format version used by streaming join operations in a 
streaming query. " +
         "State between versions are tend to be incompatible, so state format 
version shouldn't " +
         "be modified after running. Version 3 uses a single state store with 
virtual column " +
-        "families instead of four stores and is only supported with RocksDB. 
NOTE: version " +
-        "1 is DEPRECATED and should not be explicitly set by users. " +
-        "Version 4 is under development and only available for testing.")
+        "families instead of four stores and is only supported with RocksDB. 
Version 4 optimizes " +
+        "watermark-based eviction, also using a single state store with 
virtual " +
+        "column families and only supported with RocksDB. NOTE: version 1 is 
DEPRECATED and " +
+        "should not be explicitly set by users.")
       .version("3.0.0")
       .intConf
       .checkValue(v => Set(1, 2, 3, 4).contains(v), "Valid versions are 1, 2, 
3, and 4")
       .createWithDefault(2)
 
-  val STREAMING_JOIN_STATE_FORMAT_V4_ENABLED =
-    buildConf("spark.sql.streaming.join.stateFormatV4.enabled")
-      .internal()
-      .doc("When true, enables state format version 4 for stream-stream joins. 
" +
-        "This config will be removed once V4 is complete.")
-      .version("4.2.0")
-      .withBindingPolicy(ConfigBindingPolicy.SESSION)
-      .booleanConf
-      .createWithDefaultFunction(() => Utils.isTesting)
-
   val STREAMING_SESSION_WINDOW_MERGE_SESSIONS_IN_LOCAL_PARTITION =
     
buildConf("spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition")
       .doc("When true, streaming session window sorts and merge sessions in 
local partition " +
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
index 02c9ef11df89..ea8fc861b104 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
@@ -2045,8 +2045,6 @@ object SymmetricHashJoinStateManager {
       joinStoreGenerator: JoinStateManagerStoreGenerator,
       joinKeyOrdinalForWatermark: Option[Int] = None): 
SymmetricHashJoinStateManager = {
     if (stateFormatVersion == 4) {
-      
require(SQLConf.get.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_V4_ENABLED),
-        "State format version 4 is under development.")
       new SymmetricHashJoinStateManagerV4(
         joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, 
hadoopConf,
         partitionId, keyToNumValuesStateStoreCkptId, 
keyWithIndexToValueStateStoreCkptId,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to