This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new ac4b9154b58 [SPARK-45419][SS] Avoid reusing rocksdb sst files in a different rocksdb instance ac4b9154b58 is described below commit ac4b9154b5822779023e66f2efb24d05e20b1cca Author: Chaoqin Li <chaoqin...@databricks.com> AuthorDate: Tue Oct 10 11:03:19 2023 +0900 [SPARK-45419][SS] Avoid reusing rocksdb sst files in a different rocksdb instance ### What changes were proposed in this pull request? When loading a rocksdb instance, remove the file version map entries for the loaded version and all later versions to avoid a rocksdb sst file unique id mismatch exception. The SST files in larger versions can't be reused even if they have the same size and name because they belong to another rocksdb instance. ### Why are the changes needed? Avoid a rocksdb file mismatch exception that may occur at runtime. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a rocksdb unit test. Closes #43174 from chaoqin-li1123/rocksdb_mismatch. 
Authored-by: Chaoqin Li <chaoqin...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../streaming/state/RocksDBFileManager.scala | 4 +++ .../execution/streaming/state/RocksDBSuite.scala | 29 ++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 0891d773713..faf9cd701ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -207,6 +207,10 @@ class RocksDBFileManager( */ def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = { logInfo(s"Loading checkpoint files for version $version") + // The unique ids of SST files are checked when opening a rocksdb instance. The SST files + // in larger versions can't be reused even if they have the same size and name because + // they belong to another rocksdb instance. 
+ versionToRocksDBFiles.keySet().removeIf(_ >= version) val metadata = if (version == 0) { if (localDir.exists) Utils.deleteRecursively(localDir) localDir.mkdirs() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index e31b05c362f..91dd8582207 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -214,6 +214,35 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } } + testWithChangelogCheckpointingEnabled("SPARK-45419: Do not reuse SST files" + + " in different RocksDB instances") { + val remoteDir = Utils.createTempDir().toString + val conf = dbConf.copy(minDeltasForSnapshot = 0, compactOnCommit = false) + new File(remoteDir).delete() // to make sure that the directory gets created + withDB(remoteDir, conf = conf) { db => + for (version <- 0 to 2) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + // upload snapshot 3.zip + db.doMaintenance() + // Roll back to version 1 and start to process data. + for (version <- 1 to 3) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + // Upload snapshot 4.zip, should not reuse the SST files in 3.zip + db.doMaintenance() + } + + withDB(remoteDir, conf = conf) { db => + // Open the db to verify that the state in 4.zip is no corrupted. + db.load(4) + } + } + // A rocksdb instance with changelog checkpointing enabled should be able to load // an existing checkpoint without changelog. testWithChangelogCheckpointingEnabled( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org