This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d69f8f96180 [SPARK-55820][SS] Fix race condition in no-overwrite FS 
when RocksDB version files cache is out of sync
7d69f8f96180 is described below

commit 7d69f8f96180762082dec569741180c74f48bb18
Author: Livia Zhu <[email protected]>
AuthorDate: Tue Mar 10 17:27:28 2026 -0700

    [SPARK-55820][SS] Fix race condition in no-overwrite FS when RocksDB 
version files cache is out of sync
    
    ### What changes were proposed in this pull request?
    
    There exists the following race condition on no-overwrite filesystems if 
minVersionsToRetain <= minDeltasForSnapshot:
    
    1. Query run 1 uploads snapshot X.zip pointing to SST file Y.SST
    2. Query run 1 is cancelled before the commit log is written
    3. Query run 2 retries the batch, uploads Z.SST, tries to re-upload X.zip 
pointing to Z.SST. The zip overwrite silently fails on no-overwrite FS, but 
versionToRocksDBFiles maps version X -> Z.SST (stale)
    4. Maintenance/cleanup uses the stale in-memory mapping, sees Y.SST as 
untracked, and deletes it
    5. A subsequent query run tries to load X.zip from cloud, which still 
references Y.SST -> FileNotFoundException
    
    This change fixes the race condition by opening the min retained version on 
DFS and not cleaning up the files referenced there rather than relying on the 
cache.
    
    ### Why are the changes needed?
    
    Fix race condition leading to FileNotFound error
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude 4.5
    
    Closes #54602 from liviazhu/liviazhu-db/rocksdb-cleanup-fix.
    
    Authored-by: Livia Zhu <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../streaming/state/RocksDBFileManager.scala       |  19 ++-
 .../execution/streaming/state/RocksDBSuite.scala   | 152 +++++++++++++++++++++
 2 files changed, 169 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 7135421f4866..8d63b7b250c3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -680,13 +680,28 @@ class RocksDBFileManager(
     // Resolve RocksDB files for all the versions and find the max version 
each file is used
     val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
     sortedSnapshotVersionsAndUniqueIds.foreach { case (version, uniqueId) =>
-      val files = Option(versionToRocksDBFiles.get((version, 
uniqueId))).getOrElse {
+      var readFromCache = true
+      val cachedFiles = Option(versionToRocksDBFiles.get((version, 
uniqueId))).getOrElse {
+        readFromCache = false
         val newResolvedFiles = getImmutableFilesFromVersionZip(version, 
uniqueId)
         versionToRocksDBFiles.put((version, uniqueId), newResolvedFiles)
         newResolvedFiles
       }
-      files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
+      cachedFiles.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
         math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName, 
version)))
+
+      // For ckpt v1, for the min retained version, fetch metadata from cloud 
zip
+      // to protect files that may differ from the in-memory cache. This 
handles the case
+      // where a no-overwrite FS prevented a snapshot zip from being 
overwritten: the
+      // in-memory cache may reference a newer set of SST files (from a 
retry), while the
+      // cloud zip still references the original SST files.
+      // We do this for the min retained version, to make sure the files from 
the original
+      // upload for this version are not seen as orphan files.
+      if (uniqueId.isEmpty && version == minVersionToRetain && readFromCache) {
+        val cloudFiles = getImmutableFilesFromVersionZip(version, uniqueId)
+        cloudFiles.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
+          math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName, 
version)))
+      }
     }
 
     // Best effort attempt to delete SST files that were last used in 
to-be-deleted versions
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index bb02dad76ac6..8be2b610d099 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -4109,6 +4109,158 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
     }
   }
 
