This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a22991700fab [SPARK-49883][SS][TESTS][FOLLOWUP] RocksDB Fault 
Tolerance Test
a22991700fab is described below

commit a22991700fab447fa9e402e3b853299f88a18ef1
Author: Wei Liu <[email protected]>
AuthorDate: Fri Jan 10 07:24:49 2025 +0900

    [SPARK-49883][SS][TESTS][FOLLOWUP] RocksDB Fault Tolerance Test
    
    ### What changes were proposed in this pull request?
    
    Add a new test that verifies the correct snapshot version is loaded in ckpt 
v2.
    
    ### Why are the changes needed?
    
    Test coverage
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Test only addition
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #49175 from WweiL/test1.
    
    Authored-by: Wei Liu <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../execution/streaming/state/RocksDBSuite.scala   | 299 ++++++++++++++-------
 1 file changed, 199 insertions(+), 100 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index fb0a4720f63d..634a3c9de901 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -553,28 +553,12 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
         enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
         versionToUniqueId = versionToUniqueId) { db =>
         for (version <- 0 to 49) {
-          db.load(version)
+          db.load(version, versionToUniqueId.get(version))
           db.put(version.toString, version.toString)
           db.commit()
           if ((version + 1) % 5 == 0) db.doMaintenance()
         }
       }
-  }
-
-  testWithColumnFamilies(
-    "RocksDB: check changelog and snapshot version",
-    TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>
-    val remoteDir = Utils.createTempDir().toString
-    val conf = dbConf.copy(minDeltasForSnapshot = 1)
-    new File(remoteDir).delete() // to make sure that the directory gets 
created
-    for (version <- 0 to 49) {
-      withDB(remoteDir, version = version, conf = conf,
-        useColumnFamilies = colFamiliesEnabled) { db =>
-        db.put(version.toString, version.toString)
-        db.commit()
-        if ((version + 1) % 5 == 0) db.doMaintenance()
-      }
-    }
 
       if (isChangelogCheckpointingEnabled) {
         assert(changelogVersionsPresent(remoteDir) === (1 to 50))
@@ -613,7 +597,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with 
SharedSparkSession
           enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
           versionToUniqueId = versionToUniqueId) { db =>
         ex = intercept[SparkException] {
-          db.load(1)
+          db.load(1, versionToUniqueId.get(1))
         }
         checkError(
           ex,
@@ -728,27 +712,27 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
           enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
           versionToUniqueId = versionToUniqueId) { db =>
         for (version <- 0 to 1) {
-          db.load(version)
+          db.load(version, versionToUniqueId.get(version))
           db.commit()
           db.doMaintenance()
         }
         // Snapshot should not be created because minDeltasForSnapshot = 3
         assert(snapshotVersionsPresent(remoteDir) === Seq.empty)
         assert(changelogVersionsPresent(remoteDir) == Seq(1, 2))
-        db.load(2)
+        db.load(2, versionToUniqueId.get(2))
         db.commit()
         db.doMaintenance()
         assert(snapshotVersionsPresent(remoteDir) === Seq(3))
-        db.load(3)
+        db.load(3, versionToUniqueId.get(3))
 
         for (version <- 3 to 7) {
-          db.load(version)
+          db.load(version, versionToUniqueId.get(version))
           db.commit()
           db.doMaintenance()
         }
         assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6))
         for (version <- 8 to 17) {
-          db.load(version)
+          db.load(version, versionToUniqueId.get(version))
           db.commit()
         }
         db.doMaintenance()
@@ -759,13 +743,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled,
           enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
           versionToUniqueId = versionToUniqueId) { db =>
-        db.load(18)
+        db.load(18, versionToUniqueId.get(18))
         db.commit()
         db.doMaintenance()
         assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18))
 
         for (version <- 19 to 20) {
-          db.load(version)
+          db.load(version, versionToUniqueId.get(version))
           db.commit()
         }
         db.doMaintenance()
