This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ff87c0958f7 [SPARK-45419][SS] Avoid reusing rocksdb sst files in a different rocksdb instance ff87c0958f7 is described below commit ff87c0958f79b16c2f276e0dc53a855fca558347 Author: Chaoqin Li <chaoqin...@databricks.com> AuthorDate: Tue Oct 10 11:03:19 2023 +0900 [SPARK-45419][SS] Avoid reusing rocksdb sst files in a different rocksdb instance ### What changes were proposed in this pull request? When loading a rocksdb instance, remove the file version map entries for the version being loaded and for all larger versions, to avoid a rocksdb sst file unique id mismatch exception. The SST files in those versions can't be reused even if they have the same size and name because they belong to another rocksdb instance. ### Why are the changes needed? To avoid a rocksdb file mismatch exception that may occur at runtime. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a rocksdb unit test. Closes #43174 from chaoqin-li1123/rocksdb_mismatch. 
Authored-by: Chaoqin Li <chaoqin...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../streaming/state/RocksDBFileManager.scala | 4 +++ .../execution/streaming/state/RocksDBSuite.scala | 29 ++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 05fd845accb..eae9aac3c0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -208,6 +208,10 @@ class RocksDBFileManager( */ def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = { logInfo(s"Loading checkpoint files for version $version") + // The unique ids of SST files are checked when opening a rocksdb instance. The SST files + // in larger versions can't be reused even if they have the same size and name because + // they belong to another rocksdb instance. 
+ versionToRocksDBFiles.keySet().removeIf(_ >= version) val metadata = if (version == 0) { if (localDir.exists) Utils.deleteRecursively(localDir) localDir.mkdirs() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index b4b67f381d2..764358dc1f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -240,6 +240,35 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } } + testWithChangelogCheckpointingEnabled("SPARK-45419: Do not reuse SST files" + + " in different RocksDB instances") { + val remoteDir = Utils.createTempDir().toString + val conf = dbConf.copy(minDeltasForSnapshot = 0, compactOnCommit = false) + new File(remoteDir).delete() // to make sure that the directory gets created + withDB(remoteDir, conf = conf) { db => + for (version <- 0 to 2) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + // upload snapshot 3.zip + db.doMaintenance() + // Roll back to version 1 and start to process data. + for (version <- 1 to 3) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + // Upload snapshot 4.zip, should not reuse the SST files in 3.zip + db.doMaintenance() + } + + withDB(remoteDir, conf = conf) { db => + // Open the db to verify that the state in 4.zip is not corrupted. + db.load(4) + } + } + // A rocksdb instance with changelog checkpointing enabled should be able to load // an existing checkpoint without changelog. testWithChangelogCheckpointingEnabled( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org