This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a22991700fab [SPARK-49883][SS][TESTS][FOLLOWUP] RocksDB Fault
Tolerance Test
a22991700fab is described below
commit a22991700fab447fa9e402e3b853299f88a18ef1
Author: Wei Liu <[email protected]>
AuthorDate: Fri Jan 10 07:24:49 2025 +0900
[SPARK-49883][SS][TESTS][FOLLOWUP] RocksDB Fault Tolerance Test
### What changes were proposed in this pull request?
Add a new test that verifies the correct snapshot version is loaded in ckpt
v2.
### Why are the changes needed?
Test coverage
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Test only addition
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49175 from WweiL/test1.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../execution/streaming/state/RocksDBSuite.scala | 299 ++++++++++++++-------
1 file changed, 199 insertions(+), 100 deletions(-)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index fb0a4720f63d..634a3c9de901 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -553,28 +553,12 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
for (version <- 0 to 49) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.put(version.toString, version.toString)
db.commit()
if ((version + 1) % 5 == 0) db.doMaintenance()
}
}
- }
-
- testWithColumnFamilies(
- "RocksDB: check changelog and snapshot version",
- TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>
- val remoteDir = Utils.createTempDir().toString
- val conf = dbConf.copy(minDeltasForSnapshot = 1)
- new File(remoteDir).delete() // to make sure that the directory gets
created
- for (version <- 0 to 49) {
- withDB(remoteDir, version = version, conf = conf,
- useColumnFamilies = colFamiliesEnabled) { db =>
- db.put(version.toString, version.toString)
- db.commit()
- if ((version + 1) % 5 == 0) db.doMaintenance()
- }
- }
if (isChangelogCheckpointingEnabled) {
assert(changelogVersionsPresent(remoteDir) === (1 to 50))
@@ -613,7 +597,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with
SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
ex = intercept[SparkException] {
- db.load(1)
+ db.load(1, versionToUniqueId.get(1))
}
checkError(
ex,
@@ -728,27 +712,27 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
for (version <- 0 to 1) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.commit()
db.doMaintenance()
}
// Snapshot should not be created because minDeltasForSnapshot = 3
assert(snapshotVersionsPresent(remoteDir) === Seq.empty)
assert(changelogVersionsPresent(remoteDir) == Seq(1, 2))
- db.load(2)
+ db.load(2, versionToUniqueId.get(2))
db.commit()
db.doMaintenance()
assert(snapshotVersionsPresent(remoteDir) === Seq(3))
- db.load(3)
+ db.load(3, versionToUniqueId.get(3))
for (version <- 3 to 7) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.commit()
db.doMaintenance()
}
assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6))
for (version <- 8 to 17) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.commit()
}
db.doMaintenance()
@@ -759,13 +743,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled,
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
- db.load(18)
+ db.load(18, versionToUniqueId.get(18))
db.commit()
db.doMaintenance()
assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18))
for (version <- 19 to 20) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.commit()
}
db.doMaintenance()
@@ -785,7 +769,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with
SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
for (version <- 0 to 2) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.put(version.toString, version.toString)
db.commit()
}
@@ -793,7 +777,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with
SharedSparkSession
db.doMaintenance()
// Roll back to version 1 and start to process data.
for (version <- 1 to 3) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.put(version.toString, version.toString)
db.commit()
}
@@ -805,7 +789,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with
SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
      // Open the db to verify that the state in 4.zip is not corrupted.