@@ -785,7 +769,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with 
SharedSparkSession
           enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
           versionToUniqueId = versionToUniqueId) { db =>
         for (version <- 0 to 2) {
-          db.load(version)
+          db.load(version, versionToUniqueId.get(version))
           db.put(version.toString, version.toString)
           db.commit()
         }
@@ -793,7 +777,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with 
SharedSparkSession
         db.doMaintenance()
         // Roll back to version 1 and start to process data.
         for (version <- 1 to 3) {
-          db.load(version)
+          db.load(version, versionToUniqueId.get(version))
           db.put(version.toString, version.toString)
           db.commit()
         }
@@ -805,7 +789,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with 
SharedSparkSession
           enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
           versionToUniqueId = versionToUniqueId) { db =>
+        // Open the db to verify that the state in 4.zip is not corrupted.
-        db.load(4)
+        db.load(4, versionToUniqueId.get(4))
       }
   }
 
@@ -824,7 +808,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with 
SharedSparkSession
       enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
       versionToUniqueId = versionToUniqueId) { db =>
       for (version <- 1 to 30) {
-        db.load(version - 1)
+        db.load(version - 1, versionToUniqueId.get(version - 1))
         db.put(version.toString, version.toString)
         db.remove((version - 1).toString)
         db.commit()
@@ -842,11 +826,11 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
       versionToUniqueId = versionToUniqueId) { db =>
       for (version <- 1 to 30) {
-        db.load(version)
+        db.load(version, versionToUniqueId.get(version))
         assert(db.iterator().map(toStr).toSet === Set((version.toString, 
version.toString)))
       }
       for (version <- 30 to 60) {
-        db.load(version - 1)
+        db.load(version - 1, versionToUniqueId.get(version - 1))
         db.put(version.toString, version.toString)
         db.remove((version - 1).toString)
         db.commit()
@@ -854,13 +838,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
       assert(changelogVersionsPresent(remoteDir) === (30 to 60))
       for (version <- 1 to 60) {
-        db.load(version, readOnly = true)
+        db.load(version, versionToUniqueId.get(version), readOnly = true)
         assert(db.iterator().map(toStr).toSet === Set((version.toString, 
version.toString)))
       }
 
       // recommit 60 to ensure that acquireLock is released for maintenance
       for (version <- 60 to 60) {
-        db.load(version - 1)
+        db.load(version - 1, versionToUniqueId.get(version - 1))
         db.put(version.toString, version.toString)
         db.remove((version - 1).toString)
         db.commit()
@@ -877,12 +861,116 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
 
       // Verify the content of retained versions.
       for (version <- 30 to 60) {
-        db.load(version, readOnly = true)
+        db.load(version, versionToUniqueId.get(version), readOnly = true)
         assert(db.iterator().map(toStr).toSet === Set((version.toString, 
version.toString)))
       }
     }
   }
 
+  testWithChangelogCheckpointingEnabled("RocksDB Fault Tolerance: correctly 
handle when there " +
+    "are multiple snapshot files for the same version") {
+    val enableStateStoreCheckpointIds = true
+    val useColumnFamily = true
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets 
created
+    val enableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 
20,
+        minDeltasForSnapshot = 3)
+
+    // Simulate when there are multiple snapshot files for the same version
+    // The first DB writes to version 0 with uniqueId
+    val versionToUniqueId1 = new mutable.HashMap[Long, String]()
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+      useColumnFamilies = useColumnFamily,
+      enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+      versionToUniqueId = versionToUniqueId1) { db =>
+      db.load(0, versionToUniqueId1.get(0))
+      db.put("a", "1") // write key a here
+      db.commit()
+
+      // Add some change log files after the snapshot
+      for (version <- 2 to 5) {
+        db.load(version - 1, versionToUniqueId1.get(version - 1))
+        db.put(version.toString, version.toString) // update "1" -> "1", "2" 
-> "2", ...
+        db.commit()
+      }
+
+      // doMaintenance uploads the snapshot
+      db.doMaintenance()
+
+      for (version <- 6 to 10) {
+        db.load(version - 1, versionToUniqueId1.get(version - 1))
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+    }
+
+    // versionToUniqueId1 should be non-empty, meaning the id is updated from 
rocksDB to the map
+    assert(versionToUniqueId1.nonEmpty)
+
+    // The second DB writes to version 0 with another uniqueId
+    val versionToUniqueId2 = new mutable.HashMap[Long, String]()
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+      useColumnFamilies = useColumnFamily,
+      enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+      versionToUniqueId = versionToUniqueId2) { db =>
+      db.load(0, versionToUniqueId2.get(0))
+      db.put("b", "2") // write key b here
+      db.commit()
+      // Add some change log files after the snapshot
+      for (version <- 2 to 5) {
+        db.load(version - 1, versionToUniqueId2.get(version - 1))
+        db.put(version.toString, (version + 1).toString) // update "1" -> "2", 
"2" -> "3", ...
+        db.commit()
+      }
+
+      // doMaintenance uploads the snapshot
+      db.doMaintenance()
+
+      for (version <- 6 to 10) {
+        db.load(version - 1, versionToUniqueId2.get(version - 1))
+        db.put(version.toString, (version + 1).toString)
+        db.commit()
+      }
+    }
+
+    // versionToUniqueId2 should be non-empty, meaning the id is updated from 
rocksDB to the map
+    assert(versionToUniqueId2.nonEmpty)
+
+    // During a load() with lineage from the first rocksDB,
+    // the DB should load with data in the first db
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+      useColumnFamilies = useColumnFamily,
+      enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+      versionToUniqueId = versionToUniqueId1) { db =>
+      db.load(10, versionToUniqueId1.get(10))
+      assert(toStr(db.get("a")) === "1")
+      for (version <- 2 to 10) {
+        // first time we write version -> version
+        // second time we write version -> version + 1
+        // here since we are loading from the first db lineage, we should see 
version -> version
+        assert(toStr(db.get(version.toString)) === version.toString)
+      }
+    }
+
+    // During a load() with lineage from the second rocksDB,
+    // the DB should load with data in the second db
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+      useColumnFamilies = useColumnFamily,
+      enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+      versionToUniqueId = versionToUniqueId2) { db =>
+      db.load(10, versionToUniqueId2.get(10))
+      assert(toStr(db.get("b")) === "2")
+      for (version <- 2 to 10) {
+        // first time we write version -> version
+        // second time we write version -> version + 1
+        // here since we are loading from the second db lineage,
+        // we should see version -> version + 1
+        assert(toStr(db.get(version.toString)) === (version + 1).toString)
+      }
+    }
+  }
+
   // A rocksdb instance with changelog checkpointing disabled should be able 
