This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new ac4b9154b58 [SPARK-45419][SS] Avoid reusing rocksdb sst files in a
different rocksdb instance
ac4b9154b58 is described below
commit ac4b9154b5822779023e66f2efb24d05e20b1cca
Author: Chaoqin Li <[email protected]>
AuthorDate: Tue Oct 10 11:03:19 2023 +0900
[SPARK-45419][SS] Avoid reusing rocksdb sst files in a different rocksdb
instance
### What changes were proposed in this pull request?
When loading a rocksdb instance, remove file version map entries of larger
versions to avoid rocksdb sst file unique id mismatch exception. The SST files
in larger versions can't be reused even if they have the same size and name
because they belong to another rocksdb instance.
### Why are the changes needed?
Avoid rocksdb file mismatch exception that may occur in runtime.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add rocksdb unit test.
Closes #43174 from chaoqin-li1123/rocksdb_mismatch.
Authored-by: Chaoqin Li <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../streaming/state/RocksDBFileManager.scala | 4 +++
.../execution/streaming/state/RocksDBSuite.scala | 29 ++++++++++++++++++++++
2 files changed, 33 insertions(+)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 0891d773713..faf9cd701ae 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -207,6 +207,10 @@ class RocksDBFileManager(
*/
def loadCheckpointFromDfs(version: Long, localDir: File):
RocksDBCheckpointMetadata = {
logInfo(s"Loading checkpoint files for version $version")
+ // The unique ids of SST files are checked when opening a rocksdb
instance. The SST files
+ // in larger versions can't be reused even if they have the same size and
name because
+ // they belong to another rocksdb instance.
+ versionToRocksDBFiles.keySet().removeIf(_ >= version)
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
localDir.mkdirs()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index e31b05c362f..91dd8582207 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -214,6 +214,35 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithChangelogCheckpointingEnabled("SPARK-45419: Do not reuse SST files" +
+ " in different RocksDB instances") {
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 0, compactOnCommit = false)
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+ withDB(remoteDir, conf = conf) { db =>
+ for (version <- 0 to 2) {
+ db.load(version)
+ db.put(version.toString, version.toString)
+ db.commit()
+ }
+ // upload snapshot 3.zip
+ db.doMaintenance()
+ // Roll back to version 1 and start to process data.
+ for (version <- 1 to 3) {
+ db.load(version)
+ db.put(version.toString, version.toString)
+ db.commit()
+ }
+ // Upload snapshot 4.zip, should not reuse the SST files in 3.zip
+ db.doMaintenance()
+ }
+
+ withDB(remoteDir, conf = conf) { db =>
+ // Open the db to verify that the state in 4.zip is not corrupted.
+ db.load(4)
+ }
+ }
+
// A rocksdb instance with changelog checkpointing enabled should be able to
load
// an existing checkpoint without changelog.
testWithChangelogCheckpointingEnabled(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]