This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new ac4b9154b58 [SPARK-45419][SS] Avoid reusing rocksdb sst files in a different rocksdb instance
ac4b9154b58 is described below

commit ac4b9154b5822779023e66f2efb24d05e20b1cca
Author: Chaoqin Li <chaoqin...@databricks.com>
AuthorDate: Tue Oct 10 11:03:19 2023 +0900

    [SPARK-45419][SS] Avoid reusing rocksdb sst files in a different rocksdb instance
    
    ### What changes were proposed in this pull request?
    When loading a rocksdb instance, remove the file version map entries for the loaded version and all larger versions, to avoid a rocksdb sst file unique id mismatch exception. The SST files in larger versions can't be reused even if they have the same size and name, because they belong to another rocksdb instance.
    
    ### Why are the changes needed?
    Avoid a rocksdb file mismatch exception that may occur at runtime.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add rocksdb unit test.
    
    Closes #43174 from chaoqin-li1123/rocksdb_mismatch.
    
    Authored-by: Chaoqin Li <chaoqin...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/state/RocksDBFileManager.scala       |  4 +++
 .../execution/streaming/state/RocksDBSuite.scala   | 29 ++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 0891d773713..faf9cd701ae 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -207,6 +207,10 @@ class RocksDBFileManager(
    */
   def loadCheckpointFromDfs(version: Long, localDir: File): 
RocksDBCheckpointMetadata = {
     logInfo(s"Loading checkpoint files for version $version")
+    // The unique ids of SST files are checked when opening a rocksdb instance. The SST files
+    // in larger versions can't be reused even if they have the same size and name because
+    // they belong to another rocksdb instance.
+    versionToRocksDBFiles.keySet().removeIf(_ >= version)
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
       localDir.mkdirs()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index e31b05c362f..91dd8582207 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -214,6 +214,35 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("SPARK-45419: Do not reuse SST files" +
+    " in different RocksDB instances") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 0, compactOnCommit = false)
+    new File(remoteDir).delete()  // to make sure that the directory gets created
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 2) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 3.zip
+      db.doMaintenance()
+      // Roll back to version 1 and start to process data.
+      for (version <- 1 to 3) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // Upload snapshot 4.zip, should not reuse the SST files in 3.zip
+      db.doMaintenance()
+    }
+
+    withDB(remoteDir, conf = conf) { db =>
+      // Open the db to verify that the state in 4.zip is not corrupted.
+      db.load(4)
+    }
+  }
+
   // A rocksdb instance with changelog checkpointing enabled should be able to load
   // an existing checkpoint without changelog.
   testWithChangelogCheckpointingEnabled(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to