to load
   // an existing checkpoint with changelog.
   testWithStateStoreCheckpointIdsAndColumnFamilies(
@@ -899,7 +987,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with 
SharedSparkSession
       enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
       versionToUniqueId = versionToUniqueId) { db =>
       for (version <- 1 to 30) {
-        db.load(version - 1)
+        db.load(version - 1, versionToUniqueId.get(version - 1))
         db.put(version.toString, version.toString)
         db.remove((version - 1).toString)
         db.commit()
@@ -916,11 +1004,11 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
       versionToUniqueId = versionToUniqueId) { db =>
       for (version <- 1 to 30) {
-        db.load(version)
+        db.load(version, versionToUniqueId.get(version))
         assert(db.iterator().map(toStr).toSet === Set((version.toString, 
version.toString)))
       }
       for (version <- 31 to 60) {
-        db.load(version - 1)
+        db.load(version - 1, versionToUniqueId.get(version - 1))
         db.put(version.toString, version.toString)
         db.remove((version - 1).toString)
         db.commit()
@@ -928,7 +1016,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       assert(changelogVersionsPresent(remoteDir) === (1 to 30))
       assert(snapshotVersionsPresent(remoteDir) === (31 to 60))
       for (version <- 1 to 60) {
-        db.load(version, readOnly = true)
+        db.load(version, versionToUniqueId.get(version), readOnly = true)
         assert(db.iterator().map(toStr).toSet === Set((version.toString, 
version.toString)))
       }
       // Check that snapshots and changelogs get purged correctly.
@@ -937,7 +1025,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       assert(changelogVersionsPresent(remoteDir) === Seq.empty)
       // Verify the content of retained versions.
       for (version <- 41 to 60) {
-        db.load(version, readOnly = true)
+        db.load(version, versionToUniqueId.get(version), readOnly = true)
         assert(db.iterator().map(toStr).toSet === Set((version.toString, 
version.toString)))
       }
     }
@@ -1030,7 +1118,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
           assert(toStr(db.get("b")) === "2")
           assert(db.iterator().map(toStr).toSet === Set(("a", "1"), ("b", 
"2")))
 
-          db.load(1)
+          db.load(1, versionToUniqueId.get(1))
           assert(toStr(db.get("b")) === null)
           assert(db.iterator().map(toStr).toSet === Set(("a", "1")))
         }
@@ -1847,12 +1935,12 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
         // DB has been loaded so current thread has already
         // acquired the lock on the RocksDB instance
 
-        db.load(0) // Current thread should be able to load again
+        db.load(0, versionToUniqueId.get(0)) // Current thread should be able 
to load again
 
         // Another thread should not be able to load while current thread is 
using it
         var ex = intercept[SparkException] {
           ThreadUtils.runInNewThread("concurrent-test-thread-1") {
-            db.load(0)
+            db.load(0, versionToUniqueId.get(0))
           }
         }
         checkError(
@@ -1872,15 +1960,15 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
         // Commit should release the instance allowing other threads to load 
new version
         db.commit()
         ThreadUtils.runInNewThread("concurrent-test-thread-2") {
-          db.load(1)
+          db.load(1, versionToUniqueId.get(1))
           db.commit()
         }
 
         // Another thread should not be able to load while current thread is 
using it
-        db.load(2)
+        db.load(2, versionToUniqueId.get(2))
         ex = intercept[SparkException] {
           ThreadUtils.runInNewThread("concurrent-test-thread-2") {
-            db.load(2)
+            db.load(2, versionToUniqueId.get(2))
           }
         }
         checkError(
@@ -1900,7 +1988,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
         // Rollback should release the instance allowing other threads to load 
new version
         db.rollback()
         ThreadUtils.runInNewThread("concurrent-test-thread-3") {
-          db.load(1)
+          db.load(1, versionToUniqueId.get(1))
           db.commit()
         }
       }
@@ -2354,14 +2442,14 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
     withDB(remoteDir, conf = conf, enableStateStoreCheckpointIds = 
enableStateStoreCheckpointIds,
       versionToUniqueId = versionToUniqueId) { db =>
       for (version <- 0 to 1) {
-        db.load(version)
+        db.load(version, versionToUniqueId.get(version))
         db.put(version.toString, version.toString)
         db.commit()
       }
       // upload snapshot 2.zip
       db.doMaintenance()
       for (version <- Seq(2)) {
-        db.load(version)
+        db.load(version, versionToUniqueId.get(version))
         db.put(version.toString, version.toString)
         db.commit()
       }
@@ -2379,16 +2467,16 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
         }
         db1.doMaintenance()
       }
-      db.load(2)
+      db.load(2, versionToUniqueId.get(2))
       for (version <- Seq(2)) {
-        db.load(version)
+        db.load(version, versionToUniqueId.get(version))
         db.put(version.toString, version.toString)
         db.commit()
       }
       // upload snapshot 3.zip
       db.doMaintenance()
       // rollback to version 2
-      db.load(2)
+      db.load(2, versionToUniqueId.get(2))
     }
   }
 
