This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 117e17114d51 [SPARK-55510][SS] Fix deleteRange of RocksDB state store 
to call changelogWriter
117e17114d51 is described below

commit 117e17114d516d54950d213298cb0b39bf0377a9
Author: zeruibao <[email protected]>
AuthorDate: Fri Feb 27 12:57:36 2026 -0800

    [SPARK-55510][SS] Fix deleteRange of RocksDB state store to call 
changelogWriter
    
    ### What changes were proposed in this pull request?
    This PR adds changelog writer support for deleteRange in the RocksDB state 
store. Previously, deleteRange only performed the RocksDB native range deletion 
but did not record the operation in the changelog file, so range deletions were 
silently lost when state was recovered by replaying changelogs. The State Data 
Source change feed reader is also updated to explicitly reject 
DELETE_RANGE_RECORD entries, since a range deletion cannot be expanded into the 
individual key-value change records that the change feed emits.
    The changes include:
    #### Changelog write/replay support:
    
    - Added a new DELETE_RANGE_RECORD record type (byte 0x20) to the RecordType 
enum in StateStoreChangelog.scala
    - Added an abstract deleteRange(beginKey, endKey) method to 
StateStoreChangelogWriter, implemented in V2/V4 writers (V1/V3 throw 
UnsupportedOperationException, consistent with merge)
    - Updated StateStoreChangelogReaderV2 to parse DELETE_RANGE_RECORD entries
    - Updated RocksDB.deleteRange to write to the changelog after the native 
db.deleteRange call, with checksum encoding for endKey when rowChecksumEnabled 
is true
    - Updated RocksDB.replayChangelog to handle DELETE_RANGE_RECORD by 
verifying checksums and calling deleteRange during recovery
    #### State Data Source change feed handling:
    
    - Updated RocksDBStateStoreChangeDataReader.getNext() to throw 
UnsupportedOperationException when a DELETE_RANGE_RECORD is encountered, because 
a range deletion cannot be expanded into individual key-value change records
    - Removed the stale "@note This operation is NOT recorded in the changelog" 
documentation from StateStore.deleteRange, since the operation is now recorded
    
    ### Why are the changes needed?
    When changelog checkpointing is enabled, the state store recovers by 
replaying changelog files rather than loading full snapshots. Since deleteRange 
was not recorded in the changelog, any range deletions were silently lost 
during changelog-based recovery, leading to data inconsistency -- keys that 
should have been deleted would reappear after a restart.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes, co-authored with Cursor
    
    Closes #54298 from 