- db.load(4)
+ db.load(4, versionToUniqueId.get(4))
}
}
@@ -824,7 +808,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with
SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
for (version <- 1 to 30) {
- db.load(version - 1)
+ db.load(version - 1, versionToUniqueId.get(version - 1))
db.put(version.toString, version.toString)
db.remove((version - 1).toString)
db.commit()
@@ -842,11 +826,11 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
for (version <- 1 to 30) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
assert(db.iterator().map(toStr).toSet === Set((version.toString,
version.toString)))
}
for (version <- 30 to 60) {
- db.load(version - 1)
+ db.load(version - 1, versionToUniqueId.get(version - 1))
db.put(version.toString, version.toString)
db.remove((version - 1).toString)
db.commit()
@@ -854,13 +838,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
assert(changelogVersionsPresent(remoteDir) === (30 to 60))
for (version <- 1 to 60) {
- db.load(version, readOnly = true)
+ db.load(version, versionToUniqueId.get(version), readOnly = true)
assert(db.iterator().map(toStr).toSet === Set((version.toString,
version.toString)))
}
// recommit 60 to ensure that acquireLock is released for maintenance
for (version <- 60 to 60) {
- db.load(version - 1)
+ db.load(version - 1, versionToUniqueId.get(version - 1))
db.put(version.toString, version.toString)
db.remove((version - 1).toString)
db.commit()
@@ -877,12 +861,116 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
// Verify the content of retained versions.
for (version <- 30 to 60) {
- db.load(version, readOnly = true)
+ db.load(version, versionToUniqueId.get(version), readOnly = true)
assert(db.iterator().map(toStr).toSet === Set((version.toString,
version.toString)))
}
}
}
+ testWithChangelogCheckpointingEnabled("RocksDB Fault Tolerance: correctly
handle when there " +
+ "are multiple snapshot files for the same version") {
+ val enableStateStoreCheckpointIds = true
+ val useColumnFamily = true
+ val remoteDir = Utils.createTempDir().toString
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+ val enableChangelogCheckpointingConf =
+ dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain =
20,
+ minDeltasForSnapshot = 3)
+
+ // Simulate when there are multiple snapshot files for the same version
+ // The first DB writes to version 0 with uniqueId
+ val versionToUniqueId1 = new mutable.HashMap[Long, String]()
+ withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+ useColumnFamilies = useColumnFamily,
+ enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+ versionToUniqueId = versionToUniqueId1) { db =>
+ db.load(0, versionToUniqueId1.get(0))
+ db.put("a", "1") // write key a here
+ db.commit()
+
+ // Add some change log files after the snapshot
+ for (version <- 2 to 5) {
+ db.load(version - 1, versionToUniqueId1.get(version - 1))
+ db.put(version.toString, version.toString) // update "1" -> "1", "2"
-> "2", ...
+ db.commit()
+ }
+
+ // doMaintenance uploads the snapshot
+ db.doMaintenance()
+
+ for (version <- 6 to 10) {
+ db.load(version - 1, versionToUniqueId1.get(version - 1))
+ db.put(version.toString, version.toString)
+ db.commit()
+ }
+ }
+
+ // versionToUniqueId1 should be non-empty, meaning the id is updated from
rocksDB to the map
+ assert(versionToUniqueId1.nonEmpty)
+
+ // The second DB writes to version 0 with another uniqueId
+ val versionToUniqueId2 = new mutable.HashMap[Long, String]()
+ withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+ useColumnFamilies = useColumnFamily,
+ enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+ versionToUniqueId = versionToUniqueId2) { db =>
+ db.load(0, versionToUniqueId2.get(0))
+ db.put("b", "2") // write key b here
+ db.commit()
+ // Add some change log files after the snapshot
+ for (version <- 2 to 5) {
+ db.load(version - 1, versionToUniqueId2.get(version - 1))
+ db.put(version.toString, (version + 1).toString) // update "1" -> "2",
"2" -> "3", ...
+ db.commit()
+ }
+
+ // doMaintenance uploads the snapshot
+ db.doMaintenance()
+
+ for (version <- 6 to 10) {
+ db.load(version - 1, versionToUniqueId2.get(version - 1))
+ db.put(version.toString, (version + 1).toString)
+ db.commit()
+ }
+ }
+
+ // versionToUniqueId2 should be non-empty, meaning the id is updated from
rocksDB to the map
+ assert(versionToUniqueId2.nonEmpty)
+
+  // During a load() with lineage from the first rocksDB,
+ // the DB should load with data in the first db
+ withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+ useColumnFamilies = useColumnFamily,
+ enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+ versionToUniqueId = versionToUniqueId1) { db =>
+ db.load(10, versionToUniqueId1.get(10))
+ assert(toStr(db.get("a")) === "1")
+ for (version <- 2 to 10) {
+ // first time we write version -> version
+ // second time we write version -> version + 1
+ // here since we are loading from the first db lineage, we should see
version -> version
+ assert(toStr(db.get(version.toString)) === version.toString)
+ }
+ }
+
+  // During a load() with lineage from the second rocksDB,
+ // the DB should load with data in the second db
+ withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+ useColumnFamilies = useColumnFamily,
+ enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+ versionToUniqueId = versionToUniqueId2) { db =>
+ db.load(10, versionToUniqueId2.get(10))
+ assert(toStr(db.get("b")) === "2")
+ for (version <- 2 to 10) {
+ // first time we write version -> version
+ // second time we write version -> version + 1
+ // here since we are loading from the second db lineage,
+ // we should see version -> version + 1
+ assert(toStr(db.get(version.toString)) === (version + 1).toString)
+ }
+ }
+ }
+
// A rocksdb instance with changelog checkpointing disabled should be able
to load
// an existing checkpoint with changelog.
testWithStateStoreCheckpointIdsAndColumnFamilies(
@@ -899,7 +987,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with
SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
for (version <- 1 to 30) {
- db.load(version - 1)
+ db.load(version - 1, versionToUniqueId.get(version - 1))
db.put(version.toString, version.toString)
db.remove((version - 1).toString)
db.commit()
@@ -916,11 +1004,11 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
for (version <- 1 to 30) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
assert(db.iterator().map(toStr).toSet === Set((version.toString,
version.toString)))
}
for (version <- 31 to 60) {
- db.load(version - 1)
+ db.load(version - 1, versionToUniqueId.get(version - 1))
db.put(version.toString, version.toString)
db.remove((version - 1).toString)
db.commit()
@@ -928,7 +1016,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
assert(changelogVersionsPresent(remoteDir) === (1 to 30))
assert(snapshotVersionsPresent(remoteDir) === (31 to 60))
for (version <- 1 to 60) {
- db.load(version, readOnly = true)
+ db.load(version, versionToUniqueId.get(version), readOnly = true)
assert(db.iterator().map(toStr).toSet === Set((version.toString,
version.toString)))
}
// Check that snapshots and changelogs get purged correctly.
@@ -937,7 +1025,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
assert(changelogVersionsPresent(remoteDir) === Seq.empty)
// Verify the content of retained versions.
for (version <- 41 to 60) {
- db.load(version, readOnly = true)
+ db.load(version, versionToUniqueId.get(version), readOnly = true)
assert(db.iterator().map(toStr).toSet === Set((version.toString,
version.toString)))
}
}
@@ -1030,7 +1118,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
assert(toStr(db.get("b")) === "2")
assert(db.iterator().map(toStr).toSet === Set(("a", "1"), ("b",
"2")))
- db.load(1)
+ db.load(1, versionToUniqueId.get(1))
assert(toStr(db.get("b")) === null)
assert(db.iterator().map(toStr).toSet === Set(("a", "1")))
}
@@ -1847,12 +1935,12 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
// DB has been loaded so current thread has already
// acquired the lock on the RocksDB instance
- db.load(0) // Current thread should be able to load again
+ db.load(0, versionToUniqueId.get(0)) // Current thread should be able
to load again
// Another thread should not be able to load while current thread is
using it
var ex = intercept[SparkException] {
ThreadUtils.runInNewThread("concurrent-test-thread-1") {
- db.load(0)
+ db.load(0, versionToUniqueId.get(0))
}
}
checkError(
@@ -1872,15 +1960,15 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
// Commit should release the instance allowing other threads to load
new version
db.commit()
ThreadUtils.runInNewThread("concurrent-test-thread-2") {
- db.load(1)
+ db.load(1, versionToUniqueId.get(1))
db.commit()
}
// Another thread should not be able to load while current thread is
using it
- db.load(2)
+ db.load(2, versionToUniqueId.get(2))
ex = intercept[SparkException] {
ThreadUtils.runInNewThread("concurrent-test-thread-2") {
- db.load(2)
+ db.load(2, versionToUniqueId.get(2))
}
}
checkError(
@@ -1900,7 +1988,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
// Rollback should release the instance allowing other threads to load
new version
db.rollback()
ThreadUtils.runInNewThread("concurrent-test-thread-3") {
- db.load(1)
+ db.load(1, versionToUniqueId.get(1))
db.commit()
}
}
@@ -2354,14 +2442,14 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
withDB(remoteDir, conf = conf, enableStateStoreCheckpointIds =
enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
for (version <- 0 to 1) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.put(version.toString, version.toString)
db.commit()
}
// upload snapshot 2.zip
db.doMaintenance()
for (version <- Seq(2)) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.put(version.toString, version.toString)
db.commit()
}
@@ -2379,16 +2467,16 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
}
db1.doMaintenance()
}
- db.load(2)
+ db.load(2, versionToUniqueId.get(2))
for (version <- Seq(2)) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.put(version.toString, version.toString)
db.commit()
}
// upload snapshot 3.zip
db.doMaintenance()
// rollback to version 2
- db.load(2)
+ db.load(2, versionToUniqueId.get(2))
}
}
@@ -2403,18 +2491,18 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
withDB(remoteDir, conf = conf, enableStateStoreCheckpointIds =
enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
for (version <- 0 to 1) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.put(version.toString, version.toString)
db.commit()
}
// upload snapshot 2.zip
db.doMaintenance()
for (version <- 2 to 3) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.put(version.toString, version.toString)
db.commit()
}
- db.load(0)
+ db.load(0, versionToUniqueId.get(0))
      // simulate db in another executor that overrides the zip file