@@ -2403,18 +2491,18 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       withDB(remoteDir, conf = conf, enableStateStoreCheckpointIds = 
enableStateStoreCheckpointIds,
         versionToUniqueId = versionToUniqueId) { db =>
         for (version <- 0 to 1) {
-          db.load(version)
+          db.load(version, versionToUniqueId.get(version))
           db.put(version.toString, version.toString)
           db.commit()
         }
         // upload snapshot 2.zip
         db.doMaintenance()
         for (version <- 2 to 3) {
-          db.load(version)
+          db.load(version, versionToUniqueId.get(version))
           db.put(version.toString, version.toString)
           db.commit()
         }
-        db.load(0)
+        db.load(0, versionToUniqueId.get(0))
+        // simulate db in another executor that overrides the zip file
         // In checkpoint V2, reusing the same versionToUniqueId to simulate 
when two executors
         // are scheduled with the same uniqueId in the same microbatch
@@ -2429,7 +2517,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
           db1.doMaintenance()
         }
         for (version <- 2 to 3) {
-          db.load(version)
+          db.load(version, versionToUniqueId.get(version))
           db.put(version.toString, version.toString)
           db.commit()
         }
@@ -2450,14 +2538,14 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
     withDB(remoteDir, conf = conf, enableStateStoreCheckpointIds = 
enableStateStoreCheckpointIds,
       versionToUniqueId = versionToUniqueId) { db =>
       for (version <- 0 to 2) {
-        db.load(version)
+        db.load(version, versionToUniqueId.get(version))
         db.put(version.toString, version.toString)
         db.commit()
       }
       // upload snapshot 2.zip
       db.doMaintenance()
       for (version <- 1 to 3) {
-        db.load(version)
+        db.load(version, versionToUniqueId.get(version))
         db.put(version.toString, version.toString)
         db.commit()
       }
@@ -2478,13 +2566,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
     withDB(remoteDir, conf = conf, enableStateStoreCheckpointIds = 
enableStateStoreCheckpointIds,
       versionToUniqueId = versionToUniqueId) { db =>
       for (version <- 0 to 1) {
-        db.load(version)
+        db.load(version, versionToUniqueId.get(version))
         db.put(version.toString, version.toString)
         db.commit()
       }
 
       // load previous version, and recreate the snapshot
-      db.load(1)
+      db.load(1, versionToUniqueId.get(1))
       db.put("3", "3")
 
       // upload any latest snapshots so far
@@ -2514,12 +2602,12 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
         withDB(remoteDir, conf = conf, hadoopConf = hadoopConf,
           enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
           versionToUniqueId = versionToUniqueId) { db =>
-          db.load(0)
+          db.load(0, versionToUniqueId.get(0))
           db.put("a", "1")
           db.commit()
 
           // load previous version, will recreate snapshot on commit
-          db.load(0)
+          db.load(0, versionToUniqueId.get(0))
           db.put("a", "1")
 
           // upload version 1 snapshot created previously
@@ -2565,13 +2653,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       // This test was accidentally fixed by
       // SPARK-48931 (https://github.com/apache/spark/pull/47393)
 
-      db.load(0)
+      db.load(0, versionToUniqueId.get(0))
       db.put("foo", "bar")
       // Snapshot checkpoint not needed
       db.commit()
 
       // Continue using local DB
-      db.load(1)
+      db.load(1, versionToUniqueId.get(1))
       db.put("foo", "bar")
       // Should create a local RocksDB snapshot
       db.commit()
@@ -2579,19 +2667,19 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       db.doMaintenance()
 
       // This will reload Db from the cloud.
-      db.load(1)
+      db.load(1, versionToUniqueId.get(1))
       db.put("foo", "bar")
       // Should create another local snapshot
       db.commit()
 
       // Continue using local DB
-      db.load(2)
+      db.load(2, versionToUniqueId.get(2))
       db.put("foo", "bar")
       // Snapshot checkpoint not needed
       db.commit()
 
       // Reload DB from the cloud, loading from 2.zip
-      db.load(2)
+      db.load(2, versionToUniqueId.get(2))
       db.put("foo", "bar")
       // Snapshot checkpoint not needed
       db.commit()
@@ -2600,14 +2688,14 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       db.doMaintenance()
 
       // Reload new 2.zip just uploaded to validate it is not corrupted.
-      db.load(2)
+      db.load(2, versionToUniqueId.get(2))
       db.put("foo", "bar")
       db.commit()
 
       // Test the maintenance thread is delayed even after the next snapshot 
is created.
       // There will be two outstanding snapshots.
       for (batchVersion <- 3 to 6) {
-        db.load(batchVersion)
+        db.load(batchVersion, versionToUniqueId.get(batchVersion))
         db.put("foo", "bar")
         // In batchVersion 3 and 5, it will generate a local snapshot but 
won't be uploaded.
         db.commit()
@@ -2618,7 +2706,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       // maintenance tasks finish quickly.
       for (batchVersion <- 7 to 10) {
         for (j <- 0 to 1) {
-          db.load(batchVersion)
+          db.load(batchVersion, versionToUniqueId.get(batchVersion))
           db.put("foo", "bar")
           db.commit()
           db.doMaintenance()
@@ -2649,7 +2737,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
             val random = new Random(randomSeed)
             var curVer: Int = 0
             for (i <- 1 to 100) {
-              db.load(curVer)
+              db.load(curVer, versionToUniqueId.get(curVer))
               db.put("foo", "bar")
               db.commit()
               // For a one in five chance, maintenance task is executed. The 
chance is created to
@@ -2702,33 +2790,33 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
           enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
           versionToUniqueId = versionToUniqueId) { db2 =>
           // commit version 1 via db1
-          db1.load(0)
+          db1.load(0, versionToUniqueId.get(0))
           db1.put("a", "1")
           db1.put("b", "1")
 
           db1.commit()
 
           // commit version 1 via db2
-          db2.load(0)
+          db2.load(0, versionToUniqueId.get(0))
           db2.put("a", "1")
           db2.put("b", "1")
 
           db2.commit()
 
           // commit version 2 via db2
-          db2.load(1)
+          db2.load(1, versionToUniqueId.get(1))
           db2.put("a", "2")
           db2.put("b", "2")
 
           db2.commit()
 
           // reload version 1, this should succeed
-          db2.load(1)
-          db1.load(1)
+          db2.load(1, versionToUniqueId.get(1))
+          db1.load(1, versionToUniqueId.get(1))
 
           // reload version 2, this should succeed
-          db2.load(2)
-          db1.load(2)
+          db2.load(2, versionToUniqueId.get(2))
+          db1.load(2, versionToUniqueId.get(2))
         }
       }
     }
@@ -2748,33 +2836,33 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
           enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
           versionToUniqueId = versionToUniqueId) { db2 =>
           // commit version 1 via db1
-          db1.load(0)
+          db1.load(0, versionToUniqueId.get(0))
           db1.put("a", "1")
           db1.put("b", "1")
 
           db1.commit()
 
           // commit version 1 via db2
-          db2.load(0)
+          db2.load(0, versionToUniqueId.get(0))
           db2.put("a", "1")
           db2.put("b", "1")
 
           db2.commit()
 
           // commit version 2 via db2
-          db2.load(1)
+          db2.load(1, versionToUniqueId.get(1))
           db2.put("a", "2")
           db2.put("b", "2")
 
           db2.commit()
 
           // reload version 1, this should succeed
-          db2.load(1)
-          db1.load(1)
+          db2.load(1, versionToUniqueId.get(1))
+          db1.load(1, versionToUniqueId.get(1))
 
           // reload version 2, this should succeed
-          db2.load(2)
-          db1.load(2)
+          db2.load(2, versionToUniqueId.get(2))
+          db1.load(2, versionToUniqueId.get(2))
         }
       }
     }
@@ -2798,33 +2886,33 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
           enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
           versionToUniqueId = versionToUniqueId) { db2 =>
           // commit version 1 via db2
-          db2.load(0)
+          db2.load(0, versionToUniqueId.get(0))
           db2.put("a", "1")
           db2.put("b", "1")
 
           db2.commit()
 
           // commit version 1 via db1
-          db1.load(0)
+          db1.load(0, versionToUniqueId.get(0))
           db1.put("a", "1")
           db1.put("b", "1")
 
           db1.commit()
 
           // commit version 2 via db2
-          db2.load(1)
+          db2.load(1, versionToUniqueId.get(1))
           db2.put("a", "2")
           db2.put("b", "2")
 
           db2.commit()
 
           // reload version 1, this should succeed
-          db2.load(1)
-          db1.load(1)
+          db2.load(1, versionToUniqueId.get(1))
+          db1.load(1, versionToUniqueId.get(1))
 
           // reload version 2, this should succeed
-          db2.load(2)
-          db1.load(2)
+          db2.load(2, versionToUniqueId.get(2))
+          db1.load(2, versionToUniqueId.get(2))
         }
       }
     }
