This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f25ebe52b9b8 [SPARK-46796][SS] Ensure the correct remote files (mentioned in metadata.zip) are used on RocksDB version load f25ebe52b9b8 is described below commit f25ebe52b9b84ece9b3c5ae30b83eaaef52ec55b Author: Bhuwan Sahni <bhuwan.sa...@databricks.com> AuthorDate: Wed Jan 24 21:35:33 2024 +0900 [SPARK-46796][SS] Ensure the correct remote files (mentioned in metadata.zip) are used on RocksDB version load ### What changes were proposed in this pull request? This PR ensures that RocksDB loads do not run into the SST file Version ID mismatch issue. RocksDB has added validation to ensure that the exact same SST file is used when loading a database from a snapshot. The current streaming state implementation suffers from certain edge cases where this condition is violated, resulting in state load failures. The changes introduced are: 1. Ensure that each local SST file is exactly the same as the DFS file it maps to (as per the mapping in metadata.zip). We keep track of the DFS file path used to create each local SST file, and re-download the SST file if the DFS file has a different UUID in metadata.zip. 2. Reset lastSnapshotVersion in RocksDB when RocksDB is loaded. Changelog checkpointing relies on this version for future snapshots. Previously, if an older version was reloaded, we did not upload snapshots because lastSnapshotVersion still pointed to a higher snapshot from the previously loaded (now cleaned-up) database. ### Why are the changes needed? We need to ensure that the correct SST files are used on the executor during RocksDB load, as per the mapping in metadata.zip. With the current implementation, it's possible that the executor uses an SST file (with a different UUID) from an older version that is not the exact file mapped in metadata.zip. This can cause Version ID mismatch errors while loading RocksDB, leading to streaming query failures. See https://issues.apache.org/jira/browse/SPARK-46796 for failure scenarios. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? 
Added exhaustive unit testcases covering the scenarios. ### Was this patch authored or co-authored using generative AI tooling? No Closes #44837 from sahnib/SPARK-46796. Authored-by: Bhuwan Sahni <bhuwan.sa...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/execution/streaming/state/RocksDB.scala | 3 + .../streaming/state/RocksDBFileManager.scala | 92 ++++-- .../execution/streaming/state/RocksDBSuite.scala | 314 ++++++++++++++++++++- 3 files changed, 372 insertions(+), 37 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 0284d4c9d303..8997e8df6c89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -162,6 +162,8 @@ class RocksDB( val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir) loadedVersion = latestSnapshotVersion + // reset last snapshot version + lastSnapshotVersion = 0L openDB() numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) { @@ -202,6 +204,7 @@ class RocksDB( */ private def replayChangelog(endVersion: Long): Unit = { for (v <- loadedVersion + 1 to endVersion) { + logInfo(s"replaying changelog from version $loadedVersion -> $endVersion") var changelogReader: StateStoreChangelogReader = null try { changelogReader = fileManager.getChangelogReader(v) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 5e2b6afee68e..794e39e2bacc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -134,6 +134,15 @@ class RocksDBFileManager( import RocksDBImmutableFile._ private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]] + + + // used to keep a mapping of the exact Dfs file that was used to create a local SST file. + // The reason this is a separate map because versionToRocksDBFiles can contain multiple similar + // SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst in v1 and + // 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID compatibility + // across SST files and RocksDB manifest. + private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile] + private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf) private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf) private val onlyZipFiles = new PathFilter { @@ -223,6 +232,7 @@ class RocksDBFileManager( versionToRocksDBFiles.keySet().removeIf(_ >= version) val metadata = if (version == 0) { if (localDir.exists) Utils.deleteRecursively(localDir) + localFilesToDfsFiles.clear() localDir.mkdirs() RocksDBCheckpointMetadata(Seq.empty, 0) } else { @@ -459,44 +469,54 @@ class RocksDBFileManager( // Get the immutable files used in previous versions, as some of those uploaded files can be // reused for this version logInfo(s"Saving RocksDB files to DFS for $version") - val prevFilesToSizes = versionToRocksDBFiles.asScala.filter { case (k, _) => k < version } - .values.flatten.map { f => - f.localFileName -> f - }.toMap var bytesCopied = 0L var filesCopied = 0L var filesReused = 0L val immutableFiles = localFiles.map { localFile => - prevFilesToSizes - .get(localFile.getName) - .filter(_.isSameFile(localFile)) - .map { reusable => - filesReused += 1 - reusable - }.getOrElse { - val localFileName = localFile.getName - val dfsFileName = 
newDFSFileName(localFileName) - val dfsFile = dfsFilePath(dfsFileName) - // Note: The implementation of copyFromLocalFile() closes the output stream when there is - // any exception while copying. So this may generate partial files on DFS. But that is - // okay because until the main [version].zip file is written, those partial files are - // not going to be used at all. Eventually these files should get cleared. - fs.copyFromLocalFile( - new Path(localFile.getAbsoluteFile.toURI), dfsFile) - val localFileSize = localFile.length() - logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes") - filesCopied += 1 - bytesCopied += localFileSize - - RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize) - } + val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName) + if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) { + val dfsFile = existingDfsFile.get + filesReused += 1 + logInfo(s"reusing file $dfsFile for $localFile") + RocksDBImmutableFile(localFile.getName, dfsFile.dfsFileName, dfsFile.sizeBytes) + } else { + val localFileName = localFile.getName + val dfsFileName = newDFSFileName(localFileName) + val dfsFile = dfsFilePath(dfsFileName) + // Note: The implementation of copyFromLocalFile() closes the output stream when there is + // any exception while copying. So this may generate partial files on DFS. But that is + // okay because until the main [version].zip file is written, those partial files are + // not going to be used at all. Eventually these files should get cleared. 
+ fs.copyFromLocalFile( + new Path(localFile.getAbsoluteFile.toURI), dfsFile) + val localFileSize = localFile.length() + logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes") + filesCopied += 1 + bytesCopied += localFileSize + + val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize) + localFilesToDfsFiles.put(localFileName, immutableDfsFile) + + immutableDfsFile + } } logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" + s" DFS for version $version. $filesReused files reused without copying.") versionToRocksDBFiles.put(version, immutableFiles) + // clean up deleted SST files from the localFilesToDfsFiles Map + val currentLocalFiles = localFiles.map(_.getName).toSet + val mappingsToClean = localFilesToDfsFiles.asScala + .keys + .filterNot(currentLocalFiles.contains) + + mappingsToClean.foreach { f => + logInfo(s"cleaning $f from the localFilesToDfsFiles map") + localFilesToDfsFiles.remove(f) + } + saveCheckpointMetrics = RocksDBFileManagerMetrics( bytesCopied = bytesCopied, filesCopied = filesCopied, @@ -516,11 +536,22 @@ class RocksDBFileManager( // Delete unnecessary local immutable files listRocksDBFiles(localDir)._1 .foreach { existingFile => - val isSameFile = - requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile)) + val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName) + val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName) + val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) { + requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName && + existingFile.length() == requiredFile.get.sizeBytes + } else { + false + } + if (!isSameFile) { existingFile.delete() - logInfo(s"Deleted local file $existingFile") + localFilesToDfsFiles.remove(existingFile.getName) + logInfo(s"Deleted local file $existingFile with size ${existingFile.length()} mapped" + + s" to previous dfsFile 
${prevDfsFile.getOrElse("null")}") + } else { + logInfo(s"reusing $prevDfsFile present at $existingFile for $requiredFile") } } @@ -545,6 +576,7 @@ class RocksDBFileManager( } filesCopied += 1 bytesCopied += localFileSize + localFilesToDfsFiles.put(localFileName, file) logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes") } else { filesReused += 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 9ce2137df72c..88de68db1a9e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -24,19 +24,39 @@ import scala.language.implicitConversions import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.rocksdb.CompressionType import org.scalactic.source.Position import org.scalatest.Tag import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.util.quietly -import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager +import org.apache.spark.sql.execution.streaming.{CreateAtomicTestManager, FileSystemBasedCheckpointFileManager} +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import org.apache.spark.tags.SlowSQLTest import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.util.ArrayImplicits._ +class NoOverwriteFileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration) + extends FileSystemBasedCheckpointFileManager(path, hadoopConf) { + + override def 
createAtomic(path: Path, + overwriteIfPossible: Boolean): CancellableFSDataOutputStream = { + new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible) + } + + override def renameTempFile(srcPath: Path, dstPath: Path, + overwriteIfPossible: Boolean): Unit = { + if (!fs.exists(dstPath)) { + // only write if a file does not exist at this location + super.renameTempFile(srcPath, dstPath, overwriteIfPossible) + } + } +} + trait RocksDBStateStoreChangelogCheckpointingTestUtil { val rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" @@ -711,19 +731,19 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared // Save SAME version again with different checkpoint files and load back again to verify // whether files were overwritten. val cpFiles1_ = Seq( - "sst-file1.sst" -> 10, // same SST file as before, but same version, so should get copied + "sst-file1.sst" -> 10, // same SST file as before, this should get reused "sst-file2.sst" -> 25, // new SST file with same name as before, but different length "sst-file3.sst" -> 30, // new SST file "other-file1" -> 100, // same non-SST file as before, should not get copied "other-file2" -> 210, // new non-SST file with same name as before, but different length "other-file3" -> 300, // new non-SST file - "archive/00001.log" -> 1000, // same log file as before and version, so should get copied + "archive/00001.log" -> 1000, // same log file as before, this should get reused "archive/00002.log" -> 2500, // new log file with same name as before, but different length "archive/00003.log" -> 3000 // new log file ) saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001) - assert(numRemoteSSTFiles === 5, "shouldn't copy same files again") // 2 old + 3 new SST files - assert(numRemoteLogFiles === 5, "shouldn't copy same files again") // 2 old + 3 new log files + assert(numRemoteSSTFiles === 4, "shouldn't copy same 
files again") // 2 old + 2 new SST files + assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001) // Save another version and verify @@ -733,8 +753,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared "archive/00004.log" -> 4000 ) saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501) - assert(numRemoteSSTFiles === 6) // 1 new file over earlier 5 files - assert(numRemoteLogFiles === 6) // 1 new file over earlier 5 files + assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files + assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 2, cpFiles2, 1501) // Loading an older version should work @@ -1228,6 +1248,286 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } } + test("time travel - validate successful RocksDB load") { + val remoteDir = Utils.createTempDir().toString + val conf = dbConf.copy(minDeltasForSnapshot = 1, compactOnCommit = false) + new File(remoteDir).delete() // to make sure that the directory gets created + withDB(remoteDir, conf = conf) { db => + for (version <- 0 to 1) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + // upload snapshot 2.zip + db.doMaintenance() + for (version <- Seq(2)) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + // upload snapshot 3.zip + db.doMaintenance() + // simulate db in another executor that override the zip file + withDB(remoteDir, conf = conf) { db1 => + for (version <- 0 to 1) { + db1.load(version) + db1.put(version.toString, version.toString) + db1.commit() + } + db1.doMaintenance() + } + db.load(2) + for (version <- Seq(2)) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + // upload snapshot 3.zip + 
db.doMaintenance() + // rollback to version 2 + db.load(2) + } + } + + test("time travel 2 - validate successful RocksDB load") { + Seq(1, 2).map(minDeltasForSnapshot => { + val remoteDir = Utils.createTempDir().toString + val conf = dbConf.copy(minDeltasForSnapshot = minDeltasForSnapshot, + compactOnCommit = false) + new File(remoteDir).delete() // to make sure that the directory gets created + withDB(remoteDir, conf = conf) { db => + for (version <- 0 to 1) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + // upload snapshot 2.zip + db.doMaintenance() + for (version <- 2 to 3) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + db.load(0) + // simulate db in another executor that override the zip file + withDB(remoteDir, conf = conf) { db1 => + for (version <- 0 to 1) { + db1.load(version) + db1.put(version.toString, version.toString) + db1.commit() + } + db1.doMaintenance() + } + for (version <- 2 to 3) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + // upload snapshot 4.zip + db.doMaintenance() + } + withDB(remoteDir, version = 4, conf = conf) { db => + } + }) + } + + test("time travel 3 - validate successful RocksDB load") { + val remoteDir = Utils.createTempDir().toString + val conf = dbConf.copy(minDeltasForSnapshot = 0, compactOnCommit = false) + new File(remoteDir).delete() // to make sure that the directory gets created + withDB(remoteDir, conf = conf) { db => + for (version <- 0 to 2) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + // upload snapshot 2.zip + db.doMaintenance() + for (version <- 1 to 3) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + // upload snapshot 4.zip + db.doMaintenance() + } + + withDB(remoteDir, version = 4, conf = conf) { db => + } + } + + test("validate Rocks DB SST files do not have a VersionIdMismatch" + + " when metadata file is not overwritten 
- scenario 1") { + val fmClass = "org.apache.spark.sql.execution.streaming.state." + + "NoOverwriteFileSystemBasedCheckpointFileManager" + withTempDir { dir => + val dbConf = RocksDBConf(StateStoreConf(new SQLConf())) + val hadoopConf = new Configuration() + hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass) + + val remoteDir = dir.getCanonicalPath + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 => + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 => + // commit version 1 via db1 + db1.load(0) + db1.put("a", "1") + db1.put("b", "1") + + db1.commit() + + // commit version 1 via db2 + db2.load(0) + db2.put("a", "1") + db2.put("b", "1") + + db2.commit() + + // commit version 2 via db2 + db2.load(1) + db2.put("a", "2") + db2.put("b", "2") + + db2.commit() + + // reload version 1, this should succeed + db2.load(1) + db1.load(1) + + // reload version 2, this should succeed + db2.load(2) + db1.load(2) + } + } + } + } + + test("validate Rocks DB SST files do not have a VersionIdMismatch" + + " when metadata file is overwritten - scenario 1") { + withTempDir { dir => + val dbConf = RocksDBConf(StateStoreConf(new SQLConf())) + val hadoopConf = new Configuration() + val remoteDir = dir.getCanonicalPath + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 => + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 => + // commit version 1 via db1 + db1.load(0) + db1.put("a", "1") + db1.put("b", "1") + + db1.commit() + + // commit version 1 via db2 + db2.load(0) + db2.put("a", "1") + db2.put("b", "1") + + db2.commit() + + // commit version 2 via db2 + db2.load(1) + db2.put("a", "2") + db2.put("b", "2") + + db2.commit() + + // reload version 1, this should succeed + db2.load(1) + db1.load(1) + + // reload version 2, this should succeed + db2.load(2) + db1.load(2) + } + } + } + } + + test("validate Rocks DB SST files do not have a VersionIdMismatch" + + " when metadata file is not overwritten - 
scenario 2") { + val fmClass = "org.apache.spark.sql.execution.streaming.state." + + "NoOverwriteFileSystemBasedCheckpointFileManager" + withTempDir { dir => + val dbConf = RocksDBConf(StateStoreConf(new SQLConf())) + val hadoopConf = new Configuration() + hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass) + + val remoteDir = dir.getCanonicalPath + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 => + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 => + // commit version 1 via db2 + db2.load(0) + db2.put("a", "1") + db2.put("b", "1") + + db2.commit() + + // commit version 1 via db1 + db1.load(0) + db1.put("a", "1") + db1.put("b", "1") + + db1.commit() + + // commit version 2 via db2 + db2.load(1) + db2.put("a", "2") + db2.put("b", "2") + + db2.commit() + + // reload version 1, this should succeed + db2.load(1) + db1.load(1) + + // reload version 2, this should succeed + db2.load(2) + db1.load(2) + } + } + } + } + + test("validate Rocks DB SST files do not have a VersionIdMismatch" + + " when metadata file is overwritten - scenario 2") { + withTempDir { dir => + val dbConf = RocksDBConf(StateStoreConf(new SQLConf())) + val hadoopConf = new Configuration() + val remoteDir = dir.getCanonicalPath + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 => + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 => + // commit version 1 via db2 + db2.load(0) + db2.put("a", "1") + db2.put("b", "1") + + db2.commit() + + // commit version 1 via db1 + db1.load(0) + db1.put("a", "1") + db1.put("b", "1") + + db1.commit() + + // commit version 2 via db2 + db2.load(1) + db2.put("a", "2") + db2.put("b", "2") + + db2.commit() + + // reload version 1, this should succeed + db2.load(1) + db1.load(1) + + // reload version 2, this should succeed + db2.load(2) + db1.load(2) + } + } + } + } + private def sqlConf = SQLConf.get.clone() private def dbConf = RocksDBConf(StateStoreConf(sqlConf)) 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org