This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 117e17114d51 [SPARK-55510][SS] Fix deleteRange of Rocksdb state store
to call changelogWriter
117e17114d51 is described below
commit 117e17114d516d54950d213298cb0b39bf0377a9
Author: zeruibao <[email protected]>
AuthorDate: Fri Feb 27 12:57:36 2026 -0800
[SPARK-55510][SS] Fix deleteRange of Rocksdb state store to call
changelogWriter
### What changes were proposed in this pull request?
This PR adds changelog writer support for deleteRange in the RocksDB state
store and exposes delete range operations through the State Data Source change
feed via a new end_key column. Previously, deleteRange only performed the
RocksDB native range deletion but did not record the operation in the changelog
file.
The changes include:
#### Changelog write/replay support:
- Added a new DELETE_RANGE_RECORD record type (byte 0x20) to the RecordType
enum in StateStoreChangelog.scala
- Added an abstract deleteRange(beginKey, endKey) method to
StateStoreChangelogWriter, implemented in V2/V4 writers (V1/V3 throw
UnsupportedOperationException, consistent with merge)
- Updated StateStoreChangelogReaderV2 to parse DELETE_RANGE_RECORD entries
- Updated RocksDB.deleteRange to write to the changelog after the native
db.deleteRange call, with checksum encoding for endKey when rowChecksumEnabled
is true
- Updated RocksDB.replayChangelog to handle DELETE_RANGE_RECORD by
verifying checksums and calling deleteRange during recovery
#### State Data Source change feed integration:
- Added a new end_key column (with key schema) to the State Data Source
change feed output schema for all state variable types (generic, ValueState,
ListState, MapState)
- Updated the StateStoreChangeDataReader return type from a 4-tuple to a
5-tuple (RecordType, key, value, endKey, batchId) to carry the end key
- Updated RocksDBStateStoreChangeDataReader.getNext() to decode both
beginKey and endKey for DELETE_RANGE_RECORD, with proper column family prefix
stripping and a .copy() on beginKey to avoid buffer reuse issues with
RangeKeyScanStateEncoder
- Updated HDFSBackedStateStoreChangeDataReader.getNext() to conform to the
new 5-tuple (endKey is always null since HDFS does not support deleteRange)
- Updated StateStoreChangeDataPartitionReader to map the new endKey field
to the end_key schema column for both regular state types and MapState
### Why are the changes needed?
When changelog checkpointing is enabled, the state store recovers by
replaying changelog files rather than loading full snapshots. Since deleteRange
was not recorded in the changelog, any range deletions were silently lost
during changelog-based recovery, leading to data inconsistency -- keys that
should have been deleted would reappear after a restart.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests added in RocksDBStateStoreSuite.scala, covering changelog recording
and replay of range deletions.
### Was this patch authored or co-authored using generative AI tooling?
Yes, co-authored with Cursor
Closes #54298 from
zeruibao/zeruibao/SPARK-55510-fix-delete-range-to-call-changlog-writer.
Authored-by: zeruibao <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../sql/execution/streaming/state/RocksDB.scala | 43 ++++++++++---
.../state/RocksDBStateStoreProvider.scala | 5 ++
.../sql/execution/streaming/state/StateStore.scala | 1 -
.../streaming/state/StateStoreChangelog.scala | 25 ++++++++
.../streaming/state/RocksDBStateStoreSuite.scala | 74 +++++++++++++++++++++-
5 files changed, 138 insertions(+), 10 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 69a7e9618bb3..a17c788d3207 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -981,6 +981,12 @@ class RocksDB(
verifyChangelogRecord(kvVerifier, key, Some(value))
merge(key, value, includesPrefix = useColumnFamilies,
deriveCfName = useColumnFamilies, includesChecksum =
conf.rowChecksumEnabled)
+
+ case RecordType.DELETE_RANGE_RECORD =>
+ // For deleteRange, 'key' is beginKey and 'value' is endKey
+ verifyChangelogRecord(kvVerifier, key, Some(value))
+ deleteRange(key, value, includesPrefix = useColumnFamilies,
+ includesChecksum = conf.rowChecksumEnabled)
}
}
} finally {
@@ -1438,29 +1444,50 @@ class RocksDB(
* Delete all keys in the range [beginKey, endKey).
* Uses RocksDB's native deleteRange for efficient bulk deletion.
*
- * @param beginKey The start key of the range (inclusive)
- * @param endKey The end key of the range (exclusive)
- * @param cfName The column family name
+ * @param beginKey The start key of the range (inclusive)
+ * @param endKey The end key of the range (exclusive)
+ * @param cfName The column family name
+ * @param includesPrefix Whether the keys already include the column family
prefix.
+ * Set to true during changelog replay to avoid
double-encoding.
*/
def deleteRange(
beginKey: Array[Byte],
endKey: Array[Byte],
- cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+ cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME,
+ includesPrefix: Boolean = false,
+ includesChecksum: Boolean = false): Unit = {
updateMemoryUsageIfNeeded()
- val beginKeyWithPrefix = if (useColumnFamilies) {
+ val originalEndKey = if (conf.rowChecksumEnabled && includesChecksum) {
+ KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
+ readVerifier, beginKey, endKey, delimiterSize)
+ } else {
+ endKey
+ }
+
+ val beginKeyWithPrefix = if (useColumnFamilies && !includesPrefix) {
encodeStateRowWithPrefix(beginKey, cfName)
} else {
beginKey
}
- val endKeyWithPrefix = if (useColumnFamilies) {
- encodeStateRowWithPrefix(endKey, cfName)
+ val endKeyWithPrefix = if (useColumnFamilies && !includesPrefix) {
+ encodeStateRowWithPrefix(originalEndKey, cfName)
} else {
- endKey
+ originalEndKey
}
db.deleteRange(writeOptions, beginKeyWithPrefix, endKeyWithPrefix)
+ changelogWriter.foreach { writer =>
+ val endKeyForChangelog = if (conf.rowChecksumEnabled) {
+ KeyValueChecksumEncoder.encodeValueRowWithChecksum(endKeyWithPrefix,
+ KeyValueChecksum.create(beginKeyWithPrefix, Some(endKeyWithPrefix)))
+ } else {
+ endKeyWithPrefix
+ }
+ writer.deleteRange(beginKeyWithPrefix, endKeyForChangelog)
+ }
+ // TODO: Add metrics update for deleteRange operations
}
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index d8e0440e210d..1dbf948e0434 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -1495,6 +1495,11 @@ class RocksDBStateStoreChangeDataReader(
}
val nextRecord = reader.next()
+ if (nextRecord._1 == RecordType.DELETE_RANGE_RECORD) {
+ throw new UnsupportedOperationException(
+ "DELETE_RANGE_RECORD is not supported in the state data source
change feed. " +
+ "Range deletions cannot be expanded into individual key-value
change records.")
+ }
val keyBytes = if (storeConf.rowChecksumEnabled
&& nextRecord._1 == RecordType.DELETE_RECORD) {
// remove checksum and decode to the original key
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index e2d7c166a675..12881ce36806 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -287,7 +287,6 @@ trait StateStore extends ReadStateStore {
* Delete all keys in the range [beginKey, endKey).
* Uses RocksDB's native deleteRange for efficient bulk deletion.
*
- * @note This operation is NOT recorded in the changelog.
* @param beginKey The start key of the range (inclusive)
* @param endKey The end key of the range (exclusive)
* @param colFamilyName The column family name
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index 2029c0988756..3c611e7a0de4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -46,6 +46,7 @@ object RecordType extends Enumeration {
val PUT_RECORD = Value("put_record")
val DELETE_RECORD = Value("delete_record")
val MERGE_RECORD = Value("merge_record")
+ val DELETE_RANGE_RECORD = Value("delete_range_record")
// Generate byte representation of each record type
def getRecordTypeAsByte(recordType: RecordType): Byte = {
@@ -54,6 +55,7 @@ object RecordType extends Enumeration {
case PUT_RECORD => 0x01.toByte
case DELETE_RECORD => 0x10.toByte
case MERGE_RECORD => 0x11.toByte
+ case DELETE_RANGE_RECORD => 0x20.toByte
}
}
@@ -62,6 +64,7 @@ object RecordType extends Enumeration {
case PUT_RECORD => "update"
case DELETE_RECORD => "delete"
case MERGE_RECORD => "append"
+ case DELETE_RANGE_RECORD => "delete_range"
case _ => throw StateStoreErrors.unsupportedOperationException(
"getRecordTypeAsString", recordType.toString)
}
@@ -74,6 +77,7 @@ object RecordType extends Enumeration {
case 0x01 => PUT_RECORD
case 0x10 => DELETE_RECORD
case 0x11 => MERGE_RECORD
+ case 0x20 => DELETE_RANGE_RECORD
case _ => throw new RuntimeException(s"Found invalid record type for
value=$byte")
}
}
@@ -128,6 +132,8 @@ abstract class StateStoreChangelogWriter(
def merge(key: Array[Byte], value: Array[Byte]): Unit
+ def deleteRange(beginKey: Array[Byte], endKey: Array[Byte]): Unit
+
def abort(): Unit = {
try {
if (backingFileStream != null) backingFileStream.cancel()
@@ -189,6 +195,11 @@ class StateStoreChangelogWriterV1(
"changelog writer v1")
}
+ override def deleteRange(beginKey: Array[Byte], endKey: Array[Byte]): Unit =
{
+ throw new UnsupportedOperationException("Operation not supported with
state " +
+ "changelog writer v1")
+ }
+
override def commit(): Unit = {
try {
// -1 in the key length field mean EOF.
@@ -244,6 +255,15 @@ class StateStoreChangelogWriterV2(
writePutOrMergeRecord(key, value, RecordType.MERGE_RECORD)
}
+ override def deleteRange(beginKey: Array[Byte], endKey: Array[Byte]): Unit =
{
+ assert(compressedStream != null)
+
compressedStream.write(RecordType.getRecordTypeAsByte(RecordType.DELETE_RANGE_RECORD))
+ compressedStream.writeInt(beginKey.length)
+ compressedStream.write(beginKey)
+ compressedStream.writeInt(endKey.length)
+ compressedStream.write(endKey)
+ }
+
private def writePutOrMergeRecord(key: Array[Byte],
value: Array[Byte],
recordType: RecordType): Unit = {
@@ -557,6 +577,11 @@ class StateStoreChangelogReaderV2(
val valueBuffer = parseBuffer(input)
(RecordType.MERGE_RECORD, keyBuffer, valueBuffer)
+ case RecordType.DELETE_RANGE_RECORD =>
+ val beginKeyBuffer = parseBuffer(input)
+ val endKeyBuffer = parseBuffer(input)
+ (RecordType.DELETE_RANGE_RECORD, beginKeyBuffer, endKeyBuffer)
+
case _ =>
throw new IOException("Failed to process unknown record type")
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 3c200b860f52..b1e888c31e28 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -2509,7 +2509,7 @@ class RocksDBStateStoreSuite extends
StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
- test("deleteRange - bulk deletion of keys in range") {
+ testWithRocksDBStateStore("deleteRange - bulk deletion of keys in range") {
tryWithProviderResource(
newStoreProvider(
keySchemaWithRangeScan,
@@ -2547,6 +2547,78 @@ class RocksDBStateStoreSuite extends
StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
+ test("deleteRange - changelog checkpointing records and replays range
deletions") {
+ // useColumnFamilies = true is required to get changelog writer V2 which
supports
+ // DELETE_RANGE_RECORD. V1 (used when useColumnFamilies = false) does not
support it.
+ withSQLConf(
+ RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
".changelogCheckpointing.enabled" -> "true",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "100") {
+ val storeId = StateStoreId(newDir(), Random.nextInt(), 0)
+ val keyEncoderSpec =
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0))
+ val cfName = "testColFamily"
+
+ // Create provider and commit version 1 with some data and a deleteRange
+ tryWithProviderResource(
+ newStoreProvider(storeId, keyEncoderSpec,
+ keySchema = keySchemaWithRangeScan,
+ useColumnFamilies = true)) { provider =>
+ val store = provider.getStore(0)
+ store.createColFamilyIfAbsent(cfName,
+ keySchemaWithRangeScan, valueSchema,
+ RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+
+ // Put keys: (1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")
+ store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(10),
cfName)
+ store.put(dataToKeyRowWithRangeScan(2L, "b"), dataToValueRow(20),
cfName)
+ store.put(dataToKeyRowWithRangeScan(3L, "c"), dataToValueRow(30),
cfName)
+ store.put(dataToKeyRowWithRangeScan(4L, "d"), dataToValueRow(40),
cfName)
+ store.put(dataToKeyRowWithRangeScan(5L, "e"), dataToValueRow(50),
cfName)
+ store.commit()
+
+ // Version 2: deleteRange [2, 4) - should delete keys 2 and 3
+ val store2 = provider.getStore(1)
+ store2.createColFamilyIfAbsent(cfName,
+ keySchemaWithRangeScan, valueSchema,
+ RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+ val beginKey = dataToKeyRowWithRangeScan(2L, "")
+ val endKey = dataToKeyRowWithRangeScan(4L, "")
+ store2.deleteRange(beginKey, endKey, cfName)
+ store2.commit()
+ }
+
+ // Reload from a fresh provider (same storeId) to force changelog replay
+ tryWithProviderResource(
+ newStoreProvider(storeId, keyEncoderSpec,
+ keySchema = keySchemaWithRangeScan,
+ useColumnFamilies = true)) { reloadedProvider =>
+ val reloadedStore = reloadedProvider.getStore(2)
+ try {
+ reloadedStore.createColFamilyIfAbsent(cfName,
+ keySchemaWithRangeScan, valueSchema,
+ RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+ val remainingKeys = reloadedStore.iterator(cfName).map { kv =>
+ keyRowWithRangeScanToData(kv.key)
+ }.toSeq
+
+ // Keys 1, 4, 5 should remain; keys 2, 3 should have been deleted
via replay
+ assert(remainingKeys.length === 3)
+ assert(remainingKeys.map(_._1).toSet === Set(1L, 4L, 5L))
+ } finally {
+ if (!reloadedStore.hasCommitted) reloadedStore.abort()
+ }
+
+ // Verify that the change data reader throws on DELETE_RANGE_RECORD
+ val reader = reloadedProvider.asInstanceOf[SupportsFineGrainedReplay]
+ .getStateStoreChangeDataReader(2, 2, Some(cfName))
+ val ex = intercept[UnsupportedOperationException] {
+ reader.next()
+ }
+ assert(ex.getMessage.contains("DELETE_RANGE_RECORD"))
+ reader.closeIfNeeded()
+ }
+ }
+ }
+
test("Rocks DB task completion listener does not double unlock
acquireThread") {
// This test verifies that a thread that locks then unlocks the db and then
// fires a completion listener (Thread 1) does not unlock the lock validly
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]