+  // Test ensuring a race condition on no-overwrite filesystems (e.g., ABFS)
+  // does not occur:
+  // 1. Query run 1 uploads snapshot X.zip pointing to SST file Y.SST
+  // 2. Query run 1 is cancelled before the commit log is written
+  // 3. Query run 2 retries the batch, uploads Z.SST, tries to re-upload X.zip
+  //    pointing to Z.SST. The zip overwrite silently fails on no-overwrite FS,
+  //    but versionToRocksDBFiles maps version X -> Z.SST (stale)
+  // 4. Maintenance/cleanup uses the stale in-memory mapping, sees Y.SST as
+  //    untracked, and deletes it
+  // 5. A subsequent query run tries to load X.zip from cloud, which still
+  //    references Y.SST -> FileNotFoundException
+  testWithChangelogCheckpointingEnabled("no-overwrite FS maintenance " +
+    "does not delete SST files still referenced by zip in DFS") {
+    withTempDir { dir =>
+      val remoteDir = dir.getCanonicalPath
+      val fmClass = "org.apache.spark.sql.execution.streaming.state." +
+        "NoOverwriteFileSystemBasedCheckpointFileManager"
+      val noOverwriteConf = new Configuration()
+      noOverwriteConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, 
fmClass)
+      noOverwriteConf.set(StreamExecution.RUN_ID_KEY, 
UUID.randomUUID().toString)
+
+      // Snapshots at versions 10, 20, 30. With minVersionsToRetain = 2,
+      // minVersionToRetain = 20, so version 10 is deleted and cleanup runs.
+      // The fix fetches cloud metadata for the min retained version (20),
+      // protecting run 1's SSTs referenced by the on-disk 20.zip.
+      val conf = dbConf.copy(
+        compactOnCommit = false,
+        minVersionsToRetain = 2,
+        minVersionsToDelete = 0,
+        minDeltasForSnapshot = 10)
+
+      // Phase 1: versions 1-19 (snapshot at 10, rest changelog-only).
+      val localDir0 = Utils.createTempDir()
+      val db0 = new RocksDB(
+        remoteDir, conf = conf, localRootDir = localDir0,
+        hadoopConf = noOverwriteConf,
+        loggingId = s"[Thread-${Thread.currentThread.getId}]")
+      try {
+        db0.load(0)
+        db0.put("setup_key", "setup_value")
+        db0.commit() // version 1
+        db0.doMaintenance()
+        for (v <- 2 to 19) {
+          db0.load(v - 1)
+          db0.put(s"setup_key_v$v", s"setup_value_v$v")
+          db0.commit() // snapshot at v=10, changelog-only otherwise
+          db0.doMaintenance()
+        }
+      } finally {
+        db0.close()
+      }
+
+      val sstDir = new File(remoteDir, "SSTs")
+      val setupSstFiles = if (sstDir.exists()) {
+        
sstDir.listFiles().filter(_.getName.endsWith(".sst")).map(_.getName).toSet
+      } else {
+        Set.empty[String]
+      }
+
+      // Phase 2: Run 1 commits version 20, creating 20.zip with run 1's SSTs.
+      val localDir1 = Utils.createTempDir()
+      val db1 = new RocksDB(
+        remoteDir, conf = conf, localRootDir = localDir1,
+        hadoopConf = noOverwriteConf,
+        loggingId = s"[Thread-${Thread.currentThread.getId}]")
+      try {
+        db1.load(19)
+        db1.put("key", "value_from_run1")
+        db1.commit() // version 20 -- snapshot queued
+        db1.doMaintenance() // uploads 20.zip + run 1's SST files
+      } finally {
+        db1.close()
+      }
+
+      // Verify 20.zip was created
+      val zipFilesAfterRun1 = new File(remoteDir).listFiles()
+        .filter(_.getName.endsWith(".zip"))
+      assert(zipFilesAfterRun1.exists(_.getName.startsWith("20")),
+        s"Expected 20.zip after query run 1, found: " +
+          s"${zipFilesAfterRun1.map(_.getName).mkString(", ")}")
+
+      // Identify query run 1's SST files
+      val sstFilesAfterRun1 = sstDir.listFiles()
+        .filter(_.getName.endsWith(".sst")).map(_.getName).toSet
+      val run1SstFiles = sstFilesAfterRun1 -- setupSstFiles
+      assert(run1SstFiles.nonEmpty,
+        "Expected new SST files from query run 1")
+
+      // Phase 3: Run 2 (retry) commits version 20 again. 20.zip is not 
overwritten,
+      // but the in-memory cache now maps version 20 to run 2's SSTs.
+      val localDir2 = Utils.createTempDir()
+      val db2 = new RocksDB(
+        remoteDir,
+        conf = conf,
+        localRootDir = localDir2,
+        hadoopConf = noOverwriteConf,
+        loggingId = s"[Thread-${Thread.currentThread.getId}]")
+      try {
+        db2.load(19)
+        db2.put("key", "value_from_run2")
+        db2.commit() // version 20 -- run 2's SSTs created, snapshot queued
+        db2.doMaintenance() // run 2's SSTs uploaded, 20.zip silently not 
overwritten
+
+        val sstFilesAfterRun2 = sstDir.listFiles()
+          .filter(_.getName.endsWith(".sst")).map(_.getName).toSet
+        val run2SstFiles = sstFilesAfterRun2 -- sstFilesAfterRun1
+        assert(run2SstFiles.nonEmpty,
+          "Expected new SST files from query run 2")
+
+        // Phase 4: versions 21-30. Snapshot at 30 triggers cleanup
+        // (minVersionToRetain = 20, deletes 10.zip).
+        for (v <- 21 to 30) {
+          db2.load(v - 1)
+          db2.put(s"key_v$v", s"value_v$v")
+          db2.commit()
+          db2.doMaintenance()
+        }
+
+        // Run 1's SSTs survive cleanup: the fix fetches cloud metadata for
+        // version 20, protecting files referenced by the on-disk 20.zip.
+        val sstFilesAfterCleanup = sstDir.listFiles()
+          .filter(_.getName.endsWith(".sst")).map(_.getName).toSet
+        run1SstFiles.foreach { name =>
+          assert(sstFilesAfterCleanup.contains(name),
+            s"Expected run 1 SST file $name to still exist (protected by cloud 
" +
+              s"metadata fetch), but it was deleted.")
+        }
+
+        // Phase 5: Fresh instance loads version 29 from 20.zip + changelogs.
+        // Succeeds because run 1's SSTs (referenced by 20.zip) were preserved.
+        val localDir3 = Utils.createTempDir()
+        val db3 = new RocksDB(
+          remoteDir,
+          conf = conf,
+          localRootDir = localDir3,
+          hadoopConf = noOverwriteConf,
+          loggingId = s"[Thread-${Thread.currentThread.getId}]")
+        try {
+          db3.load(29)
+          val value = new String(db3.get("key"), "UTF-8")
+          assert(value == "value_from_run1",
+            s"Expected stale value 'value_from_run1' from 20.zip (run 1's 
SSTs), " +
+              s"but got '$value'")
+        } finally {
+          db3.close()
+        }
+      } finally {
+        db2.close()
+      }
+    }
+  }
+
   testWithChangelogCheckpointingEnabled("SPARK-52553 - v1 changelog with 
invalid version number" +
     " does not cause NumberFormatException") {
     withTempDir { dir =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to