This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 01bebbc23f16 [SPARK-55892][SS] Fix unable to load state store because 
reused SST file was deleted by maintenance
01bebbc23f16 is described below

commit 01bebbc23f161f39b76930edb3e396d386e225e5
Author: micheal-o <[email protected]>
AuthorDate: Tue Mar 10 17:30:40 2026 -0700

    [SPARK-55892][SS] Fix unable to load state store because reused SST file 
was deleted by maintenance
    
    ### What changes were proposed in this pull request?
    
    When there is a large delay between when a snapshot was created and when it 
was uploaded by maintenance, it is possible for files that were reused from 
old versions — which have since become eligible for deletion by another 
executor's maintenance — to have already been deleted.
    
    Hence, when trying to load the state store using the uploaded snapshot, it 
will fail with a FileNotFoundException for the reused SST file.
    
    ### Why are the changes needed?
    
    To prevent state store corruption and streaming query failure
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New test case
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Claude-4.5-opus
    
    Closes #54697 from micheal-o/fix_stale_snapshot_upload.
    
    Authored-by: micheal-o <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../sql/execution/streaming/state/RocksDB.scala    | 93 ++++++++++++++++++----
 .../streaming/state/RocksDBFileManager.scala       | 17 ++++
 .../execution/streaming/state/RocksDBSuite.scala   | 64 +++++++++++++++
 3 files changed, 160 insertions(+), 14 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index a17c788d3207..d3db9aa482b1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1857,7 +1857,20 @@ class RocksDB(
       }
 
       if (mostRecentSnapshot.isDefined) {
-        uploadSnapshot(mostRecentSnapshot.get)
+        // SPARK-55892: We are doing this because the time between when a 
snapshot was created,
+        // and the time when maintenance upload is happening could be very 
long, such that the
+        // reused files could have been deleted by maintenance on another 
executor due to
+        // the existing uploaded snapshots depending on it being deleted.
+        // This would also handle other cases where the fileMapping is 
referencing a file
+        // that is already deleted in DFS.
+        // This could lead to SST file missing, when the state store is being 
reloaded with
+        // this new snapshot.
+        val snapshotToUpload = if (conf.checkStaleReusedFilesInSnapshot) {
+          replaceStaleReusedFilesInSnapshot(mostRecentSnapshot.get)
+        } else {
+          mostRecentSnapshot.get
+        }
+        uploadSnapshot(snapshotToUpload)
       }
     }
     val cleanupTime = timeTakenMs {
@@ -1869,6 +1882,42 @@ class RocksDB(
     logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS, 
cleanupTime)} ms")
   }
 
