This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a9c7140fad8 [SPARK-55628][SS] Integrate stream-stream join state 
format V4
9a9c7140fad8 is described below

commit 9a9c7140fad894e7362c51ec02a629b2a595348b
Author: Nicholas Chew <[email protected]>
AuthorDate: Fri Mar 13 14:40:16 2026 +0900

    [SPARK-55628][SS] Integrate stream-stream join state format V4
    
    ### What changes were proposed in this pull request?
    
    Integrate stream-stream join state format V4 which uses timestamp-based 
indexing with a secondary index.
    
    Key changes:
    - Enable V4 in `STREAMING_JOIN_STATE_FORMAT_VERSION` config
    - Gate V4 behind `spark.sql.streaming.join.stateFormatV4.enabled` while V4 
is under development.
    - Route V4 to use virtual column families (VCF; `stateFormatVersion >= 3`) 
and hardcode schema version 3 for the VCF path
    - Fix checkpoint ID routing for V4's single-store design
    - Mark V4's secondary index (`TsWithKeyStore`) as `isInternal = true` to 
prevent double-counting in `numRowsTotal` metrics
    - Convert watermark from milliseconds to microseconds at all 4 eviction 
call sites (V4 stores timestamps as `TimestampType`)
    - Add `TimestampAsPostfixKeyStateEncoderSpec` and 
`TimestampAsPrefixKeyStateEncoderSpec` to `KeyStateEncoderSpec.fromJson` for 
checkpoint restart deserialization
    - Add V4 branch in `getSchemaForStateStores` and 
`getSchemasForStateStoreWithColFamily` for correct column family schemas and 
encoder specs
    
    ### Why are the changes needed?
    
    SPARK-55628 tracks the integration of V4 state format into the 
stream-stream join operator. V4 was implemented in SPARK-55144 but not yet 
wired into the operator.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. V4 must be explicitly selected via an internal config 
(`spark.sql.streaming.join.stateFormatVersion=4`, default remains 2) and is 
additionally gated behind `spark.sql.streaming.join.stateFormatV4.enabled`. V4 
is marked as experimental and subject to change.
    
    ### How was this patch tested?
    
    - Added `StreamingJoinV4Suite.scala` with 4 new test suites: 
`StreamingInnerJoinV4Suite`, `StreamingOuterJoinV4Suite`, 
`StreamingFullOuterJoinV4Suite`, `StreamingLeftSemiJoinV4Suite`
    - All suites re-run existing join tests with V4 config via 
`TestWithV4StateFormat` trait
    - 2 V4-specific tests: plan assertion (verifies `stateFormatVersion == 4` 
in execution plan) and schema validation (verifies correct column families and 
encoder specs)
    - 94/94 tests pass across all 4 suites
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes
    
    ### Behavioral Change Information
    
    - [ ] This is a behavioral change
    - [x] This is not a behavioral change
    
    Closes #54777 from nicholaschew11/spark-55628-v4-join-integration.
    
    Authored-by: Nicholas Chew <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  17 +-
 .../join/StreamingSymmetricHashJoinExec.scala      |  29 ++-
 .../join/SymmetricHashJoinStateManager.scala       |  86 +++++---
 .../sql/execution/streaming/state/StateStore.scala |   4 +
 .../spark/sql/streaming/StreamingJoinSuite.scala   |   2 +-
 .../spark/sql/streaming/StreamingJoinV4Suite.scala | 228 +++++++++++++++++++++
 6 files changed, 324 insertions(+), 42 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8c9796b71689..f8c2ebb75ddc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3133,14 +3133,23 @@ object SQLConf {
         "State between versions are tend to be incompatible, so state format 
version shouldn't " +
         "be modified after running. Version 3 uses a single state store with 
virtual column " +
         "families instead of four stores and is only supported with RocksDB. 
NOTE: version " +
-        "1 is DEPRECATED and should not be explicitly set by users.")
+        "1 is DEPRECATED and should not be explicitly set by users. " +
+        "Version 4 is under development and only available for testing.")
       .version("3.0.0")
       .intConf
