This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 01bebbc23f16 [SPARK-55892][SS] Fix unable to load state store because
reused SST file was deleted by maintenance
01bebbc23f16 is described below
commit 01bebbc23f161f39b76930edb3e396d386e225e5
Author: micheal-o <[email protected]>
AuthorDate: Tue Mar 10 17:30:40 2026 -0700
[SPARK-55892][SS] Fix unable to load state store because reused SST file
was deleted by maintenance
### What changes were proposed in this pull request?
When there is a large delay between when a snapshot was created and when it
is uploaded by maintenance, files that were reused from old versions — and that
have since become eligible for deletion by another executor's maintenance — may
be deleted before the upload completes.
Hence, when trying to load the state store using the uploaded snapshot, it
will fail with FileNotFoundException for the reused SST file.
### Why are the changes needed?
To prevent state store corruption and streaming query failure
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New test case
### Was this patch authored or co-authored using generative AI tooling?
Claude-4.5-opus
Closes #54697 from micheal-o/fix_stale_snapshot_upload.
Authored-by: micheal-o <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../sql/execution/streaming/state/RocksDB.scala | 93 ++++++++++++++++++----
.../streaming/state/RocksDBFileManager.scala | 17 ++++
.../execution/streaming/state/RocksDBSuite.scala | 64 +++++++++++++++
3 files changed, 160 insertions(+), 14 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index a17c788d3207..d3db9aa482b1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1857,7 +1857,20 @@ class RocksDB(
}
if (mostRecentSnapshot.isDefined) {
- uploadSnapshot(mostRecentSnapshot.get)
+ // SPARK-55892: We are doing this because the time between when a
snapshot was created,
+ // and the time when maintenance upload is happening could be very
long, such that the
+ // reused files could have been deleted by maintenance on another
executor, because
+ // the previously uploaded snapshots that reference them have been
deleted.
+ // This would also handle other cases where the fileMapping is
referencing a file
+ // that is already deleted in DFS.
+ // This could lead to SST file missing, when the state store is being
reloaded with
+ // this new snapshot.
+ val snapshotToUpload = if (conf.checkStaleReusedFilesInSnapshot) {
+ replaceStaleReusedFilesInSnapshot(mostRecentSnapshot.get)
+ } else {
+ mostRecentSnapshot.get
+ }
+ uploadSnapshot(snapshotToUpload)
}
}
val cleanupTime = timeTakenMs {
@@ -1869,6 +1882,42 @@ class RocksDB(
logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS,
cleanupTime)} ms")
}
+ /**
+ * This replaces stale reused files in the snapshot with new ones to be
uploaded.
+ * Stale means they are potential candidates for deletion by a maintenance
+ * run on another executor that could happen before this snapshot is
uploaded.
+ * Checking if the file exists in DFS is not sufficient, since that could
lead to race
+ * with an ongoing maintenance on another executor.
+ *
+ * @param snapshot The snapshot we want to upload
+ * @return The snapshot with stale reused files replaced with new DFS files
to be uploaded
+ */
+ private def replaceStaleReusedFilesInSnapshot(snapshot: RocksDBSnapshot):
RocksDBSnapshot = {
+ val isReusingFiles =
snapshot.fileMapping.exists(_._2.reusedFromVersion.isDefined)
+
+ if (isReusingFiles) {
+ val latestSnapshotVersionInDfs = fileManager.getLatestSnapshotVersion()
+ // Maintenance on any other executor will not delete files from versions
newer than this
+ val maxVersionToDelete = Math.max(latestSnapshotVersionInDfs -
conf.minVersionsToRetain, 0)
+
+ val newFileMapping = snapshot.fileMapping.map { case (localFileName,
snapshotFile) =>
+ if (snapshotFile.reusedFromVersion.isDefined &&
+ snapshotFile.reusedFromVersion.get <= maxVersionToDelete) {
+ // Don't reuse the file; let's upload a new one instead
+ val newDfsFileName = fileManager.newDFSFileName(localFileName,
snapshot.dfsFileSuffix)
+ val newDfsFile = RocksDBImmutableFile(
+ localFileName, newDfsFileName,
snapshotFile.immutableFile.sizeBytes)
+ localFileName -> RocksDBSnapshotFile(newDfsFile, reusedFromVersion =
None)
+ } else {
+ localFileName -> snapshotFile
+ }
+ }.toMap
+ snapshot.copy(fileMapping = newFileMapping)
+ } else {
+ snapshot
+ }
+ }
+
/** Release all resources */
def close(): Unit = {
// Acquire DB instance lock and release at the end to allow for
synchronized access
@@ -2222,7 +2271,11 @@ case class RocksDBVersionSnapshotInfo(version: Long,
dfsFilesUUID: String)
// Encapsulates a RocksDB immutable file, and the information whether it has
been previously
// uploaded to DFS. Already uploaded files can be skipped during SST file
upload.
-case class RocksDBSnapshotFile(immutableFile: RocksDBImmutableFile,
isUploaded: Boolean)
+case class RocksDBSnapshotFile(
+ immutableFile: RocksDBImmutableFile,
+ reusedFromVersion: Option[Long]) {
+ def isUploaded: Boolean = reusedFromVersion.isDefined
+}
// Encapsulates the mapping of local SST files to DFS files. This mapping
prevents
// re-uploading the same SST file multiple times to DFS, saving I/O and
reducing snapshot
@@ -2273,7 +2326,7 @@ class RocksDBFileMapping {
// We can't reuse the current local file since it was added in the same
or newer version
// as the version we want to load
(fileVersion, _) => fileVersion >= versionToLoad
- )
+ ).map(_._1)
}
/**
@@ -2288,12 +2341,12 @@ class RocksDBFileMapping {
* e.g. we load(v1) -> save(v2), the loaded SST files from version 1
can be reused
* in version 2 upload.
*
- * @return - Option with the DFS file or None
+ * @return - Option with the DFS file and version or None
*/
private def getDfsFileForSave(
fileManager: RocksDBFileManager,
localFile: File,
- versionToSave: Long): Option[RocksDBImmutableFile] = {
+ versionToSave: Long): Option[(RocksDBImmutableFile, Long)] = {
getDfsFileWithIncompatibilityCheck(
fileManager,
localFile.getName,
@@ -2307,7 +2360,8 @@ class RocksDBFileMapping {
private def getDfsFileWithIncompatibilityCheck(
fileManager: RocksDBFileManager,
localFileName: String,
- isIncompatible: (Long, RocksDBImmutableFile) => Boolean):
Option[RocksDBImmutableFile] = {
+ isIncompatible: (Long, RocksDBImmutableFile) => Boolean
+ ): Option[(RocksDBImmutableFile, Long)] = {
localFileMappings.get(localFileName).map { case (dfsFileMappedVersion,
dfsFile) =>
val dfsFileSuffix = fileManager.dfsFileSuffix(dfsFile)
val versionSnapshotInfo =
RocksDBVersionSnapshotInfo(dfsFileMappedVersion, dfsFileSuffix)
@@ -2317,7 +2371,7 @@ class RocksDBFileMapping {
remove(localFileName)
None
} else {
- Some(dfsFile)
+ Some((dfsFile, dfsFileMappedVersion))
}
}.getOrElse(None)
}
@@ -2371,14 +2425,17 @@ class RocksDBFileMapping {
val dfsFilesSuffix = UUID.randomUUID().toString
val snapshotFileMapping = localImmutableFiles.map { f =>
val localFileName = f.getName
- val existingDfsFile = getDfsFileForSave(fileManager, f, version)
- val dfsFile = existingDfsFile.getOrElse {
- val newDfsFileName = fileManager.newDFSFileName(localFileName,
dfsFilesSuffix)
- val newDfsFile = RocksDBImmutableFile(localFileName, newDfsFileName,
sizeBytes = f.length())
- mapToDfsFile(localFileName, newDfsFile, version)
- newDfsFile
+ val existingDfsFileAndVersion = getDfsFileForSave(fileManager, f,
version)
+ val (dfsFile, reusedFromVersion) = existingDfsFileAndVersion match {
+ case Some((file, version)) => (file, Some(version))
+ case None =>
+ val newDfsFileName = fileManager.newDFSFileName(localFileName,
dfsFilesSuffix)
+ val newDfsFile =
+ RocksDBImmutableFile(localFileName, newDfsFileName, sizeBytes =
f.length())
+ mapToDfsFile(localFileName, newDfsFile, version)
+ (newDfsFile, None)
}
- localFileName -> RocksDBSnapshotFile(dfsFile, existingDfsFile.isDefined)
+ localFileName -> RocksDBSnapshotFile(dfsFile, reusedFromVersion)
}.toMap
syncWithLocalState(localImmutableFiles)
@@ -2426,6 +2483,7 @@ case class RocksDBConf(
memoryUpdateIntervalMs: Long,
compressionCodec: String,
verifyNonEmptyFilesInZip: Boolean,
+ checkStaleReusedFilesInSnapshot: Boolean,
allowFAllocate: Boolean,
compression: String,
reportSnapshotUploadLag: Boolean,
@@ -2541,6 +2599,12 @@ object RocksDBConf {
private val VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF =
SQLConfEntry(VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF_KEY, "true")
+ // Config to determine whether we should check for stale reused files
+ // during maintenance snapshot upload.
+ val CHECK_STALE_REUSED_FILES_IN_SNAPSHOT_CONF_KEY =
"checkStaleReusedFilesInSnapshot"
+ private val CHECK_STALE_REUSED_FILES_IN_SNAPSHOT_CONF =
+ SQLConfEntry(CHECK_STALE_REUSED_FILES_IN_SNAPSHOT_CONF_KEY, "true")
+
// Configuration to set the merge operator version for backward
compatibility.
// Version 1 (default): Uses comma "," as delimiter for StringAppendOperator
// Version 2: Uses empty string "" as delimiter (no delimiter, direct
concatenation)
@@ -2641,6 +2705,7 @@ object RocksDBConf {
getPositiveLongConf(MEMORY_UPDATE_INTERVAL_MS_CONF),
storeConf.compressionCodec,
getBooleanConf(VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF),
+ getBooleanConf(CHECK_STALE_REUSED_FILES_IN_SNAPSHOT_CONF),
getBooleanConf(ALLOW_FALLOCATE_CONF),
getStringConf(COMPRESSION_CONF),
storeConf.reportSnapshotUploadLag,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 8d63b7b250c3..40ffe57e5b64 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -429,6 +429,23 @@ class RocksDBFileManager(
}
}
+ /** Get the latest snapshot version available in DFS. If none present, it
returns 0. */
+ def getLatestSnapshotVersion(): Long = {
+ val path = new Path(dfsRootDir)
+ if (fm.exists(path)) {
+ val files = fm.list(path).map(_.getPath)
+ files.filter(onlyZipFiles.accept)
+ .map { fileName =>
+ fileName.getName.stripSuffix(".zip").split("_") match {
+ case Array(version, _) => version.toLong
+ case Array(version) => version.toLong
+ }
+ }.foldLeft(0L)(math.max)
+ } else {
+ 0
+ }
+ }
+
/** Get all the snapshot versions that can be used to load this version */
def getEligibleSnapshotsForVersion(version: Long): Seq[Long] = {
SnapshotLoaderHelper.getEligibleSnapshotsForVersion(
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 8be2b610d099..bf0134772e1c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -3982,6 +3982,70 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
}
}
+ testWithChangelogCheckpointingEnabled(
+ "SPARK-55892: Stale reused files in snapshot are replaced during
maintenance upload") {
+ // This test verifies that when a snapshot is uploaded after a delay,
files that were
+ // reused from old versions (which are now eligible for deletion by
another executor's
+ // maintenance) are replaced with new files to be uploaded.
+ //
+ // Scenario:
+ // db1 loads v0 and commits v1. Generates 1.changelog and 1.zip (but not
uploaded to DFS)
+ // db2 loads v1 but since no snapshot, it uses 1.changelog and generates
snapshot 2.zip
+ // db 1 maintenance uploads snapshot 1.zip to DFS
+ // db 3 loads v2 using 1.zip + 2.changelog and generates 3.zip but not
uploaded to DFS
+ // db2 maintenance uploads 2.zip, and deletes files < 2.zip, because
minVersionsToRetain = 1
+ // db 3 uploads 3.zip to DFS <-- (with new fix, shouldn't include reused
files from 1.zip)
+ // db 4 loads v3 using 3.zip, should be successful (previously cause
FileNotFoundException)
+
+ val conf = dbConf.copy(
+ minVersionsToRetain = 1,
+ minDeltasForSnapshot = 0, // Force snapshot on every commit
+ minVersionsToDelete = 0, // Allow immediate deletion of old versions
+ compactOnCommit = false
+ )
+
+ withTempDir { dir =>
+ val remoteDir = dir.getCanonicalPath
+ withDB(remoteDir = remoteDir, conf = conf) { db1 =>
+ db1.load(0)
+ db1.put("key1", "value1")
+ db1.commit() // produce 1.changelog and 1.zip but not uploaded to DFS
+
+ withDB(remoteDir = remoteDir, conf = conf) { db2 =>
+ db2.load(1)
+ db2.put("key2", "value2")
+ db2.commit() // produce 2.changelog and 2.zip but not uploaded to DFS
+
+ db1.doMaintenance() // db1 now uploads 1.zip to DFS
+
+ withDB(remoteDir = remoteDir, conf = conf) { db3 =>
+ db3.load(2) // load using 1.zip from db1
+ db3.put("key3", "value3")
+ db3.commit() // produce 3.changelog and 3.zip but not uploaded to
DFS
+
+ // db2 maintenance uploads 2.zip, and deletes files < 2.zip,
+ // so 1.zip and its associated SSTs are deleted
+ db2.doMaintenance()
+
+ // db3 now uploads 3.zip to DFS. Before the fix, it will reuse the
SST files
+ // from 1.zip, since 3.zip was created using 1.zip, leading to
FileNotFoundException.
+ // With the fix, during this maintenance, we check the reused
files and
+ // avoid reusing files that are eligible for deletion.
+ db3.doMaintenance()
+ }
+ }
+ }
+
+ // db4 load(3) should be successful, and no FileNotFoundException
+ withDB(remoteDir = remoteDir, conf = conf) { db4 =>
+ db4.load(3) // load using 3.zip from db3
+ assert(toStr(db4.get("key1")) == "value1") // record created in 1.zip
+ db4.put("key4", "value4")
+ db4.commit()
+ }
+ }
+ }
+
test("RocksDB task completion listener correctly releases for failed task") {
// This test verifies that a thread that locks the DB and then fails
// can rely on the completion listener to release the lock.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]