zeruibao/zeruibao/SPARK-55510-fix-delete-range-to-call-changlog-writer.
    
    Authored-by: zeruibao <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../sql/execution/streaming/state/RocksDB.scala    | 43 ++++++++++---
 .../state/RocksDBStateStoreProvider.scala          |  5 ++
 .../sql/execution/streaming/state/StateStore.scala |  1 -
 .../streaming/state/StateStoreChangelog.scala      | 25 ++++++++
 .../streaming/state/RocksDBStateStoreSuite.scala   | 74 +++++++++++++++++++++-
 5 files changed, 138 insertions(+), 10 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 69a7e9618bb3..a17c788d3207 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -981,6 +981,12 @@ class RocksDB(
               verifyChangelogRecord(kvVerifier, key, Some(value))
               merge(key, value, includesPrefix = useColumnFamilies,
                 deriveCfName = useColumnFamilies, includesChecksum = 
conf.rowChecksumEnabled)
+
+            case RecordType.DELETE_RANGE_RECORD =>
+              // For deleteRange, 'key' is beginKey and 'value' is endKey
+              verifyChangelogRecord(kvVerifier, key, Some(value))
+              deleteRange(key, value, includesPrefix = useColumnFamilies,
+                includesChecksum = conf.rowChecksumEnabled)
           }
         }
       } finally {
@@ -1438,29 +1444,50 @@ class RocksDB(
    * Delete all keys in the range [beginKey, endKey).
    * Uses RocksDB's native deleteRange for efficient bulk deletion.
    *
-   * @param beginKey The start key of the range (inclusive)
-   * @param endKey   The end key of the range (exclusive)
-   * @param cfName   The column family name
+   * @param beginKey       The start key of the range (inclusive)
+   * @param endKey         The end key of the range (exclusive)
+   * @param cfName         The column family name
+   * @param includesPrefix Whether the keys already include the column family 
prefix.
+   *                       Set to true during changelog replay to avoid 
double-encoding.
    */
   def deleteRange(
       beginKey: Array[Byte],
       endKey: Array[Byte],
-      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME,
+      includesPrefix: Boolean = false,
+      includesChecksum: Boolean = false): Unit = {
     updateMemoryUsageIfNeeded()
 
-    val beginKeyWithPrefix = if (useColumnFamilies) {
+    val originalEndKey = if (conf.rowChecksumEnabled && includesChecksum) {
+      KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
+        readVerifier, beginKey, endKey, delimiterSize)
+    } else {
+      endKey
+    }
+
+    val beginKeyWithPrefix = if (useColumnFamilies && !includesPrefix) {
       encodeStateRowWithPrefix(beginKey, cfName)
     } else {
       beginKey
     }
 
-    val endKeyWithPrefix = if (useColumnFamilies) {
-      encodeStateRowWithPrefix(endKey, cfName)
+    val endKeyWithPrefix = if (useColumnFamilies && !includesPrefix) {
+      encodeStateRowWithPrefix(originalEndKey, cfName)
     } else {
-      endKey
+      originalEndKey
     }
 
     db.deleteRange(writeOptions, beginKeyWithPrefix, endKeyWithPrefix)
+    changelogWriter.foreach { writer =>
+      val endKeyForChangelog = if (conf.rowChecksumEnabled) {
+        KeyValueChecksumEncoder.encodeValueRowWithChecksum(endKeyWithPrefix,
+          KeyValueChecksum.create(beginKeyWithPrefix, Some(endKeyWithPrefix)))
+      } else {
+        endKeyWithPrefix
+      }
+      writer.deleteRange(beginKeyWithPrefix, endKeyForChangelog)
+    }
+    // TODO: Add metrics update for deleteRange operations
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index d8e0440e210d..1dbf948e0434 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -1495,6 +1495,11 @@ class RocksDBStateStoreChangeDataReader(
         }
 
         val nextRecord = reader.next()
+        if (nextRecord._1 == RecordType.DELETE_RANGE_RECORD) {
+          throw new UnsupportedOperationException(
+            "DELETE_RANGE_RECORD is not supported in the state data source 
change feed. " +
+              "Range deletions cannot be expanded into individual key-value 
change records.")
+        }
         val keyBytes = if (storeConf.rowChecksumEnabled
           && nextRecord._1 == RecordType.DELETE_RECORD) {
           // remove checksum and decode to the original key
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index e2d7c166a675..12881ce36806 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -287,7 +287,6 @@ trait StateStore extends ReadStateStore {
    * Delete all keys in the range [beginKey, endKey).
    * Uses RocksDB's native deleteRange for efficient bulk deletion.
    *
-   * @note This operation is NOT recorded in the changelog.
    * @param beginKey      The start key of the range (inclusive)
    * @param endKey        The end key of the range (exclusive)
    * @param colFamilyName The column family name
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index 2029c0988756..3c611e7a0de4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -46,6 +46,7 @@ object RecordType extends Enumeration {
   val PUT_RECORD = Value("put_record")
   val DELETE_RECORD = Value("delete_record")
   val MERGE_RECORD = Value("merge_record")
+  val DELETE_RANGE_RECORD = Value("delete_range_record")
 
   // Generate byte representation of each record type
   def getRecordTypeAsByte(recordType: RecordType): Byte = {
@@ -54,6 +55,7 @@ object RecordType extends Enumeration {
       case PUT_RECORD => 0x01.toByte
       case DELETE_RECORD => 0x10.toByte
       case MERGE_RECORD => 0x11.toByte
+      case DELETE_RANGE_RECORD => 0x20.toByte
     }
   }
 
@@ -62,6 +64,7 @@ object RecordType extends Enumeration {
       case PUT_RECORD => "update"
       case DELETE_RECORD => "delete"
       case MERGE_RECORD => "append"
+      case DELETE_RANGE_RECORD => "delete_range"
       case _ => throw StateStoreErrors.unsupportedOperationException(
         "getRecordTypeAsString", recordType.toString)
     }
@@ -74,6 +77,7 @@ object RecordType extends Enumeration {
       case 0x01 => PUT_RECORD
       case 0x10 => DELETE_RECORD
       case 0x11 => MERGE_RECORD
+      case 0x20 => DELETE_RANGE_RECORD
       case _ => throw new RuntimeException(s"Found invalid record type for 
value=$byte")
     }
   }
@@ -128,6 +132,8 @@ abstract class StateStoreChangelogWriter(
 
   def merge(key: Array[Byte], value: Array[Byte]): Unit
 
+  def deleteRange(beginKey: Array[Byte], endKey: Array[Byte]): Unit
+
   def abort(): Unit = {
     try {
       if (backingFileStream != null) backingFileStream.cancel()
@@ -189,6 +195,11 @@ class StateStoreChangelogWriterV1(
       "changelog writer v1")
   }
 
+  override def deleteRange(beginKey: Array[Byte], endKey: Array[Byte]): Unit = 
{
+    throw new UnsupportedOperationException("Operation not supported with 
state " +
+      "changelog writer v1")
+  }
+
   override def commit(): Unit = {
     try {
       // -1 in the key length field mean EOF.
@@ -244,6 +255,15 @@ class StateStoreChangelogWriterV2(
     writePutOrMergeRecord(key, value, RecordType.MERGE_RECORD)
   }
 
+  override def deleteRange(beginKey: Array[Byte], endKey: Array[Byte]): Unit = 
{
+    assert(compressedStream != null)
+    
compressedStream.write(RecordType.getRecordTypeAsByte(RecordType.DELETE_RANGE_RECORD))
+    compressedStream.writeInt(beginKey.length)
+    compressedStream.write(beginKey)
+    compressedStream.writeInt(endKey.length)
+    compressedStream.write(endKey)
+  }
+
   private def writePutOrMergeRecord(key: Array[Byte],
       value: Array[Byte],
       recordType: RecordType): Unit = {
@@ -557,6 +577,11 @@ class StateStoreChangelogReaderV2(
           val valueBuffer = parseBuffer(input)
           (RecordType.MERGE_RECORD, keyBuffer, valueBuffer)
 
+        case RecordType.DELETE_RANGE_RECORD =>
+          val beginKeyBuffer = parseBuffer(input)
+          val endKeyBuffer = parseBuffer(input)
+          (RecordType.DELETE_RANGE_RECORD, beginKeyBuffer, endKeyBuffer)
+
         case _ =>
           throw new IOException("Failed to process unknown record type")
       }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 3c200b860f52..b1e888c31e28 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -2509,7 +2509,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
-  test("deleteRange - bulk deletion of keys in range") {
+  testWithRocksDBStateStore("deleteRange - bulk deletion of keys in range") {
     tryWithProviderResource(
       newStoreProvider(
         keySchemaWithRangeScan,
@@ -2547,6 +2547,78 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  test("deleteRange - changelog checkpointing records and replays range 
deletions") {
+    // useColumnFamilies = true is required to get changelog writer V2 which 
supports
+    // DELETE_RANGE_RECORD. V1 (used when useColumnFamilies = false) does not 
support it.
+    withSQLConf(
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + 
".changelogCheckpointing.enabled" -> "true",
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "100") {
+      val storeId = StateStoreId(newDir(), Random.nextInt(), 0)
+      val keyEncoderSpec = 
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0))
+      val cfName = "testColFamily"
+
+      // Create provider and commit version 1 with some data and a deleteRange
+      tryWithProviderResource(
+        newStoreProvider(storeId, keyEncoderSpec,
+          keySchema = keySchemaWithRangeScan,
+          useColumnFamilies = true)) { provider =>
+        val store = provider.getStore(0)
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+
+        // Put keys: (1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")
+        store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(10), 
cfName)
+        store.put(dataToKeyRowWithRangeScan(2L, "b"), dataToValueRow(20), 
cfName)
+        store.put(dataToKeyRowWithRangeScan(3L, "c"), dataToValueRow(30), 
cfName)
+        store.put(dataToKeyRowWithRangeScan(4L, "d"), dataToValueRow(40), 
cfName)
+        store.put(dataToKeyRowWithRangeScan(5L, "e"), dataToValueRow(50), 
cfName)
+        store.commit()
+
+        // Version 2: deleteRange [2, 4) - should delete keys 2 and 3
+        val store2 = provider.getStore(1)
+        store2.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+        val beginKey = dataToKeyRowWithRangeScan(2L, "")
+        val endKey = dataToKeyRowWithRangeScan(4L, "")
+        store2.deleteRange(beginKey, endKey, cfName)
+        store2.commit()
+      }
+
+      // Reload from a fresh provider (same storeId) to force changelog replay
+      tryWithProviderResource(
+        newStoreProvider(storeId, keyEncoderSpec,
+          keySchema = keySchemaWithRangeScan,
+          useColumnFamilies = true)) { reloadedProvider =>
+        val reloadedStore = reloadedProvider.getStore(2)
+        try {
+          reloadedStore.createColFamilyIfAbsent(cfName,
+            keySchemaWithRangeScan, valueSchema,
+            RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+          val remainingKeys = reloadedStore.iterator(cfName).map { kv =>
+            keyRowWithRangeScanToData(kv.key)
+          }.toSeq
+
+          // Keys 1, 4, 5 should remain; keys 2, 3 should have been deleted 
via replay
+          assert(remainingKeys.length === 3)
+          assert(remainingKeys.map(_._1).toSet === Set(1L, 4L, 5L))
+        } finally {
+          if (!reloadedStore.hasCommitted) reloadedStore.abort()
+        }
+
+        // Verify that the change data reader throws on DELETE_RANGE_RECORD
+        val reader = reloadedProvider.asInstanceOf[SupportsFineGrainedReplay]
+          .getStateStoreChangeDataReader(2, 2, Some(cfName))
+        val ex = intercept[UnsupportedOperationException] {
+          reader.next()
+        }
+        assert(ex.getMessage.contains("DELETE_RANGE_RECORD"))
+        reader.closeIfNeeded()
+      }
+    }
+  }
+
   test("Rocks DB task completion listener does not double unlock 
acquireThread") {
     // This test verifies that a thread that locks then unlocks the db and then
     // fires a completion listener (Thread 1) does not unlock the lock validly


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to