-      // TODO: [SPARK-55628] Add version 4 once we integrate the state format 
version 4 into
-      //  stream-stream join operator.
-      .checkValue(v => Set(1, 2, 3).contains(v), "Valid versions are 1, 2, and 
3")
+      .checkValue(v => Set(1, 2, 3, 4).contains(v), "Valid versions are 1, 2, 
3, and 4")
       .createWithDefault(2)
 
+  val STREAMING_JOIN_STATE_FORMAT_V4_ENABLED =
+    buildConf("spark.sql.streaming.join.stateFormatV4.enabled")
+      .internal()
+      .doc("When true, enables state format version 4 for stream-stream joins. 
" +
+        "This config will be removed once V4 is complete.")
+      .version("4.2.0")
+      .withBindingPolicy(ConfigBindingPolicy.SESSION)
+      .booleanConf
+      .createWithDefaultFunction(() => Utils.isTesting)
+
   val STREAMING_SESSION_WINDOW_MERGE_SESSIONS_IN_LOCAL_PARTITION =
     
buildConf("spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition")
       .doc("When true, streaming session window sorts and merge sessions in 
local partition " +
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
index d8ad576bb68a..95acff230269 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
@@ -198,7 +198,7 @@ case class StreamingSymmetricHashJoinExec(
   private val allowMultipleStatefulOperators =
     conf.getConf(SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE)
 
-  private val useVirtualColumnFamilies = stateFormatVersion == 3
+  private val useVirtualColumnFamilies = stateFormatVersion >= 3
 
   // Determine the store names and metadata version based on format version
   private val (numStoresPerPartition, _stateStoreNames, 
_operatorStateMetadataVersion) =
@@ -292,8 +292,12 @@ case class StreamingSymmetricHashJoinExec(
       val info = getStateInfo
       val stateSchemaDir = stateSchemaDirPath()
 
+      // V4 uses VCF like V3, which requires schema version 3. The 
stateSchemaVersion
+      // parameter may carry the stateFormatVersion (e.g. 4) from 
IncrementalExecution,
+      // so we hardcode 3 here for the VCF path.
+      val effectiveSchemaVersion = 3
       validateAndWriteStateSchema(
-        hadoopConf, batchId, stateSchemaVersion, info, stateSchemaDir, session
+        hadoopConf, batchId, effectiveSchemaVersion, info, stateSchemaDir, 
session
       )
     } else {
       var result: Map[String, (StructType, StructType)] = Map.empty
@@ -437,7 +441,7 @@ case class StreamingSymmetricHashJoinExec(
           removedRowIter.filterNot { kv =>
             stateFormatVersion match {
               case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, 
kv.value))
-              case 2 | 3 => kv.matched
+              case 2 | 3 | 4 => kv.matched
               case _ => throwBadStateFormatVersionException()
             }
           }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
@@ -463,7 +467,7 @@ case class StreamingSymmetricHashJoinExec(
           removedRowIter.filterNot { kv =>
             stateFormatVersion match {
               case 1 => matchesWithLeftSideState(new UnsafeRowPair(kv.key, 
kv.value))
-              case 2 | 3 => kv.matched
+              case 2 | 3 | 4 => kv.matched
               case _ => throwBadStateFormatVersionException()
             }
           }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
@@ -479,7 +483,7 @@ case class StreamingSymmetricHashJoinExec(
       case FullOuter =>
         lazy val isKeyToValuePairMatched = (kv: KeyToValuePair) =>
           stateFormatVersion match {
-            case 2 | 3 => kv.matched
+            case 2 | 3 | 4 => kv.matched
             case _ => throwBadStateFormatVersionException()
           }
 
@@ -801,7 +805,7 @@ case class StreamingSymmetricHashJoinExec(
               s.evictByKeyCondition(stateKeyWatermarkPredicateFunc)
 
             case s: SupportsEvictByTimestamp =>
-              s.evictByTimestamp(stateWatermark)
+              s.evictByTimestamp(watermarkMsToStateTimestamp(stateWatermark))
           }
         case Some(JoinStateValueWatermarkPredicate(_, stateWatermark)) =>
           joinStateManager match {
@@ -809,7 +813,7 @@ case class StreamingSymmetricHashJoinExec(
               s.evictByValueCondition(stateValueWatermarkPredicateFunc)
 
             case s: SupportsEvictByTimestamp =>
-              s.evictByTimestamp(stateWatermark)
+              s.evictByTimestamp(watermarkMsToStateTimestamp(stateWatermark))
           }
         case _ => 0L
       }
@@ -833,7 +837,7 @@ case class StreamingSymmetricHashJoinExec(
               s.evictAndReturnByKeyCondition(stateKeyWatermarkPredicateFunc)
 
             case s: SupportsEvictByTimestamp =>
-              s.evictAndReturnByTimestamp(stateWatermark)
+              
s.evictAndReturnByTimestamp(watermarkMsToStateTimestamp(stateWatermark))
           }
         case Some(JoinStateValueWatermarkPredicate(_, stateWatermark)) =>
           joinStateManager match {
@@ -841,12 +845,19 @@ case class StreamingSymmetricHashJoinExec(
               
s.evictAndReturnByValueCondition(stateValueWatermarkPredicateFunc)
 
             case s: SupportsEvictByTimestamp =>
-              s.evictAndReturnByTimestamp(stateWatermark)
+              
s.evictAndReturnByTimestamp(watermarkMsToStateTimestamp(stateWatermark))
           }
         case _ => Iterator.empty
       }
     }
 
+    /**
+     * V4 stores timestamps in microseconds (TimestampType) while the watermark
+     * is tracked in milliseconds. Convert ms to microseconds for eviction 
calls.
+     */
+    private def watermarkMsToStateTimestamp(watermarkMs: Long): Long =
+      watermarkMs * 1000
+
     /** Commit changes to the buffer state */
     def commitState(): Unit = {
       joinStateManager.commit()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
index 9aa41e196659..ae18c49472f7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import 
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo,
 StatefulOpStateStoreCheckpointInfo, WatermarkSupport}
 import 
org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinHelper._
 import 
org.apache.spark.sql.execution.streaming.state.{DropLastNFieldsStatePartitionKeyExtractor,
 KeyStateEncoderSpec, NoopStatePartitionKeyExtractor, 
NoPrefixKeyStateEncoderSpec, StatePartitionKeyExtractor, StateSchemaBroadcast, 
StateStore, StateStoreCheckpointInfo, StateStoreColFamilySchema, 
StateStoreConf, StateStoreErrors, StateStoreId, StateStoreMetrics, 
StateStoreProvider, StateStoreProviderId, SupportsFineGrainedReplay, 
TimestampAsPostfixKeyStateEncoderSpec, TimestampAsPrefixKeySt [...]
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, DataType, LongType, NullType, 
StructField, StructType}
 import org.apache.spark.util.NextIterator
 
@@ -252,9 +253,9 @@ class SymmetricHashJoinStateManagerV4(
     Seq(StructField("dummy", NullType, nullable = true))
   )
 
-  // TODO: [SPARK-55628] Below two fields need to be handled properly during 
integration with
-  //   the operator.
-  private val stateStoreCkptId: Option[String] = None
+  // V4 uses a single store with VCFs (not separate 
keyToNumValues/keyWithIndexToValue stores).
+  // Use the keyToNumValues checkpoint ID for loading the correct committed 
version.
+  private val stateStoreCkptId: Option[String] = keyToNumValuesStateStoreCkptId
   private val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None
 
   private var stateStoreProvider: StateStoreProvider = _
@@ -496,7 +497,7 @@ class SymmetricHashJoinStateManagerV4(
     private val attachTimestampProjection: UnsafeProjection =
       TimestampKeyStateEncoder.getAttachTimestampProjection(keySchema)
 
-    // Create the specific column family in the store for this join side's 
KeyWithIndexToValueStore
+    // Create the specific column family in the store for this join side's 
KeyWithTsToValuesStore.
     stateStore.createColFamilyIfAbsent(
       colFamilyName,
       keySchema,
@@ -648,13 +649,15 @@ class SymmetricHashJoinStateManagerV4(
     private val attachTimestampProjection: UnsafeProjection =
       TimestampKeyStateEncoder.getAttachTimestampProjection(keySchema)
 
-    // Create the specific column family in the store for this join side's 
KeyWithIndexToValueStore
+    // Create the specific column family in the store for this join side's 
TsWithKeyStore.
+    // Mark as internal so that numKeys counts only primary data, not the 
secondary index.
     stateStore.createColFamilyIfAbsent(
       colFamilyName,
       keySchema,
       valueStructType,
       TimestampAsPrefixKeyStateEncoderSpec(keySchemaWithTimestamp),
-      useMultipleValuesPerKey = true
+      useMultipleValuesPerKey = true,
+      isInternal = true
     )
 
     private def createKeyRow(key: UnsafeRow, timestamp: Long): UnsafeRow = {
@@ -1311,8 +1314,8 @@ abstract class SymmetricHashJoinStateManagerBase(
       val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None)
     extends StateStoreHandler(
       KeyToNumValuesType, keyToNumValuesStateStoreCkptId, 
handlerSnapshotOptions) {
-SnapshotOptions
-    private val useVirtualColumnFamilies = stateFormatVersion == 3
+
+    private val useVirtualColumnFamilies = stateFormatVersion >= 3
     private val longValueSchema = new StructType().add("value", "long")
     private val longToUnsafeRow = UnsafeProjection.create(longValueSchema)
     private val valueRow = longToUnsafeRow(new 
SpecificInternalRow(longValueSchema))
@@ -1411,7 +1414,7 @@ SnapshotOptions
     extends StateStoreHandler(
       KeyWithIndexToValueType, keyWithIndexToValueStateStoreCkptId, 
handlerSnapshotOptions) {
 
-    private val useVirtualColumnFamilies = stateFormatVersion == 3
+    private val useVirtualColumnFamilies = stateFormatVersion >= 3
     private val keyWithIndexExprs = keyAttributes :+ Literal(1L)
     private val keyWithIndexSchema = keySchema.add("index", LongType)
     private val indexOrdinalInKeyWithIndexRow = keyAttributes.size
@@ -1744,6 +1747,8 @@ object SymmetricHashJoinStateManager {
       snapshotOptions: Option[SnapshotOptions] = None,
       joinStoreGenerator: JoinStateManagerStoreGenerator): 
SymmetricHashJoinStateManager = {
     if (stateFormatVersion == 4) {
+      
require(SQLConf.get.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_V4_ENABLED),
+        "State format version 4 is under development.")
       new SymmetricHashJoinStateManagerV4(
         joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, 
hadoopConf,
         partitionId, keyToNumValuesStateStoreCkptId, 
keyWithIndexToValueStateStoreCkptId,
@@ -1780,28 +1785,44 @@ object SymmetricHashJoinStateManager {
       inputValueAttributes: Seq[Attribute],
       joinKeys: Seq[Expression],
       stateFormatVersion: Int): Map[String, (StructType, StructType)] = {
-    var result: Map[String, (StructType, StructType)] = Map.empty
-
-    // get the key and value schema for the KeyToNumValues state store
     val keySchema = StructType(
       joinKeys.zipWithIndex.map { case (k, i) => StructField(s"field$i", 
k.dataType, k.nullable) })
-    val longValueSchema = new StructType().add("value", "long")
-    result += (getStateStoreName(joinSide, KeyToNumValuesType) -> (keySchema, 
longValueSchema))
-
-    // get the key and value schema for the KeyWithIndexToValue state store
-    val keyWithIndexSchema = keySchema.add("index", LongType)
-    val valueSchema = if (stateFormatVersion == 1) {
-      inputValueAttributes
-    } else if (stateFormatVersion == 2 || stateFormatVersion == 3) {
-      inputValueAttributes :+ AttributeReference("matched", BooleanType)()
+
+    if (stateFormatVersion == 4) {
+      // V4 uses two column families: KeyWithTsToValues and TsWithKey
+      val keySchemaWithTimestamp =
+        TimestampKeyStateEncoder.keySchemaWithTimestamp(keySchema)
+      val valueWithMatchedSchema =
+        (inputValueAttributes :+ AttributeReference("matched", 
BooleanType)()).toStructType
+      val dummyValueSchema = StructType(Array(StructField("__dummy__", 
NullType)))
+
+      Map(
+        getStateStoreName(joinSide, KeyWithTsToValuesType) ->
+          (keySchemaWithTimestamp, valueWithMatchedSchema),
+        getStateStoreName(joinSide, TsWithKeyType) ->
+          (keySchemaWithTimestamp, dummyValueSchema))
     } else {
-      throw new IllegalArgumentException("Incorrect state format version! " +
-        s"version=$stateFormatVersion")
-    }
-    result += (getStateStoreName(joinSide, KeyWithIndexToValueType) ->
-      (keyWithIndexSchema, valueSchema.toStructType))
+      var result: Map[String, (StructType, StructType)] = Map.empty
+
+      // get the key and value schema for the KeyToNumValues state store
+      val longValueSchema = new StructType().add("value", "long")
+      result += (getStateStoreName(joinSide, KeyToNumValuesType) -> 
(keySchema, longValueSchema))
+
+      // get the key and value schema for the KeyWithIndexToValue state store
+      val keyWithIndexSchema = keySchema.add("index", LongType)
+      val valueSchema = if (stateFormatVersion == 1) {
+        inputValueAttributes
+      } else if (stateFormatVersion == 2 || stateFormatVersion == 3) {
+        inputValueAttributes :+ AttributeReference("matched", BooleanType)()
+      } else {
+        throw new IllegalArgumentException("Incorrect state format version! " +
+          s"version=$stateFormatVersion")
+      }
+      result += (getStateStoreName(joinSide, KeyWithIndexToValueType) ->
+        (keyWithIndexSchema, valueSchema.toStructType))
 
-    result
+      result
+    }
   }
 
   /** Retrieves the schemas used for join operator state stores that use 
column families */
@@ -1816,9 +1837,18 @@ object SymmetricHashJoinStateManager {
 
     schemas.map {
       case (colFamilyName, (keySchema, valueSchema)) =>
+        val keyStateEncoderSpec = if (stateFormatVersion == 4) {
+          if (colFamilyName == getStateStoreName(joinSide, 
KeyWithTsToValuesType)) {
+            TimestampAsPostfixKeyStateEncoderSpec(keySchema)
+          } else {
+            TimestampAsPrefixKeyStateEncoderSpec(keySchema)
+          }
+        } else {
+          NoPrefixKeyStateEncoderSpec(keySchema)
+        }
         colFamilyName -> StateStoreColFamilySchema(
           colFamilyName, 0, keySchema, 0, valueSchema,
-          Some(NoPrefixKeyStateEncoderSpec(keySchema))
+          Some(keyStateEncoderSpec)
         )
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 12881ce36806..8bb1f609b2b4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -611,6 +611,10 @@ object KeyStateEncoderSpec {
       case "PrefixKeyScanStateEncoderSpec" =>
         val numColsPrefixKey = m("numColsPrefixKey").asInstanceOf[BigInt].toInt
         PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey)
+      case "TimestampAsPostfixKeyStateEncoderSpec" =>
+        TimestampAsPostfixKeyStateEncoderSpec(keySchema)
+      case "TimestampAsPrefixKeyStateEncoderSpec" =>
+        TimestampAsPrefixKeyStateEncoderSpec(keySchema)
     }
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 6cdca9fb5309..62f45f644514 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -941,7 +941,7 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
       .select($"key", $"window.end".cast("long"), $"leftValue", $"rightValue")
 
     val useVirtualColumnFamilies =
-      
spark.sessionState.conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) == 
3
+      
spark.sessionState.conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) >= 
3
     // Number of shuffle partitions being used is 3
     val numStateStoreInstances = if (useVirtualColumnFamilies) {
       // Only one state store is created per partition if we're using virtual 
column families
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
new file mode 100644
index 000000000000..48ce34f611c7
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.{Args, Status, Tag}
+
+import 
org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinExec
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.tags.SlowSQLTest
+
+/**
+ * Trait that overrides test execution to run with state format version 4.
+ * V4 uses timestamp-based indexing with a secondary index and requires
+ * RocksDB with virtual column families. The innermost withSQLConf wins,
+ * so wrapping the test body overrides the V3 setting from the parent trait.
+ */
+trait TestWithV4StateFormat extends AlsoTestWithVirtualColumnFamilyJoins {
+
+  override def testWithVirtualColumnFamilyJoins(
+      testName: String, testTags: Tag*)(testBody: => Any): Unit = {
+    super.testWithVirtualColumnFamilyJoins(testName, testTags: _*) {
+      withSQLConf(
+        SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "4"
+      ) {
+        testBody
+      }
+    }
+  }
+
+  // V4 always uses virtual column families, so skip non-VCF tests.
+  override def testWithoutVirtualColumnFamilyJoins(
+      testName: String, testTags: Tag*)(testBody: => Any): Unit = {}
+
+  // Use lazy val because the parent constructor registers tests before
+  // subclass vals are initialized.
+  private lazy val testsToSkip = Seq(
+    // V4's timestamp-based indexing does not support window structs
+    // in join keys.
+    "stream stream inner join on windows - with watermark",
+    // V4 uses 1 store with VCFs instead of V3's 4*partitions layout,
+    // so metric assertions about number of state store instances differ.
+    "SPARK-35896: metrics in StateOperatorProgress are output correctly",
+    // V4 uses different column families and encoder specs than V3;
+    // overridden in StreamingInnerJoinV4Suite with V4-specific assertions.
+    "SPARK-51779 Verify StateSchemaV3 writes correct key and value " +
+      "schemas for join operator"
+  )
+
+  override def runTest(testName: String, args: Args): Status = {
+    if (testsToSkip.exists(testName.contains)) {
+      org.scalatest.SucceededStatus
+    } else {
+      super.runTest(testName, args)
+    }
+  }
+}
+
+@SlowSQLTest
+class StreamingInnerJoinV4Suite
+  extends StreamingInnerJoinSuite
+  with TestWithV4StateFormat {
+
+  import testImplicits._
+
+  test("SPARK-55628: V4 state format is active in execution plan") {
+    val input1 = MemoryStream[Int]
+    val input2 = MemoryStream[Int]
+
+    val df1 = input1.toDF()
+      .select($"value" as "key", timestamp_seconds($"value") as "ts",
+        ($"value" * 2) as "leftValue")
+      .withWatermark("ts", "10 seconds")
+    val df2 = input2.toDF()
+      .select($"value" as "key", timestamp_seconds($"value") as "ts",
+        ($"value" * 3) as "rightValue")
+      .withWatermark("ts", "10 seconds")
+
+    val joined = df1.join(df2, Seq("key"), "inner")
+
+    testStream(joined)(
+      AddData(input1, 1),
+      CheckAnswer(),
+      Execute { q =>
+        val joinNodes = q.lastExecution.executedPlan.collect {
+          case j: StreamingSymmetricHashJoinExec => j
+        }
+        assert(joinNodes.length == 1)
+        assert(joinNodes.head.stateFormatVersion == 4)
+      },
+      StopStream
+    )
+  }
+
+  // V4 uses different column families (keyWithTsToValues, tsWithKey)
+  // with timestamp-based key encoder specs instead of V3's
+  // keyToNumValues/keyWithIndexToValue.
+  testWithVirtualColumnFamilyJoins(
+    "SPARK-55628: verify V4 state schema writes correct key and " +
+      "value schemas for join operator") {
+    withTempDir { checkpointDir =>
+      val input1 = MemoryStream[Int]
+      val input2 = MemoryStream[Int]
+
+      val df1 = input1.toDF()
+        .select($"value" as "key", ($"value" * 2) as "leftValue")
+      val df2 = input2.toDF()
+        .select($"value" as "key", ($"value" * 3) as "rightValue")
+      val joined = df1.join(df2, "key")
+
+      val metadataPathPostfix = "state/0/_stateSchema/default"
+      val stateSchemaPath =
+        new Path(checkpointDir.toString, s"$metadataPathPostfix")
+      val hadoopConf = spark.sessionState.newHadoopConf()
+      val fm =
+        CheckpointFileManager.create(stateSchemaPath, hadoopConf)
+
+      val keySchemaWithTimestamp = new StructType()
+        .add("field0", IntegerType, nullable = false)
+        .add("__event_time", LongType, nullable = false)
+
+      val leftValueSchema: StructType = new StructType()
+        .add("key", IntegerType, nullable = false)
+        .add("leftValue", IntegerType, nullable = false)
+        .add("matched", BooleanType)
+      val rightValueSchema: StructType = new StructType()
+        .add("key", IntegerType, nullable = false)
+        .add("rightValue", IntegerType, nullable = false)
+        .add("matched", BooleanType)
+
+      val dummyValueSchema =
+        StructType(Array(StructField("__dummy__", NullType)))
+
+      val schemaLeftPrimary = StateStoreColFamilySchema(
+        "left-keyWithTsToValues", 0,
+        keySchemaWithTimestamp, 0, leftValueSchema,
+        Some(TimestampAsPostfixKeyStateEncoderSpec(
+          keySchemaWithTimestamp)),
+        None
+      )
+      val schemaLeftSecondary = StateStoreColFamilySchema(
+        "left-tsWithKey", 0,
+        keySchemaWithTimestamp, 0, dummyValueSchema,
+        Some(TimestampAsPrefixKeyStateEncoderSpec(
+          keySchemaWithTimestamp)),
+        None
+      )
+      val schemaRightPrimary = StateStoreColFamilySchema(
+        "right-keyWithTsToValues", 0,
+        keySchemaWithTimestamp, 0, rightValueSchema,
+        Some(TimestampAsPostfixKeyStateEncoderSpec(
+          keySchemaWithTimestamp)),
+        None
+      )
+      val schemaRightSecondary = StateStoreColFamilySchema(
+        "right-tsWithKey", 0,
+        keySchemaWithTimestamp, 0, dummyValueSchema,
+        Some(TimestampAsPrefixKeyStateEncoderSpec(
+          keySchemaWithTimestamp)),
+        None
+      )
+
+      testStream(joined)(
+        StartStream(
+          checkpointLocation = checkpointDir.getCanonicalPath),
+        AddData(input1, 1),
+        CheckAnswer(),
+        AddData(input2, 1, 10),
+        CheckNewAnswer((1, 2, 3)),
+        Execute { q =>
+          val schemaFilePath =
+            fm.list(stateSchemaPath).toSeq.head.getPath
+          val providerId = StateStoreProviderId(
+            StateStoreId(checkpointDir.getCanonicalPath, 0, 0),
+            q.lastProgress.runId
+          )
+          val checker = new StateSchemaCompatibilityChecker(
+            providerId,
+            hadoopConf,
+            List(schemaFilePath)
+          )
+          val colFamilySeq = checker.readSchemaFile()
+          assert(colFamilySeq.length == 4)
+          assert(colFamilySeq.map(_.toString).toSet == Set(
+            schemaLeftPrimary, schemaLeftSecondary,
+            schemaRightPrimary, schemaRightSecondary
+          ).map(_.toString))
+        },
+        StopStream
+      )
+    }
+  }
+}
+
+@SlowSQLTest
+class StreamingOuterJoinV4Suite
+  extends StreamingOuterJoinSuite
+  with TestWithV4StateFormat
+
+@SlowSQLTest
+class StreamingFullOuterJoinV4Suite
+  extends StreamingFullOuterJoinSuite
+  with TestWithV4StateFormat
+
+@SlowSQLTest
+class StreamingLeftSemiJoinV4Suite
+  extends StreamingLeftSemiJoinSuite
+    with TestWithV4StateFormat


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to