+  /**
+   * This replaces stale reused files in the snapshot with new ones to be 
uploaded.
+   * Stale means they are potential candidates for deletion by another
+   * maintenance that could have happened on another executor before this 
snapshot is uploaded.
+   * Checking if the file exists in DFS is not sufficient, since that could 
lead to race
+   * with an ongoing maintenance on another executor.
+   *
+   * @param snapshot The snapshot we want to upload
+   * @return The snapshot with stale reused files replaced with new DFS files 
to be uploaded
+   */
+  private def replaceStaleReusedFilesInSnapshot(snapshot: RocksDBSnapshot): 
RocksDBSnapshot = {
+    val isReusingFiles = 
snapshot.fileMapping.exists(_._2.reusedFromVersion.isDefined)
+
+    if (isReusingFiles) {
+      val latestSnapshotVersionInDfs = fileManager.getLatestSnapshotVersion()
+      // Any other maintenance wouldn't delete files after this version
+      val maxVersionToDelete = Math.max(latestSnapshotVersionInDfs - 
conf.minVersionsToRetain, 0)
+
+      val newFileMapping = snapshot.fileMapping.map { case (localFileName, 
snapshotFile) =>
+        if (snapshotFile.reusedFromVersion.isDefined &&
+          snapshotFile.reusedFromVersion.get <= maxVersionToDelete) {
+          // Don't reuse the file, lets upload a new one
+          val newDfsFileName = fileManager.newDFSFileName(localFileName, 
snapshot.dfsFileSuffix)
+          val newDfsFile = RocksDBImmutableFile(
+            localFileName, newDfsFileName, 
snapshotFile.immutableFile.sizeBytes)
+          localFileName -> RocksDBSnapshotFile(newDfsFile, reusedFromVersion = 
None)
+        } else {
+          localFileName -> snapshotFile
+        }
+      }.toMap
+      snapshot.copy(fileMapping = newFileMapping)
+    } else {
+      snapshot
+    }
+  }
+
   /** Release all resources */
   def close(): Unit = {
     // Acquire DB instance lock and release at the end to allow for 
synchronized access
@@ -2222,7 +2271,11 @@ case class RocksDBVersionSnapshotInfo(version: Long, 
dfsFilesUUID: String)
 
 // Encapsulates a RocksDB immutable file, and the information whether it has 
been previously
 // uploaded to DFS. Already uploaded files can be skipped during SST file 
upload.
-case class RocksDBSnapshotFile(immutableFile: RocksDBImmutableFile, 
isUploaded: Boolean)
+case class RocksDBSnapshotFile(
+    immutableFile: RocksDBImmutableFile,
+    reusedFromVersion: Option[Long]) {
+  def isUploaded: Boolean = reusedFromVersion.isDefined
+}
 
 // Encapsulates the mapping of local SST files to DFS files. This mapping 
prevents
 // re-uploading the same SST file multiple times to DFS, saving I/O and 
reducing snapshot
@@ -2273,7 +2326,7 @@ class RocksDBFileMapping {
       // We can't reuse the current local file since it was added in the same 
or newer version
       // as the version we want to load
       (fileVersion, _) => fileVersion >= versionToLoad
-    )
+    ).map(_._1)
   }
 
   /**
@@ -2288,12 +2341,12 @@ class RocksDBFileMapping {
    *       e.g. we load(v1) -> save(v2), the loaded SST files from version 1 
can be reused
    *       in version 2 upload.
    *
-   * @return - Option with the DFS file or None
+   * @return - Option with the DFS file and version or None
    */
   private def getDfsFileForSave(
       fileManager: RocksDBFileManager,
       localFile: File,
-      versionToSave: Long): Option[RocksDBImmutableFile] = {
+      versionToSave: Long): Option[(RocksDBImmutableFile, Long)] = {
     getDfsFileWithIncompatibilityCheck(
       fileManager,
       localFile.getName,
@@ -2307,7 +2360,8 @@ class RocksDBFileMapping {
   private def getDfsFileWithIncompatibilityCheck(
       fileManager: RocksDBFileManager,
       localFileName: String,
-      isIncompatible: (Long, RocksDBImmutableFile) => Boolean): 
Option[RocksDBImmutableFile] = {
+      isIncompatible: (Long, RocksDBImmutableFile) => Boolean
+  ): Option[(RocksDBImmutableFile, Long)] = {
     localFileMappings.get(localFileName).map { case (dfsFileMappedVersion, 
dfsFile) =>
       val dfsFileSuffix = fileManager.dfsFileSuffix(dfsFile)
       val versionSnapshotInfo = 
RocksDBVersionSnapshotInfo(dfsFileMappedVersion, dfsFileSuffix)
@@ -2317,7 +2371,7 @@ class RocksDBFileMapping {
         remove(localFileName)
         None
       } else {
-        Some(dfsFile)
+        Some((dfsFile, dfsFileMappedVersion))
       }
     }.getOrElse(None)
   }
@@ -2371,14 +2425,17 @@ class RocksDBFileMapping {
     val dfsFilesSuffix = UUID.randomUUID().toString
     val snapshotFileMapping = localImmutableFiles.map { f =>
       val localFileName = f.getName
-      val existingDfsFile = getDfsFileForSave(fileManager, f, version)
-      val dfsFile = existingDfsFile.getOrElse {
-        val newDfsFileName = fileManager.newDFSFileName(localFileName, 
dfsFilesSuffix)
-        val newDfsFile = RocksDBImmutableFile(localFileName, newDfsFileName, 
sizeBytes = f.length())
-        mapToDfsFile(localFileName, newDfsFile, version)
-        newDfsFile
+      val existingDfsFileAndVersion = getDfsFileForSave(fileManager, f, 
version)
+      val (dfsFile, reusedFromVersion) = existingDfsFileAndVersion match {
+        case Some((file, version)) => (file, Some(version))
+        case None =>
+          val newDfsFileName = fileManager.newDFSFileName(localFileName, 
dfsFilesSuffix)
+          val newDfsFile =
+            RocksDBImmutableFile(localFileName, newDfsFileName, sizeBytes = 
f.length())
+          mapToDfsFile(localFileName, newDfsFile, version)
+          (newDfsFile, None)
       }
-      localFileName -> RocksDBSnapshotFile(dfsFile, existingDfsFile.isDefined)
+      localFileName -> RocksDBSnapshotFile(dfsFile, reusedFromVersion)
     }.toMap
 
     syncWithLocalState(localImmutableFiles)
@@ -2426,6 +2483,7 @@ case class RocksDBConf(
     memoryUpdateIntervalMs: Long,
     compressionCodec: String,
     verifyNonEmptyFilesInZip: Boolean,
+    checkStaleReusedFilesInSnapshot: Boolean,
     allowFAllocate: Boolean,
     compression: String,
     reportSnapshotUploadLag: Boolean,
@@ -2541,6 +2599,12 @@ object RocksDBConf {
   private val VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF =
     SQLConfEntry(VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF_KEY, "true")
 
+  // Config to determine whether we should check for stale reused files
+  // during maintenance snapshot upload.
+  val CHECK_STALE_REUSED_FILES_IN_SNAPSHOT_CONF_KEY = 
"checkStaleReusedFilesInSnapshot"
+  private val CHECK_STALE_REUSED_FILES_IN_SNAPSHOT_CONF =
+    SQLConfEntry(CHECK_STALE_REUSED_FILES_IN_SNAPSHOT_CONF_KEY, "true")
+
   // Configuration to set the merge operator version for backward 
compatibility.
   // Version 1 (default): Uses comma "," as delimiter for StringAppendOperator
   // Version 2: Uses empty string "" as delimiter (no delimiter, direct 
concatenation)
@@ -2641,6 +2705,7 @@ object RocksDBConf {
       getPositiveLongConf(MEMORY_UPDATE_INTERVAL_MS_CONF),
       storeConf.compressionCodec,
       getBooleanConf(VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF),
+      getBooleanConf(CHECK_STALE_REUSED_FILES_IN_SNAPSHOT_CONF),
       getBooleanConf(ALLOW_FALLOCATE_CONF),
       getStringConf(COMPRESSION_CONF),
       storeConf.reportSnapshotUploadLag,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 8d63b7b250c3..40ffe57e5b64 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -429,6 +429,23 @@ class RocksDBFileManager(
     }
   }
 
+  /** Get the latest snapshot version available in DFS. If none present, it 
returns 0. */
+  def getLatestSnapshotVersion(): Long = {
+    val path = new Path(dfsRootDir)
+    if (fm.exists(path)) {
+      val files = fm.list(path).map(_.getPath)
+      files.filter(onlyZipFiles.accept)
+        .map { fileName =>
+          fileName.getName.stripSuffix(".zip").split("_") match {
+            case Array(version, _) => version.toLong
+            case Array(version) => version.toLong
+          }
+        }.foldLeft(0L)(math.max)
+    } else {
+      0
+    }
+  }
+
   /** Get all the snapshot versions that can be used to load this version */
   def getEligibleSnapshotsForVersion(version: Long): Seq[Long] = {
     SnapshotLoaderHelper.getEligibleSnapshotsForVersion(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 8be2b610d099..bf0134772e1c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -3982,6 +3982,70 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
     }
   }
 
+  testWithChangelogCheckpointingEnabled(
+    "SPARK-55892: Stale reused files in snapshot are replaced during 
maintenance upload") {
+    // This test verifies that when a snapshot is uploaded after a delay, 
files that were
+    // reused from old versions (which are now eligible for deletion by 
another executor's
+    // maintenance) are replaced with new files to be uploaded.
+    //
+    // Scenario:
+    // db1 loads v0 and commits v1. Generates 1.changelog and 1.zip (but not 
uploaded to DFS)
+    // db2 loads v1 but since no snapshot, it uses 1.changelog and generates 
snapshot 2.zip
+    // db 1 maintenance uploads snapshot 1.zip to DFS
+    // db 3 loads v2 using 1.zip + 2.changelog and generates 3.zip but not 
uploaded to DFS
+    // db2 maintenance uploads 2.zip, and deletes files < 2.zip, because 
minVersionsToRetain = 1
+    // db 3 uploads 3.zip to DFS <-- (with new fix, shouldn't include reused 
files from 1.zip)
+    // db 4 loads v3 using 3.zip, should be successful (previously cause 
FileNotFoundException)
+
+    val conf = dbConf.copy(
+      minVersionsToRetain = 1,
+      minDeltasForSnapshot = 0, // Force snapshot on every commit
+      minVersionsToDelete = 0, // Allow immediate deletion of old versions
+      compactOnCommit = false
+    )
+
+    withTempDir { dir =>
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir = remoteDir, conf = conf) { db1 =>
+        db1.load(0)
+        db1.put("key1", "value1")
+        db1.commit() // produce 1.changelog and 1.zip but not uploaded to DFS
+
+        withDB(remoteDir = remoteDir, conf = conf) { db2 =>
+          db2.load(1)
+          db2.put("key2", "value2")
+          db2.commit() // produce 2.changelog and 2.zip but not uploaded to DFS
+
+          db1.doMaintenance() // db1 now uploads 1.zip to DFS
+
+          withDB(remoteDir = remoteDir, conf = conf) { db3 =>
+            db3.load(2) // load using 1.zip from db1
+            db3.put("key3", "value3")
+            db3.commit() // produce 3.changelog and 3.zip but not uploaded to 
DFS
+
+            // db2 maintenance uploads 2.zip, and deletes files < 2.zip,
+            // so 1.zip and its associated SSTs are deleted
+            db2.doMaintenance()
+
+            // db3 now uploads 3.zip to DFS. Before the fix, it will reuse the 
SST files
+            // from 1.zip, since 3.zip was created using 1.zip, leading to 
FileNotFoundException.
+            // With the fix, during this maintenance, we check the reused 
files and
+            // avoid reusing files that are eligible for deletion.
+            db3.doMaintenance()
+          }
+        }
+      }
+
+      // db4 load(3) should be successful, and no FileNotFoundException
+      withDB(remoteDir = remoteDir, conf = conf) { db4 =>
+        db4.load(3) // load using 3.zip from db3
+        assert(toStr(db4.get("key1")) == "value1") // record created in 1.zip
+        db4.put("key4", "value4")
+        db4.commit()
+      }
+    }
+  }
+
   test("RocksDB task completion listener correctly releases for failed task") {
     // This test verifies that a thread that locks the DB and then fails
     // can rely on the completion listener to release the lock.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to