This is an automated email from the ASF dual-hosted git repository.
HeartSaVioR pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 3686ee0e5242 [SPARK-56539][SS] Add state row format validation to
prefixScan and rangeScan
3686ee0e5242 is described below
commit 3686ee0e5242ece013c03aa6a9d0b5af7096c82d
Author: Anupam Yadav <[email protected]>
AuthorDate: Wed May 20 06:50:15 2026 +0900
[SPARK-56539][SS] Add state row format validation to prefixScan and
rangeScan
### What changes were proposed in this pull request?
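This PR adds `validateStateRowFormat` calls to `prefixScan` and `rangeScan`
in `RocksDBStateStoreProvider`, matching the existing pattern used by `get()`,
`prefixScanWithMultiValues`, and `rangeScanWithMultiValues`. The check runs on
the first decoded row that has a non-null value, and only when column families
are disabled and `isValidated` is still false. It then sets `isValidated`, so
later rows are not re-validated.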
### Why are the changes needed?
`prefixScan` and `rangeScan` were missing the `validateStateRowFormat`
check that all other read operations perform. This was identified during the
review of SPARK-56369 (PR #55226) by viirya and filed as a follow-up by
HeartSaVioR.
Without this fix, state row format corruption could go undetected when data
is read through `prefixScan` or `rangeScan` without a prior `get()` call.
### Does this PR introduce _any_ user-facing change?
No. This is an internal validation improvement.
### How was this patch tested?
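Added two new tests in `RocksDBStateStoreSuite`:
- `SPARK-56539: prefixScan triggers validateStateRowFormat on schema mismatch`
- `SPARK-56539: rangeScan triggers validateStateRowFormat on schema mismatch`
Each test works with column families disabled. It first commits a row written
with the `IntegerType` value schema, then reopens the store with a mismatched
`StringType` value schema. It calls `prefixScan`/`rangeScan` without a prior
`get()` call, so `isValidated` is still false, and asserts that iterating the
scan throws `StateStoreValueRowFormatValidationFailure`.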
### Was this patch authored or co-authored using generative AI tooling?
Yes. The implementation approach was brainstormed with AI. The tests were
generated and run by AI with human review.
Model: Claude Opus 4.7
Closes #55468 from yadavay-amzn/fix/SPARK-56539-state-row-validation.
Authored-by: Anupam Yadav <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../state/RocksDBStateStoreProvider.scala | 10 +++
.../streaming/state/RocksDBStateStoreSuite.scala | 82 ++++++++++++++++++++++
2 files changed, 92 insertions(+)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index b3d734c71f91..c181130eec94 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -516,6 +516,11 @@ private[sql] class RocksDBStateStoreProvider
val iter = rocksDbIter.map { kv =>
rowPair.withRows(kvEncoder._1.decodeKey(kv.key),
kvEncoder._2.decodeValue(kv.value))
+ if (!isValidated && rowPair.value != null && !useColumnFamilies) {
+ StateStoreProvider.validateStateRowFormat(
+ rowPair.key, keySchema, rowPair.value, valueSchema, stateStoreId,
storeConf)
+ isValidated = true
+ }
rowPair
}
@@ -575,6 +580,11 @@ private[sql] class RocksDBStateStoreProvider
val iter = rocksDbIter.map { kv =>
rowPair.withRows(kvEncoder._1.decodeKey(kv.key),
kvEncoder._2.decodeValue(kv.value))
+ if (!isValidated && rowPair.value != null && !useColumnFamilies) {
+ StateStoreProvider.validateStateRowFormat(
+ rowPair.key, keySchema, rowPair.value, valueSchema, stateStoreId,
storeConf)
+ isValidated = true
+ }
rowPair
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index ed8c22740b32..e501366b7f98 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -1856,6 +1856,88 @@ class RocksDBStateStoreSuite extends
StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
+ test("SPARK-56539: prefixScan triggers validateStateRowFormat on schema
mismatch") {
+ // Write data with correct schema, then reopen with a mismatched
valueSchema.
+ // prefixScan should trigger validateStateRowFormat and throw on the first
iteration.
+ val storeId = StateStoreId(newDir(), Random.nextInt(), 0)
+ val conf = new Configuration
+ conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
+
+ // Step 1: Write data with correct schema and commit
+ val provider1 = new RocksDBStateStoreProvider()
+ provider1.init(storeId, keySchema, valueSchema,
+ PrefixKeyScanStateEncoderSpec(keySchema, 1), useColumnFamilies = false,
+ new StateStoreConf(SQLConf.get), conf, useMultipleValuesPerKey = false,
+ stateSchemaProvider = Some(new TestStateSchemaProvider))
+ val store1 = provider1.getStore(0)
+ store1.put(dataToKeyRow("a", 1), dataToValueRow(1))
+ store1.commit()
+ provider1.close()
+
+ // Step 2: Reopen with a wrong valueSchema (StringType instead of
IntegerType)
+ // The stored IntegerType value bytes will be misinterpreted as a
variable-length
+ // StringType offset/size, causing structural integrity validation to fail.
+ val wrongValueSchema = StructType(Seq(StructField("v1", StringType, true)))
+ val provider2 = new RocksDBStateStoreProvider()
+ provider2.init(storeId, keySchema, wrongValueSchema,
+ PrefixKeyScanStateEncoderSpec(keySchema, 1), useColumnFamilies = false,
+ new StateStoreConf(SQLConf.get), conf, useMultipleValuesPerKey = false,
+ stateSchemaProvider = Some(new TestStateSchemaProvider))
+ val store2 = provider2.getStore(1)
+ try {
+ // prefixScan should trigger validation and throw because stored value
bytes
+ // are not structurally valid for the declared StringType schema
+ intercept[StateStoreValueRowFormatValidationFailure] {
+ store2.prefixScan(dataToPrefixKeyRow("a")).toSeq
+ }
+ } finally {
+ store2.abort()
+ provider2.close()
+ }
+ }
+
+ test("SPARK-56539: rangeScan triggers validateStateRowFormat on schema
mismatch") {
+ // Write data with correct schema, then reopen with a mismatched
valueSchema.
+ // rangeScan should trigger validateStateRowFormat and throw on the first
iteration.
+ val storeId = StateStoreId(newDir(), Random.nextInt(), 0)
+ val conf = new Configuration
+ conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
+
+ // Step 1: Write data with correct schema and commit
+ val provider1 = new RocksDBStateStoreProvider()
+ provider1.init(storeId, keySchemaWithRangeScan, valueSchema,
+ RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
+ useColumnFamilies = false,
+ new StateStoreConf(SQLConf.get), conf, useMultipleValuesPerKey = false,
+ stateSchemaProvider = Some(new TestStateSchemaProvider))
+ val store1 = provider1.getStore(0)
+ store1.put(dataToKeyRowWithRangeScan(10L, "a"), dataToValueRow(10))
+ store1.commit()
+ provider1.close()
+
+ // Step 2: Reopen with a wrong valueSchema (StringType instead of
IntegerType)
+ val wrongValueSchema = StructType(Seq(StructField("v1", StringType, true)))
+ val provider2 = new RocksDBStateStoreProvider()
+ provider2.init(storeId, keySchemaWithRangeScan, wrongValueSchema,
+ RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
+ useColumnFamilies = false,
+ new StateStoreConf(SQLConf.get), conf, useMultipleValuesPerKey = false,
+ stateSchemaProvider = Some(new TestStateSchemaProvider))
+ val store2 = provider2.getStore(1)
+ try {
+ // rangeScan should trigger validation and throw because stored value
bytes
+ // are not structurally valid for the declared StringType schema
+ intercept[StateStoreValueRowFormatValidationFailure] {
+ store2.rangeScan(
+ Some(dataToKeyRowWithRangeScan(10L, "a")),
+ Some(dataToKeyRowWithRangeScan(20L, "a"))).toSeq
+ }
+ } finally {
+ store2.abort()
+ provider2.close()
+ }
+ }
+
testWithColumnFamiliesAndEncodingTypes(
"rocksdb key and value schema encoders for column families",
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled
=>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]