@@ -2844,33 +2932,33 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
           enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
           versionToUniqueId = versionToUniqueId) { db2 =>
           // commit version 1 via db2
-          db2.load(0)
+          db2.load(0, versionToUniqueId.get(0))
           db2.put("a", "1")
           db2.put("b", "1")
 
           db2.commit()
 
           // commit version 1 via db1
-          db1.load(0)
+          db1.load(0, versionToUniqueId.get(0))
           db1.put("a", "1")
           db1.put("b", "1")
 
           db1.commit()
 
           // commit version 2 via db2
-          db2.load(1)
+          db2.load(1, versionToUniqueId.get(1))
           db2.put("a", "2")
           db2.put("b", "2")
 
           db2.commit()
 
           // reload version 1, this should succeed
-          db2.load(1)
-          db1.load(1)
+          db2.load(1, versionToUniqueId.get(1))
+          db1.load(1, versionToUniqueId.get(1))
 
           // reload version 2, this should succeed
-          db2.load(2)
-          db1.load(2)
+          db2.load(2, versionToUniqueId.get(2))
+          db1.load(2, versionToUniqueId.get(2))
         }
       }
     }
@@ -3163,7 +3251,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
         version: Long,
         ckptId: Option[String] = None,
         readOnly: Boolean = false): RocksDB = {
-      super.load(version, versionToUniqueId.get(version), readOnly)
+      // When a ckptId is defined, it means the test is explicitly using v2 
semantic
+      // When it is not, it is possible that it implicitly uses it.
+      // So still do a versionToUniqueId.get
+      ckptId match {
+        case Some(_) => super.load(version, ckptId, readOnly)
+        case None => super.load(version, versionToUniqueId.get(version), 
readOnly)
+      }
     }
 
     override def commit(): Long = {
@@ -3184,6 +3278,11 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       hadoopConf: Configuration = hadoopConf,
       useColumnFamilies: Boolean = false,
       enableStateStoreCheckpointIds: Boolean = false,
+      // versionToUniqueId is used in checkpoint format v2, it simulates the 
lineage
+      // stored in the commit log. The lineage will be automatically updated 
in db.commit()
+      // When testing V2, please create a versionToUniqueId map
+      // and call versionToUniqueId.get(version) in the db.load() function.
+      // In V1, versionToUniqueId is not used and 
versionToUniqueId.get(version) returns None.
       versionToUniqueId : mutable.Map[Long, String] = mutable.Map[Long, 
String](),
       localDir: File = Utils.createTempDir())(
       func: RocksDB => T): T = {
@@ -3207,7 +3306,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
           loggingId = s"[Thread-${Thread.currentThread.getId}]",
           useColumnFamilies = useColumnFamilies)
       }
-      db.load(version)
+      db.load(version, versionToUniqueId.get(version))
       func(db)
     } finally {
       if (db != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to