This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7d69f8f96180 [SPARK-55820][SS] Fix race condition in no-overwrite FS
when RocksDB version files cache is out of sync
7d69f8f96180 is described below
commit 7d69f8f96180762082dec569741180c74f48bb18
Author: Livia Zhu <[email protected]>
AuthorDate: Tue Mar 10 17:27:28 2026 -0700
[SPARK-55820][SS] Fix race condition in no-overwrite FS when RocksDB
version files cache is out of sync
### What changes were proposed in this pull request?
There exists the following race condition on no-overwrite filesystems if
minVersionsToRetain <= minDeltasForSnapshot:
1. Query run 1 uploads snapshot X.zip pointing to SST file Y.SST
2. Query run 1 is cancelled before the commit log is written
3. Query run 2 retries the batch, uploads Z.SST, tries to re-upload X.zip
pointing to Z.SST. The zip overwrite silently fails on no-overwrite FS, but
versionToRocksDBFiles maps version X -> Z.SST (stale)
4. Maintenance/cleanup uses the stale in-memory mapping, sees Y.SST as
untracked, and deletes it
5. A subsequent query run tries to load X.zip from cloud, which still
references Y.SST -> FileNotFoundException
This change fixes the race condition by reading the min retained version's
metadata directly from the zip on DFS and protecting the files it references
from cleanup, rather than relying solely on the in-memory cache.
### Why are the changes needed?
Fix race condition leading to FileNotFound error
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New unit test
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude 4.5
Closes #54602 from liviazhu/liviazhu-db/rocksdb-cleanup-fix.
Authored-by: Livia Zhu <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../streaming/state/RocksDBFileManager.scala | 19 ++-
.../execution/streaming/state/RocksDBSuite.scala | 152 +++++++++++++++++++++
2 files changed, 169 insertions(+), 2 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 7135421f4866..8d63b7b250c3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -680,13 +680,28 @@ class RocksDBFileManager(
// Resolve RocksDB files for all the versions and find the max version
each file is used
val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
sortedSnapshotVersionsAndUniqueIds.foreach { case (version, uniqueId) =>
- val files = Option(versionToRocksDBFiles.get((version,
uniqueId))).getOrElse {
+ var readFromCache = true
+ val cachedFiles = Option(versionToRocksDBFiles.get((version,
uniqueId))).getOrElse {
+ readFromCache = false
val newResolvedFiles = getImmutableFilesFromVersionZip(version,
uniqueId)
versionToRocksDBFiles.put((version, uniqueId), newResolvedFiles)
newResolvedFiles
}
- files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
+ cachedFiles.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName,
version)))
+
+ // For ckpt v1, for the min retained version, fetch metadata from cloud
zip
+ // to protect files that may differ from the in-memory cache. This
handles the case
+ // where a no-overwrite FS prevented a snapshot zip from being
overwritten: the
+ // in-memory cache may reference a newer set of SST files (from a
retry), while the
+ // cloud zip still references the original SST files.
+ // We do this for the min retained version, to make sure the files from
the original
+ // upload for this version are not seen as orphan files.
+ if (uniqueId.isEmpty && version == minVersionToRetain && readFromCache) {
+ val cloudFiles = getImmutableFilesFromVersionZip(version, uniqueId)
+ cloudFiles.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
+ math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName,
version)))
+ }
}
// Best effort attempt to delete SST files that were last used in
to-be-deleted versions
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index bb02dad76ac6..8be2b610d099 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -4109,6 +4109,158 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
}
}
+ // Test ensuring a race condition on no-overwrite filesystems (e.g., ABFS)
+ // does not occur:
+ // 1. Query run 1 uploads snapshot X.zip pointing to SST file Y.SST
+ // 2. Query run 1 is cancelled before the commit log is written
+ // 3. Query run 2 retries the batch, uploads Z.SST, tries to re-upload X.zip
+ // pointing to Z.SST. The zip overwrite silently fails on no-overwrite FS,
+ // but versionToRocksDBFiles maps version X -> Z.SST (stale)
+ // 4. Maintenance/cleanup uses the stale in-memory mapping, sees Y.SST as
+ // untracked, and deletes it
+ // 5. A subsequent query run tries to load X.zip from cloud, which still
+ // references Y.SST -> FileNotFoundException
+ testWithChangelogCheckpointingEnabled("no-overwrite FS maintenance " +
+ "does not delete SST files still referenced by zip in DFS") {
+ withTempDir { dir =>
+ val remoteDir = dir.getCanonicalPath
+ val fmClass = "org.apache.spark.sql.execution.streaming.state." +
+ "NoOverwriteFileSystemBasedCheckpointFileManager"
+ val noOverwriteConf = new Configuration()
+ noOverwriteConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
fmClass)
+ noOverwriteConf.set(StreamExecution.RUN_ID_KEY,
UUID.randomUUID().toString)
+
+ // Snapshots at versions 10, 20, 30. With minVersionsToRetain = 2,
+ // minVersionToRetain = 20, so version 10 is deleted and cleanup runs.
+ // The fix fetches cloud metadata for the min retained version (20),
+ // protecting run 1's SSTs referenced by the on-disk 20.zip.
+ val conf = dbConf.copy(
+ compactOnCommit = false,
+ minVersionsToRetain = 2,
+ minVersionsToDelete = 0,
+ minDeltasForSnapshot = 10)
+
+ // Phase 1: versions 1-19 (snapshot at 10, rest changelog-only).
+ val localDir0 = Utils.createTempDir()
+ val db0 = new RocksDB(
+ remoteDir, conf = conf, localRootDir = localDir0,
+ hadoopConf = noOverwriteConf,
+ loggingId = s"[Thread-${Thread.currentThread.getId}]")
+ try {
+ db0.load(0)
+ db0.put("setup_key", "setup_value")
+ db0.commit() // version 1
+ db0.doMaintenance()
+ for (v <- 2 to 19) {
+ db0.load(v - 1)
+ db0.put(s"setup_key_v$v", s"setup_value_v$v")
+ db0.commit() // snapshot at v=10, changelog-only otherwise
+ db0.doMaintenance()
+ }
+ } finally {
+ db0.close()
+ }
+
+ val sstDir = new File(remoteDir, "SSTs")
+ val setupSstFiles = if (sstDir.exists()) {
+
sstDir.listFiles().filter(_.getName.endsWith(".sst")).map(_.getName).toSet
+ } else {
+ Set.empty[String]
+ }
+
+ // Phase 2: Run 1 commits version 20, creating 20.zip with run 1's SSTs.
+ val localDir1 = Utils.createTempDir()
+ val db1 = new RocksDB(
+ remoteDir, conf = conf, localRootDir = localDir1,
+ hadoopConf = noOverwriteConf,
+ loggingId = s"[Thread-${Thread.currentThread.getId}]")
+ try {
+ db1.load(19)
+ db1.put("key", "value_from_run1")
+ db1.commit() // version 20 -- snapshot queued
+ db1.doMaintenance() // uploads 20.zip + run 1's SST files
+ } finally {
+ db1.close()
+ }
+
+ // Verify 20.zip was created
+ val zipFilesAfterRun1 = new File(remoteDir).listFiles()
+ .filter(_.getName.endsWith(".zip"))
+ assert(zipFilesAfterRun1.exists(_.getName.startsWith("20")),
+ s"Expected 20.zip after query run 1, found: " +
+ s"${zipFilesAfterRun1.map(_.getName).mkString(", ")}")
+
+ // Identify query run 1's SST files
+ val sstFilesAfterRun1 = sstDir.listFiles()
+ .filter(_.getName.endsWith(".sst")).map(_.getName).toSet
+ val run1SstFiles = sstFilesAfterRun1 -- setupSstFiles
+ assert(run1SstFiles.nonEmpty,
+ "Expected new SST files from query run 1")
+
+ // Phase 3: Run 2 (retry) commits version 20 again. 20.zip is not
overwritten,
+ // but the in-memory cache now maps version 20 to run 2's SSTs.
+ val localDir2 = Utils.createTempDir()
+ val db2 = new RocksDB(
+ remoteDir,
+ conf = conf,
+ localRootDir = localDir2,
+ hadoopConf = noOverwriteConf,
+ loggingId = s"[Thread-${Thread.currentThread.getId}]")
+ try {
+ db2.load(19)
+ db2.put("key", "value_from_run2")
+ db2.commit() // version 20 -- run 2's SSTs created, snapshot queued
+ db2.doMaintenance() // run 2's SSTs uploaded, 20.zip silently not
overwritten
+
+ val sstFilesAfterRun2 = sstDir.listFiles()
+ .filter(_.getName.endsWith(".sst")).map(_.getName).toSet
+ val run2SstFiles = sstFilesAfterRun2 -- sstFilesAfterRun1
+ assert(run2SstFiles.nonEmpty,
+ "Expected new SST files from query run 2")
+
+ // Phase 4: versions 21-30. Snapshot at 30 triggers cleanup
+ // (minVersionToRetain = 20, deletes 10.zip).
+ for (v <- 21 to 30) {
+ db2.load(v - 1)
+ db2.put(s"key_v$v", s"value_v$v")
+ db2.commit()
+ db2.doMaintenance()
+ }
+
+ // Run 1's SSTs survive cleanup: the fix fetches cloud metadata for
+ // version 20, protecting files referenced by the on-disk 20.zip.
+ val sstFilesAfterCleanup = sstDir.listFiles()
+ .filter(_.getName.endsWith(".sst")).map(_.getName).toSet
+ run1SstFiles.foreach { name =>
+ assert(sstFilesAfterCleanup.contains(name),
+ s"Expected run 1 SST file $name to still exist (protected by cloud
" +
+ s"metadata fetch), but it was deleted.")
+ }
+
+ // Phase 5: Fresh instance loads version 29 from 20.zip + changelogs.
+ // Succeeds because run 1's SSTs (referenced by 20.zip) were preserved.
+ val localDir3 = Utils.createTempDir()
+ val db3 = new RocksDB(
+ remoteDir,
+ conf = conf,
+ localRootDir = localDir3,
+ hadoopConf = noOverwriteConf,
+ loggingId = s"[Thread-${Thread.currentThread.getId}]")
+ try {
+ db3.load(29)
+ val value = new String(db3.get("key"), "UTF-8")
+ assert(value == "value_from_run1",
+ s"Expected stale value 'value_from_run1' from 20.zip (run 1's
SSTs), " +
+ s"but got '$value'")
+ } finally {
+ db3.close()
+ }
+ } finally {
+ db2.close()
+ }
+ }
+ }
+
testWithChangelogCheckpointingEnabled("SPARK-52553 - v1 changelog with
invalid version number" +
" does not cause NumberFormatException") {
withTempDir { dir =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]