This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9a9c7140fad8 [SPARK-55628][SS] Integrate stream-stream join state
format V4
9a9c7140fad8 is described below
commit 9a9c7140fad894e7362c51ec02a629b2a595348b
Author: Nicholas Chew <[email protected]>
AuthorDate: Fri Mar 13 14:40:16 2026 +0900
[SPARK-55628][SS] Integrate stream-stream join state format V4
### What changes were proposed in this pull request?
Integrate stream-stream join state format V4 which uses timestamp-based
indexing with a secondary index.
Key changes:
- Enable V4 in `STREAMING_JOIN_STATE_FORMAT_VERSION` config
- Gate V4 behind `spark.sql.streaming.join.stateFormatV4.enabled` while V4
is under development.
- Route V4 to use VCF (`stateFormatVersion >= 3`) and hardcode schema
version 3 for VCF path
- Fix checkpoint ID routing for V4's single-store design
- Mark V4's secondary index (`TsWithKeyStore`) as `isInternal = true` to
prevent double-counting in `numRowsTotal` metrics
- Convert watermark from milliseconds to microseconds at all 4 eviction
call sites (V4 stores timestamps as `TimestampType`)
- Add `TimestampAsPostfixKeyStateEncoderSpec` and
`TimestampAsPrefixKeyStateEncoderSpec` to `KeyStateEncoderSpec.fromJson` for
checkpoint restart deserialization
- Add V4 branch in `getSchemaForStateStores` and
`getSchemasForStateStoreWithColFamily` for correct column family schemas and
encoder specs
### Why are the changes needed?
SPARK-55628 tracks the integration of V4 state format into the
stream-stream join operator. V4 was implemented in SPARK-55144 but not yet
wired into the operator.
### Does this PR introduce _any_ user-facing change?
No. V4 requires explicitly setting
`spark.sql.streaming.join.stateFormatVersion=4` (default remains 2) and is
additionally gated behind the internal config
`spark.sql.streaming.join.stateFormatV4.enabled`. V4 is marked as
experimental and subject to change.
### How was this patch tested?
- Added `StreamingJoinV4Suite.scala` with 4 new test suites:
`StreamingInnerJoinV4Suite`, `StreamingOuterJoinV4Suite`,
`StreamingFullOuterJoinV4Suite`, `StreamingLeftSemiJoinV4Suite`
- All suites re-run existing join tests with V4 config via
`TestWithV4StateFormat` trait
- 2 V4-specific tests: plan assertion (verifies `stateFormatVersion == 4`
in execution plan) and schema validation (verifies correct column families and
encoder specs)
- 94/94 tests pass across all 4 suites
### Was this patch authored or co-authored using generative AI tooling?
Yes
### Behavioral Change Information
- [ ] This is a behavioral change
- [x] This is not a behavioral change
Closes #54777 from nicholaschew11/spark-55628-v4-join-integration.
Authored-by: Nicholas Chew <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 17 +-
.../join/StreamingSymmetricHashJoinExec.scala | 29 ++-
.../join/SymmetricHashJoinStateManager.scala | 86 +++++---
.../sql/execution/streaming/state/StateStore.scala | 4 +
.../spark/sql/streaming/StreamingJoinSuite.scala | 2 +-
.../spark/sql/streaming/StreamingJoinV4Suite.scala | 228 +++++++++++++++++++++
6 files changed, 324 insertions(+), 42 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8c9796b71689..f8c2ebb75ddc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3133,14 +3133,23 @@ object SQLConf {
"State between versions are tend to be incompatible, so state format
version shouldn't " +
"be modified after running. Version 3 uses a single state store with
virtual column " +
"families instead of four stores and is only supported with RocksDB.
NOTE: version " +
- "1 is DEPRECATED and should not be explicitly set by users.")
+ "1 is DEPRECATED and should not be explicitly set by users. " +
+ "Version 4 is under development and only available for testing.")
.version("3.0.0")
.intConf
- // TODO: [SPARK-55628] Add version 4 once we integrate the state format
version 4 into
- // stream-stream join operator.
- .checkValue(v => Set(1, 2, 3).contains(v), "Valid versions are 1, 2, and
3")
+ .checkValue(v => Set(1, 2, 3, 4).contains(v), "Valid versions are 1, 2,
3, and 4")
.createWithDefault(2)
+ val STREAMING_JOIN_STATE_FORMAT_V4_ENABLED =
+ buildConf("spark.sql.streaming.join.stateFormatV4.enabled")
+ .internal()
+ .doc("When true, enables state format version 4 for stream-stream joins.
" +
+ "This config will be removed once V4 is complete.")
+ .version("4.2.0")
+ .withBindingPolicy(ConfigBindingPolicy.SESSION)
+ .booleanConf
+ .createWithDefaultFunction(() => Utils.isTesting)
+
val STREAMING_SESSION_WINDOW_MERGE_SESSIONS_IN_LOCAL_PARTITION =
buildConf("spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition")
.doc("When true, streaming session window sorts and merge sessions in
local partition " +
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
index d8ad576bb68a..95acff230269 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala
@@ -198,7 +198,7 @@ case class StreamingSymmetricHashJoinExec(
private val allowMultipleStatefulOperators =
conf.getConf(SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE)
- private val useVirtualColumnFamilies = stateFormatVersion == 3
+ private val useVirtualColumnFamilies = stateFormatVersion >= 3
// Determine the store names and metadata version based on format version
private val (numStoresPerPartition, _stateStoreNames,
_operatorStateMetadataVersion) =
@@ -292,8 +292,12 @@ case class StreamingSymmetricHashJoinExec(
val info = getStateInfo
val stateSchemaDir = stateSchemaDirPath()
+ // V4 uses VCF like V3, which requires schema version 3. The
stateSchemaVersion
+ // parameter may carry the stateFormatVersion (e.g. 4) from
IncrementalExecution,
+ // so we hardcode 3 here for the VCF path.
+ val effectiveSchemaVersion = 3
validateAndWriteStateSchema(
- hadoopConf, batchId, stateSchemaVersion, info, stateSchemaDir, session
+ hadoopConf, batchId, effectiveSchemaVersion, info, stateSchemaDir,
session
)
} else {
var result: Map[String, (StructType, StructType)] = Map.empty
@@ -437,7 +441,7 @@ case class StreamingSymmetricHashJoinExec(
removedRowIter.filterNot { kv =>
stateFormatVersion match {
case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key,
kv.value))
- case 2 | 3 => kv.matched
+ case 2 | 3 | 4 => kv.matched
case _ => throwBadStateFormatVersionException()
}
}.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
@@ -463,7 +467,7 @@ case class StreamingSymmetricHashJoinExec(
removedRowIter.filterNot { kv =>
stateFormatVersion match {
case 1 => matchesWithLeftSideState(new UnsafeRowPair(kv.key,
kv.value))
- case 2 | 3 => kv.matched
+ case 2 | 3 | 4 => kv.matched
case _ => throwBadStateFormatVersionException()
}
}.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
@@ -479,7 +483,7 @@ case class StreamingSymmetricHashJoinExec(
case FullOuter =>
lazy val isKeyToValuePairMatched = (kv: KeyToValuePair) =>
stateFormatVersion match {
- case 2 | 3 => kv.matched
+ case 2 | 3 | 4 => kv.matched
case _ => throwBadStateFormatVersionException()
}
@@ -801,7 +805,7 @@ case class StreamingSymmetricHashJoinExec(
s.evictByKeyCondition(stateKeyWatermarkPredicateFunc)
case s: SupportsEvictByTimestamp =>
- s.evictByTimestamp(stateWatermark)
+ s.evictByTimestamp(watermarkMsToStateTimestamp(stateWatermark))
}
case Some(JoinStateValueWatermarkPredicate(_, stateWatermark)) =>
joinStateManager match {
@@ -809,7 +813,7 @@ case class StreamingSymmetricHashJoinExec(
s.evictByValueCondition(stateValueWatermarkPredicateFunc)
case s: SupportsEvictByTimestamp =>
- s.evictByTimestamp(stateWatermark)
+ s.evictByTimestamp(watermarkMsToStateTimestamp(stateWatermark))
}
case _ => 0L
}
@@ -833,7 +837,7 @@ case class StreamingSymmetricHashJoinExec(
s.evictAndReturnByKeyCondition(stateKeyWatermarkPredicateFunc)
case s: SupportsEvictByTimestamp =>
- s.evictAndReturnByTimestamp(stateWatermark)
+
s.evictAndReturnByTimestamp(watermarkMsToStateTimestamp(stateWatermark))
}
case Some(JoinStateValueWatermarkPredicate(_, stateWatermark)) =>
joinStateManager match {
@@ -841,12 +845,19 @@ case class StreamingSymmetricHashJoinExec(
s.evictAndReturnByValueCondition(stateValueWatermarkPredicateFunc)
case s: SupportsEvictByTimestamp =>
- s.evictAndReturnByTimestamp(stateWatermark)
+
s.evictAndReturnByTimestamp(watermarkMsToStateTimestamp(stateWatermark))
}
case _ => Iterator.empty
}
}
+ /**
+ * V4 stores timestamps in microseconds (TimestampType) while the watermark
+ * is tracked in milliseconds. Convert ms to microseconds for eviction
calls.
+ */
+ private def watermarkMsToStateTimestamp(watermarkMs: Long): Long =
+ watermarkMs * 1000
+
/** Commit changes to the buffer state */
def commitState(): Unit = {
joinStateManager.commit()
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
index 9aa41e196659..ae18c49472f7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo,
StatefulOpStateStoreCheckpointInfo, WatermarkSupport}
import
org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinHelper._
import
org.apache.spark.sql.execution.streaming.state.{DropLastNFieldsStatePartitionKeyExtractor,
KeyStateEncoderSpec, NoopStatePartitionKeyExtractor,
NoPrefixKeyStateEncoderSpec, StatePartitionKeyExtractor, StateSchemaBroadcast,
StateStore, StateStoreCheckpointInfo, StateStoreColFamilySchema,
StateStoreConf, StateStoreErrors, StateStoreId, StateStoreMetrics,
StateStoreProvider, StateStoreProviderId, SupportsFineGrainedReplay,
TimestampAsPostfixKeyStateEncoderSpec, TimestampAsPrefixKeySt [...]
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, DataType, LongType, NullType,
StructField, StructType}
import org.apache.spark.util.NextIterator
@@ -252,9 +253,9 @@ class SymmetricHashJoinStateManagerV4(
Seq(StructField("dummy", NullType, nullable = true))
)
- // TODO: [SPARK-55628] Below two fields need to be handled properly during
integration with
- // the operator.
- private val stateStoreCkptId: Option[String] = None
+ // V4 uses a single store with VCFs (not separate
keyToNumValues/keyWithIndexToValue stores).
+ // Use the keyToNumValues checkpoint ID for loading the correct committed
version.
+ private val stateStoreCkptId: Option[String] = keyToNumValuesStateStoreCkptId
private val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None
private var stateStoreProvider: StateStoreProvider = _
@@ -496,7 +497,7 @@ class SymmetricHashJoinStateManagerV4(
private val attachTimestampProjection: UnsafeProjection =
TimestampKeyStateEncoder.getAttachTimestampProjection(keySchema)
- // Create the specific column family in the store for this join side's
KeyWithIndexToValueStore
+ // Create the specific column family in the store for this join side's
KeyWithTsToValuesStore.
stateStore.createColFamilyIfAbsent(
colFamilyName,
keySchema,
@@ -648,13 +649,15 @@ class SymmetricHashJoinStateManagerV4(
private val attachTimestampProjection: UnsafeProjection =
TimestampKeyStateEncoder.getAttachTimestampProjection(keySchema)
- // Create the specific column family in the store for this join side's
KeyWithIndexToValueStore
+ // Create the specific column family in the store for this join side's
TsWithKeyStore.
+ // Mark as internal so that numKeys counts only primary data, not the
secondary index.
stateStore.createColFamilyIfAbsent(
colFamilyName,
keySchema,
valueStructType,
TimestampAsPrefixKeyStateEncoderSpec(keySchemaWithTimestamp),
- useMultipleValuesPerKey = true
+ useMultipleValuesPerKey = true,
+ isInternal = true
)
private def createKeyRow(key: UnsafeRow, timestamp: Long): UnsafeRow = {
@@ -1311,8 +1314,8 @@ abstract class SymmetricHashJoinStateManagerBase(
val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None)
extends StateStoreHandler(
KeyToNumValuesType, keyToNumValuesStateStoreCkptId,
handlerSnapshotOptions) {
-SnapshotOptions
- private val useVirtualColumnFamilies = stateFormatVersion == 3
+
+ private val useVirtualColumnFamilies = stateFormatVersion >= 3
private val longValueSchema = new StructType().add("value", "long")
private val longToUnsafeRow = UnsafeProjection.create(longValueSchema)
private val valueRow = longToUnsafeRow(new
SpecificInternalRow(longValueSchema))
@@ -1411,7 +1414,7 @@ SnapshotOptions
extends StateStoreHandler(
KeyWithIndexToValueType, keyWithIndexToValueStateStoreCkptId,
handlerSnapshotOptions) {
- private val useVirtualColumnFamilies = stateFormatVersion == 3
+ private val useVirtualColumnFamilies = stateFormatVersion >= 3
private val keyWithIndexExprs = keyAttributes :+ Literal(1L)
private val keyWithIndexSchema = keySchema.add("index", LongType)
private val indexOrdinalInKeyWithIndexRow = keyAttributes.size
@@ -1744,6 +1747,8 @@ object SymmetricHashJoinStateManager {
snapshotOptions: Option[SnapshotOptions] = None,
joinStoreGenerator: JoinStateManagerStoreGenerator):
SymmetricHashJoinStateManager = {
if (stateFormatVersion == 4) {
+
require(SQLConf.get.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_V4_ENABLED),
+ "State format version 4 is under development.")
new SymmetricHashJoinStateManagerV4(
joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf,
hadoopConf,
partitionId, keyToNumValuesStateStoreCkptId,
keyWithIndexToValueStateStoreCkptId,
@@ -1780,28 +1785,44 @@ object SymmetricHashJoinStateManager {
inputValueAttributes: Seq[Attribute],
joinKeys: Seq[Expression],
stateFormatVersion: Int): Map[String, (StructType, StructType)] = {
- var result: Map[String, (StructType, StructType)] = Map.empty
-
- // get the key and value schema for the KeyToNumValues state store
val keySchema = StructType(
joinKeys.zipWithIndex.map { case (k, i) => StructField(s"field$i",
k.dataType, k.nullable) })
- val longValueSchema = new StructType().add("value", "long")
- result += (getStateStoreName(joinSide, KeyToNumValuesType) -> (keySchema,
longValueSchema))
-
- // get the key and value schema for the KeyWithIndexToValue state store
- val keyWithIndexSchema = keySchema.add("index", LongType)
- val valueSchema = if (stateFormatVersion == 1) {
- inputValueAttributes
- } else if (stateFormatVersion == 2 || stateFormatVersion == 3) {
- inputValueAttributes :+ AttributeReference("matched", BooleanType)()
+
+ if (stateFormatVersion == 4) {
+ // V4 uses two column families: KeyWithTsToValues and TsWithKey
+ val keySchemaWithTimestamp =
+ TimestampKeyStateEncoder.keySchemaWithTimestamp(keySchema)
+ val valueWithMatchedSchema =
+ (inputValueAttributes :+ AttributeReference("matched",
BooleanType)()).toStructType
+ val dummyValueSchema = StructType(Array(StructField("__dummy__",
NullType)))
+
+ Map(
+ getStateStoreName(joinSide, KeyWithTsToValuesType) ->
+ (keySchemaWithTimestamp, valueWithMatchedSchema),
+ getStateStoreName(joinSide, TsWithKeyType) ->
+ (keySchemaWithTimestamp, dummyValueSchema))
} else {
- throw new IllegalArgumentException("Incorrect state format version! " +
- s"version=$stateFormatVersion")
- }
- result += (getStateStoreName(joinSide, KeyWithIndexToValueType) ->
- (keyWithIndexSchema, valueSchema.toStructType))
+ var result: Map[String, (StructType, StructType)] = Map.empty
+
+ // get the key and value schema for the KeyToNumValues state store
+ val longValueSchema = new StructType().add("value", "long")
+ result += (getStateStoreName(joinSide, KeyToNumValuesType) ->
(keySchema, longValueSchema))
+
+ // get the key and value schema for the KeyWithIndexToValue state store
+ val keyWithIndexSchema = keySchema.add("index", LongType)
+ val valueSchema = if (stateFormatVersion == 1) {
+ inputValueAttributes
+ } else if (stateFormatVersion == 2 || stateFormatVersion == 3) {
+ inputValueAttributes :+ AttributeReference("matched", BooleanType)()
+ } else {
+ throw new IllegalArgumentException("Incorrect state format version! " +
+ s"version=$stateFormatVersion")
+ }
+ result += (getStateStoreName(joinSide, KeyWithIndexToValueType) ->
+ (keyWithIndexSchema, valueSchema.toStructType))
- result
+ result
+ }
}
/** Retrieves the schemas used for join operator state stores that use
column families */
@@ -1816,9 +1837,18 @@ object SymmetricHashJoinStateManager {
schemas.map {
case (colFamilyName, (keySchema, valueSchema)) =>
+ val keyStateEncoderSpec = if (stateFormatVersion == 4) {
+ if (colFamilyName == getStateStoreName(joinSide,
KeyWithTsToValuesType)) {
+ TimestampAsPostfixKeyStateEncoderSpec(keySchema)
+ } else {
+ TimestampAsPrefixKeyStateEncoderSpec(keySchema)
+ }
+ } else {
+ NoPrefixKeyStateEncoderSpec(keySchema)
+ }
colFamilyName -> StateStoreColFamilySchema(
colFamilyName, 0, keySchema, 0, valueSchema,
- Some(NoPrefixKeyStateEncoderSpec(keySchema))
+ Some(keyStateEncoderSpec)
)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 12881ce36806..8bb1f609b2b4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -611,6 +611,10 @@ object KeyStateEncoderSpec {
case "PrefixKeyScanStateEncoderSpec" =>
val numColsPrefixKey = m("numColsPrefixKey").asInstanceOf[BigInt].toInt
PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey)
+ case "TimestampAsPostfixKeyStateEncoderSpec" =>
+ TimestampAsPostfixKeyStateEncoderSpec(keySchema)
+ case "TimestampAsPrefixKeyStateEncoderSpec" =>
+ TimestampAsPrefixKeyStateEncoderSpec(keySchema)
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 6cdca9fb5309..62f45f644514 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -941,7 +941,7 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
.select($"key", $"window.end".cast("long"), $"leftValue", $"rightValue")
val useVirtualColumnFamilies =
-
spark.sessionState.conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) ==
3
+
spark.sessionState.conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) >=
3
// Number of shuffle partitions being used is 3
val numStateStoreInstances = if (useVirtualColumnFamilies) {
// Only one state store is created per partition if we're using virtual
column families
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
new file mode 100644
index 000000000000..48ce34f611c7
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.{Args, Status, Tag}
+
+import
org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
+import
org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinExec
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.tags.SlowSQLTest
+
+/**
+ * Trait that overrides test execution to run with state format version 4.
+ * V4 uses timestamp-based indexing with a secondary index and requires
+ * RocksDB with virtual column families. The innermost withSQLConf wins,
+ * so wrapping the test body overrides the V3 setting from the parent trait.
+ */
+trait TestWithV4StateFormat extends AlsoTestWithVirtualColumnFamilyJoins {
+
+ override def testWithVirtualColumnFamilyJoins(
+ testName: String, testTags: Tag*)(testBody: => Any): Unit = {
+ super.testWithVirtualColumnFamilyJoins(testName, testTags: _*) {
+ withSQLConf(
+ SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "4"
+ ) {
+ testBody
+ }
+ }
+ }
+
+ // V4 always uses virtual column families, so skip non-VCF tests.
+ override def testWithoutVirtualColumnFamilyJoins(
+ testName: String, testTags: Tag*)(testBody: => Any): Unit = {}
+
+ // Use lazy val because the parent constructor registers tests before
+ // subclass vals are initialized.
+ private lazy val testsToSkip = Seq(
+ // V4's timestamp-based indexing does not support window structs
+ // in join keys.
+ "stream stream inner join on windows - with watermark",
+ // V4 uses 1 store with VCFs instead of V3's 4*partitions layout,
+ // so metric assertions about number of state store instances differ.
+ "SPARK-35896: metrics in StateOperatorProgress are output correctly",
+ // V4 uses different column families and encoder specs than V3;
+ // overridden in StreamingInnerJoinV4Suite with V4-specific assertions.
+ "SPARK-51779 Verify StateSchemaV3 writes correct key and value " +
+ "schemas for join operator"
+ )
+
+ override def runTest(testName: String, args: Args): Status = {
+ if (testsToSkip.exists(testName.contains)) {
+ org.scalatest.SucceededStatus
+ } else {
+ super.runTest(testName, args)
+ }
+ }
+}
+
+@SlowSQLTest
+class StreamingInnerJoinV4Suite
+ extends StreamingInnerJoinSuite
+ with TestWithV4StateFormat {
+
+ import testImplicits._
+
+ test("SPARK-55628: V4 state format is active in execution plan") {
+ val input1 = MemoryStream[Int]
+ val input2 = MemoryStream[Int]
+
+ val df1 = input1.toDF()
+ .select($"value" as "key", timestamp_seconds($"value") as "ts",
+ ($"value" * 2) as "leftValue")
+ .withWatermark("ts", "10 seconds")
+ val df2 = input2.toDF()
+ .select($"value" as "key", timestamp_seconds($"value") as "ts",
+ ($"value" * 3) as "rightValue")
+ .withWatermark("ts", "10 seconds")
+
+ val joined = df1.join(df2, Seq("key"), "inner")
+
+ testStream(joined)(
+ AddData(input1, 1),
+ CheckAnswer(),
+ Execute { q =>
+ val joinNodes = q.lastExecution.executedPlan.collect {
+ case j: StreamingSymmetricHashJoinExec => j
+ }
+ assert(joinNodes.length == 1)
+ assert(joinNodes.head.stateFormatVersion == 4)
+ },
+ StopStream
+ )
+ }
+
+ // V4 uses different column families (keyWithTsToValues, tsWithKey)
+ // with timestamp-based key encoder specs instead of V3's
+ // keyToNumValues/keyWithIndexToValue.
+ testWithVirtualColumnFamilyJoins(
+ "SPARK-55628: verify V4 state schema writes correct key and " +
+ "value schemas for join operator") {
+ withTempDir { checkpointDir =>
+ val input1 = MemoryStream[Int]
+ val input2 = MemoryStream[Int]
+
+ val df1 = input1.toDF()
+ .select($"value" as "key", ($"value" * 2) as "leftValue")
+ val df2 = input2.toDF()
+ .select($"value" as "key", ($"value" * 3) as "rightValue")
+ val joined = df1.join(df2, "key")
+
+ val metadataPathPostfix = "state/0/_stateSchema/default"
+ val stateSchemaPath =
+ new Path(checkpointDir.toString, s"$metadataPathPostfix")
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ val fm =
+ CheckpointFileManager.create(stateSchemaPath, hadoopConf)
+
+ val keySchemaWithTimestamp = new StructType()
+ .add("field0", IntegerType, nullable = false)
+ .add("__event_time", LongType, nullable = false)
+
+ val leftValueSchema: StructType = new StructType()
+ .add("key", IntegerType, nullable = false)
+ .add("leftValue", IntegerType, nullable = false)
+ .add("matched", BooleanType)
+ val rightValueSchema: StructType = new StructType()
+ .add("key", IntegerType, nullable = false)
+ .add("rightValue", IntegerType, nullable = false)
+ .add("matched", BooleanType)
+
+ val dummyValueSchema =
+ StructType(Array(StructField("__dummy__", NullType)))
+
+ val schemaLeftPrimary = StateStoreColFamilySchema(
+ "left-keyWithTsToValues", 0,
+ keySchemaWithTimestamp, 0, leftValueSchema,
+ Some(TimestampAsPostfixKeyStateEncoderSpec(
+ keySchemaWithTimestamp)),
+ None
+ )
+ val schemaLeftSecondary = StateStoreColFamilySchema(
+ "left-tsWithKey", 0,
+ keySchemaWithTimestamp, 0, dummyValueSchema,
+ Some(TimestampAsPrefixKeyStateEncoderSpec(
+ keySchemaWithTimestamp)),
+ None
+ )
+ val schemaRightPrimary = StateStoreColFamilySchema(
+ "right-keyWithTsToValues", 0,
+ keySchemaWithTimestamp, 0, rightValueSchema,
+ Some(TimestampAsPostfixKeyStateEncoderSpec(
+ keySchemaWithTimestamp)),
+ None
+ )
+ val schemaRightSecondary = StateStoreColFamilySchema(
+ "right-tsWithKey", 0,
+ keySchemaWithTimestamp, 0, dummyValueSchema,
+ Some(TimestampAsPrefixKeyStateEncoderSpec(
+ keySchemaWithTimestamp)),
+ None
+ )
+
+ testStream(joined)(
+ StartStream(
+ checkpointLocation = checkpointDir.getCanonicalPath),
+ AddData(input1, 1),
+ CheckAnswer(),
+ AddData(input2, 1, 10),
+ CheckNewAnswer((1, 2, 3)),
+ Execute { q =>
+ val schemaFilePath =
+ fm.list(stateSchemaPath).toSeq.head.getPath
+ val providerId = StateStoreProviderId(
+ StateStoreId(checkpointDir.getCanonicalPath, 0, 0),
+ q.lastProgress.runId
+ )
+ val checker = new StateSchemaCompatibilityChecker(
+ providerId,
+ hadoopConf,
+ List(schemaFilePath)
+ )
+ val colFamilySeq = checker.readSchemaFile()
+ assert(colFamilySeq.length == 4)
+ assert(colFamilySeq.map(_.toString).toSet == Set(
+ schemaLeftPrimary, schemaLeftSecondary,
+ schemaRightPrimary, schemaRightSecondary
+ ).map(_.toString))
+ },
+ StopStream
+ )
+ }
+ }
+}
+
+@SlowSQLTest
+class StreamingOuterJoinV4Suite
+ extends StreamingOuterJoinSuite
+ with TestWithV4StateFormat
+
+@SlowSQLTest
+class StreamingFullOuterJoinV4Suite
+ extends StreamingFullOuterJoinSuite
+ with TestWithV4StateFormat
+
+@SlowSQLTest
+class StreamingLeftSemiJoinV4Suite
+ extends StreamingLeftSemiJoinSuite
+ with TestWithV4StateFormat
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]