// In checkpoint V2, reusing the same versionToUniqueId to simulate
when two executors
// are scheduled with the same uniqueId in the same microbatch
@@ -2429,7 +2517,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
db1.doMaintenance()
}
for (version <- 2 to 3) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.put(version.toString, version.toString)
db.commit()
}
@@ -2450,14 +2538,14 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
withDB(remoteDir, conf = conf, enableStateStoreCheckpointIds =
enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
for (version <- 0 to 2) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.put(version.toString, version.toString)
db.commit()
}
// upload snapshot 2.zip
db.doMaintenance()
for (version <- 1 to 3) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.put(version.toString, version.toString)
db.commit()
}
@@ -2478,13 +2566,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
withDB(remoteDir, conf = conf, enableStateStoreCheckpointIds =
enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
for (version <- 0 to 1) {
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
db.put(version.toString, version.toString)
db.commit()
}
// load previous version, and recreate the snapshot
- db.load(1)
+ db.load(1, versionToUniqueId.get(1))
db.put("3", "3")
// upload any latest snapshots so far
@@ -2514,12 +2602,12 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
withDB(remoteDir, conf = conf, hadoopConf = hadoopConf,
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
- db.load(0)
+ db.load(0, versionToUniqueId.get(0))
db.put("a", "1")
db.commit()
// load previous version, will recreate snapshot on commit
- db.load(0)
+ db.load(0, versionToUniqueId.get(0))
db.put("a", "1")
// upload version 1 snapshot created previously
@@ -2565,13 +2653,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
// This test was accidentally fixed by
// SPARK-48931 (https://github.com/apache/spark/pull/47393)
- db.load(0)
+ db.load(0, versionToUniqueId.get(0))
db.put("foo", "bar")
// Snapshot checkpoint not needed
db.commit()
// Continue using local DB
- db.load(1)
+ db.load(1, versionToUniqueId.get(1))
db.put("foo", "bar")
// Should create a local RocksDB snapshot
db.commit()
@@ -2579,19 +2667,19 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
db.doMaintenance()
// This will reload Db from the cloud.
- db.load(1)
+ db.load(1, versionToUniqueId.get(1))
db.put("foo", "bar")
// Should create another local snapshot
db.commit()
// Continue using local DB
- db.load(2)
+ db.load(2, versionToUniqueId.get(2))
db.put("foo", "bar")
// Snapshot checkpoint not needed
db.commit()
// Reload DB from the cloud, loading from 2.zip
- db.load(2)
+ db.load(2, versionToUniqueId.get(2))
db.put("foo", "bar")
// Snapshot checkpoint not needed
db.commit()
@@ -2600,14 +2688,14 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
db.doMaintenance()
// Reload new 2.zip just uploaded to validate it is not corrupted.
- db.load(2)
+ db.load(2, versionToUniqueId.get(2))
db.put("foo", "bar")
db.commit()
// Test the maintenance thread is delayed even after the next snapshot
is created.
// There will be two outstanding snapshots.
for (batchVersion <- 3 to 6) {
- db.load(batchVersion)
+ db.load(batchVersion, versionToUniqueId.get(batchVersion))
db.put("foo", "bar")
// In batchVersion 3 and 5, it will generate a local snapshot but
won't be uploaded.
db.commit()
@@ -2618,7 +2706,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
// maintenance tasks finish quickly.
for (batchVersion <- 7 to 10) {
for (j <- 0 to 1) {
- db.load(batchVersion)
+ db.load(batchVersion, versionToUniqueId.get(batchVersion))
db.put("foo", "bar")
db.commit()
db.doMaintenance()
@@ -2649,7 +2737,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
val random = new Random(randomSeed)
var curVer: Int = 0
for (i <- 1 to 100) {
- db.load(curVer)
+ db.load(curVer, versionToUniqueId.get(curVer))
db.put("foo", "bar")
db.commit()
// For a one in five chance, maintenance task is executed. The
chance is created to
@@ -2702,33 +2790,33 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db2 =>
// commit version 1 via db1
- db1.load(0)
+ db1.load(0, versionToUniqueId.get(0))
db1.put("a", "1")
db1.put("b", "1")
db1.commit()
// commit version 1 via db2
- db2.load(0)
+ db2.load(0, versionToUniqueId.get(0))
db2.put("a", "1")
db2.put("b", "1")
db2.commit()
// commit version 2 via db2
- db2.load(1)
+ db2.load(1, versionToUniqueId.get(1))
db2.put("a", "2")
db2.put("b", "2")
db2.commit()
// reload version 1, this should succeed
- db2.load(1)
- db1.load(1)
+ db2.load(1, versionToUniqueId.get(1))
+ db1.load(1, versionToUniqueId.get(1))
// reload version 2, this should succeed
- db2.load(2)
- db1.load(2)
+ db2.load(2, versionToUniqueId.get(2))
+ db1.load(2, versionToUniqueId.get(2))
}
}
}
@@ -2748,33 +2836,33 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db2 =>
// commit version 1 via db1
- db1.load(0)
+ db1.load(0, versionToUniqueId.get(0))
db1.put("a", "1")
db1.put("b", "1")
db1.commit()
// commit version 1 via db2
- db2.load(0)
+ db2.load(0, versionToUniqueId.get(0))
db2.put("a", "1")
db2.put("b", "1")
db2.commit()
// commit version 2 via db2
- db2.load(1)
+ db2.load(1, versionToUniqueId.get(1))
db2.put("a", "2")
db2.put("b", "2")
db2.commit()
// reload version 1, this should succeed
- db2.load(1)
- db1.load(1)
+ db2.load(1, versionToUniqueId.get(1))
+ db1.load(1, versionToUniqueId.get(1))
// reload version 2, this should succeed
- db2.load(2)
- db1.load(2)
+ db2.load(2, versionToUniqueId.get(2))
+ db1.load(2, versionToUniqueId.get(2))
}
}
}
@@ -2798,33 +2886,33 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db2 =>
// commit version 1 via db2
- db2.load(0)
+ db2.load(0, versionToUniqueId.get(0))
db2.put("a", "1")
db2.put("b", "1")
db2.commit()
// commit version 1 via db1
- db1.load(0)
+ db1.load(0, versionToUniqueId.get(0))
db1.put("a", "1")
db1.put("b", "1")
db1.commit()
// commit version 2 via db2
- db2.load(1)
+ db2.load(1, versionToUniqueId.get(1))
db2.put("a", "2")
db2.put("b", "2")
db2.commit()
// reload version 1, this should succeed
- db2.load(1)
- db1.load(1)
+ db2.load(1, versionToUniqueId.get(1))
+ db1.load(1, versionToUniqueId.get(1))
// reload version 2, this should succeed
- db2.load(2)
- db1.load(2)
+ db2.load(2, versionToUniqueId.get(2))
+ db1.load(2, versionToUniqueId.get(2))
}
}
}
@@ -2844,33 +2932,33 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db2 =>
// commit version 1 via db2
- db2.load(0)
+ db2.load(0, versionToUniqueId.get(0))
db2.put("a", "1")
db2.put("b", "1")
db2.commit()
// commit version 1 via db1
- db1.load(0)
+ db1.load(0, versionToUniqueId.get(0))
db1.put("a", "1")
db1.put("b", "1")
db1.commit()
// commit version 2 via db2
- db2.load(1)
+ db2.load(1, versionToUniqueId.get(1))
db2.put("a", "2")
db2.put("b", "2")
db2.commit()
// reload version 1, this should succeed
- db2.load(1)
- db1.load(1)
+ db2.load(1, versionToUniqueId.get(1))
+ db1.load(1, versionToUniqueId.get(1))
// reload version 2, this should succeed
- db2.load(2)
- db1.load(2)
+ db2.load(2, versionToUniqueId.get(2))
+ db1.load(2, versionToUniqueId.get(2))
}
}
}
@@ -3163,7 +3251,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
version: Long,
ckptId: Option[String] = None,
readOnly: Boolean = false): RocksDB = {
- super.load(version, versionToUniqueId.get(version), readOnly)
+      // When a ckptId is defined, it means the test is explicitly using v2
semantics.
+      // When it is not, it is possible that it implicitly uses it.
+      // So still do a versionToUniqueId.get
+ ckptId match {
+ case Some(_) => super.load(version, ckptId, readOnly)
+ case None => super.load(version, versionToUniqueId.get(version),
readOnly)
+ }
}
override def commit(): Long = {
@@ -3184,6 +3278,11 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
hadoopConf: Configuration = hadoopConf,
useColumnFamilies: Boolean = false,
enableStateStoreCheckpointIds: Boolean = false,
+ // versionToUniqueId is used in checkpoint format v2, it simulates the
lineage
+ // stored in the commit log. The lineage will be automatically updated
in db.commit()
+ // When testing V2, please create a versionToUniqueId map
+ // and call versionToUniqueId.get(version) in the db.load() function.
+ // In V1, versionToUniqueId is not used and
versionToUniqueId.get(version) returns None.
versionToUniqueId : mutable.Map[Long, String] = mutable.Map[Long,
String](),
localDir: File = Utils.createTempDir())(
func: RocksDB => T): T = {
@@ -3207,7 +3306,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
loggingId = s"[Thread-${Thread.currentThread.getId}]",
useColumnFamilies = useColumnFamilies)
}
- db.load(version)
+ db.load(version, versionToUniqueId.get(version))
func(db)
} finally {
if (db != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]