This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ff87c0958f7 [SPARK-45419][SS] Avoid reusing rocksdb sst files in a 
different rocksdb instance
ff87c0958f7 is described below

commit ff87c0958f79b16c2f276e0dc53a855fca558347
Author: Chaoqin Li <chaoqin...@databricks.com>
AuthorDate: Tue Oct 10 11:03:19 2023 +0900

    [SPARK-45419][SS] Avoid reusing rocksdb sst files in a different rocksdb 
instance
    
    ### What changes were proposed in this pull request?
    When loading a rocksdb instance, remove the file version map entries of 
larger versions to avoid the rocksdb sst file unique id mismatch exception. 
The SST files in larger versions can't be reused even if they have the same 
size and name because they belong to another rocksdb instance.
    
    ### Why are the changes needed?
    Avoid rocksdb file mismatch exception that may occur in runtime.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add rocksdb unit test.
    
    Closes #43174 from chaoqin-li1123/rocksdb_mismatch.
    
    Authored-by: Chaoqin Li <chaoqin...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/state/RocksDBFileManager.scala       |  4 +++
 .../execution/streaming/state/RocksDBSuite.scala   | 29 ++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 05fd845accb..eae9aac3c0a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -208,6 +208,10 @@ class RocksDBFileManager(
    */
   def loadCheckpointFromDfs(version: Long, localDir: File): 
RocksDBCheckpointMetadata = {
     logInfo(s"Loading checkpoint files for version $version")
+    // The unique ids of SST files are checked when opening a rocksdb 
instance. The SST files
+    // in larger versions can't be reused even if they have the same size and 
name because
+    // they belong to another rocksdb instance.
+    versionToRocksDBFiles.keySet().removeIf(_ >= version)
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
       localDir.mkdirs()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index b4b67f381d2..764358dc1f0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -240,6 +240,35 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("SPARK-45419: Do not reuse SST files" +
+    " in different RocksDB instances") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 0, compactOnCommit = false)
+    new File(remoteDir).delete()  // to make sure that the directory gets 
created
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 2) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 3.zip
+      db.doMaintenance()
+      // Roll back to version 1 and start to process data.
+      for (version <- 1 to 3) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // Upload snapshot 4.zip, should not reuse the SST files in 3.zip
+      db.doMaintenance()
+    }
+
+    withDB(remoteDir, conf = conf) { db =>
+      // Open the db to verify that the state in 4.zip is not corrupted.
+      db.load(4)
+    }
+  }
+
   // A rocksdb instance with changelog checkpointing enabled should be able to 
load
   // an existing checkpoint without changelog.
   testWithChangelogCheckpointingEnabled(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to