This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a0b4205d9251 [SPARK-50151][SS][ROCKSDB HARDENING] Fix ineffective file 
reuse bug in the new file management change
a0b4205d9251 is described below

commit a0b4205d92513f68cf1b71e7c9827387af350b2a
Author: micheal-o <[email protected]>
AuthorDate: Mon Nov 4 07:31:12 2024 +0900

    [SPARK-50151][SS][ROCKSDB HARDENING] Fix ineffective file reuse bug in the 
new file management change
    
    ### What changes were proposed in this pull request?
    There are 2 bugs in the recently added new approach for RocksDB SST file 
mapping in this PR: https://github.com/apache/spark/pull/47875 (cc sahnib )
    
    1. The file mapping version is not properly advancing and only advances 
when we are reopening the RocksDB. This causes ineffective file reuse: we end 
up not reusing files that should have been reused, leading to a lot of 
unnecessary file upload/download; in the worst case it makes us act as if 
file reuse is disabled.
    
    2. Ineffective file reuse when creating a checkpoint. We currently will not 
reuse files for creating a checkpoint if they were added in the current version, 
i.e. if you do load(v1) -> save(v2), the SST files loaded in v1 will not be 
reused in v2, and we will upload them again.
    
    3. These two bugs were not caught even though we have tests to catch them, 
because there was also a bug in the test.
    
    NOTE: these bugs will not cause corruption; they are more of a performance 
bug. They just make file reuse ineffective and end up behaving as if file reuse 
were disabled, i.e. a lot of file upload/download.
    
    **To Repro**:
    You can add the test changes in this PR to current master and run RocksDB 
test, you will see test failures for file mapping version not advancing and for 
incorrect file reuse.
    
    ### Why are the changes needed?
    Bug fix. Without this change, it behaves as if file reuse is disabled. 
This is a performance bug.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests fixed and added assertion
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48685 from micheal-o/FixNewFileMapping.
    
    Authored-by: micheal-o <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../sql/execution/streaming/state/RocksDB.scala    | 67 +++++++++++++++-------
 .../streaming/state/RocksDBFileManager.scala       |  9 +--
 .../execution/streaming/state/RocksDBSuite.scala   |  2 -
 3 files changed, 51 insertions(+), 27 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 544035e11785..167aefc88b7d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -297,7 +297,6 @@ class RocksDB(
         (loadedStateStoreCkptId.isEmpty || stateStoreCkptId.get != 
loadedStateStoreCkptId.get))) {
         closeDB(ignoreException = false)
         val latestSnapshotVersion = 
fileManager.getLatestSnapshotVersion(version)
-        rocksDBFileMapping.currentVersion = latestSnapshotVersion
         val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion,
           workingDir, rocksDBFileMapping)
         loadedVersion = latestSnapshotVersion
@@ -402,7 +401,6 @@ class RocksDB(
    */
   private def replayFromCheckpoint(snapshotVersion: Long, endVersion: Long): 
Any = {
     closeDB()
-    rocksDBFileMapping.currentVersion = snapshotVersion
     val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion,
       workingDir, rocksDBFileMapping)
     loadedVersion = snapshotVersion
@@ -1148,22 +1146,53 @@ class RocksDBFileMapping {
   // from reusing SST files which have not been yet persisted to DFS,
   val snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo] = 
ConcurrentHashMap.newKeySet()
 
-  // Current State Store version which has been loaded.
-  var currentVersion: Long = 0
+  /**
+   * Get the mapped DFS file for the given local file for a DFS load operation.
+   * If the currently mapped DFS file was mapped in the same or newer version 
as the version we
+   * want to load (or was generated in a version which has not been uploaded 
to DFS yet),
+   * the mapped DFS file is ignored. In this scenario, the local mapping to 
this DFS file
+   * will be cleared, and function will return None.
+   *
+   * @note For same version number, this is because we want to make sure we 
are using
+   *       the latest files in DFS, in case the previous zip file has been 
overwritten in DFS.
+   *
+   * @return - Option with the DFS file or None
+   */
+  def getDfsFileForLoad(
+      fileManager: RocksDBFileManager,
+      localFileName: String,
+      versionToLoad: Long): Option[RocksDBImmutableFile] = {
+    getDfsFileWithVersionCheck(fileManager, localFileName, _ >= versionToLoad)
+  }
+
+  /**
+   * Get the mapped DFS file for the given local file for a DFS save (i.e. 
checkpoint) operation.
+   * If the currently mapped DFS file was mapped in the same or newer version 
as the version we
+   * want to save (or was generated in a version which has not been uploaded 
to DFS yet),
+   * the mapped DFS file is ignored. In this scenario, the local mapping to 
this DFS file
+   * will be cleared, and function will return None.
+   *
+   * @note If the file was added in current version (i.e. versionToSave - 1), 
we can reuse it.
+   *       e.g. we load(v1) -> save(v2), the loaded SST files from version 1 
can be reused
+   *       in version 2 upload.
+   *
+   * @return - Option with the DFS file or None
+   */
+  private def getDfsFileForSave(
+      fileManager: RocksDBFileManager,
+      localFileName: String,
+      versionToSave: Long): Option[RocksDBImmutableFile] = {
+    getDfsFileWithVersionCheck(fileManager, localFileName, _ >= versionToSave)
+  }
 
-  // If the local file (with localFileName) has already been persisted to DFS, 
returns the
-  // DFS file, else returns None.
-  // If the currently mapped DFS file was committed in a newer version (or was 
generated
-  // in a version which has not been uploaded to DFS yet), the mapped DFS file 
is ignored (because
-  // it cannot be reused in this version). In this scenario, the local mapping 
to this DFS file
-  // will be cleared, and function will return None.
-  def getDfsFile(
+  private def getDfsFileWithVersionCheck(
       fileManager: RocksDBFileManager,
-      localFileName: String): Option[RocksDBImmutableFile] = {
-    localFileMappings.get(localFileName).map { case (dfsFileCommitVersion, 
dfsFile) =>
+      localFileName: String,
+      isIncompatibleVersion: Long => Boolean): Option[RocksDBImmutableFile] = {
+    localFileMappings.get(localFileName).map { case (dfsFileMappedVersion, 
dfsFile) =>
       val dfsFileSuffix = fileManager.dfsFileSuffix(dfsFile)
-      val versionSnapshotInfo = 
RocksDBVersionSnapshotInfo(dfsFileCommitVersion, dfsFileSuffix)
-      if (dfsFileCommitVersion >= currentVersion ||
+      val versionSnapshotInfo = 
RocksDBVersionSnapshotInfo(dfsFileMappedVersion, dfsFileSuffix)
+      if (isIncompatibleVersion(dfsFileMappedVersion) ||
         snapshotsPendingUpload.contains(versionSnapshotInfo)) {
         // the mapped dfs file cannot be used, delete from mapping
         remove(localFileName)
@@ -1174,7 +1203,7 @@ class RocksDBFileMapping {
     }.getOrElse(None)
   }
 
-  private def mapToDfsFile(
+  def mapToDfsFile(
       localFileName: String,
       dfsFile: RocksDBImmutableFile,
       version: Long): Unit = {
@@ -1185,10 +1214,6 @@ class RocksDBFileMapping {
     localFileMappings.remove(localFileName)
   }
 
-  def mapToDfsFileForCurrentVersion(localFileName: String, dfsFile: 
RocksDBImmutableFile): Unit = {
-    localFileMappings.put(localFileName, (currentVersion, dfsFile))
-  }
-
   private def syncWithLocalState(localFiles: Seq[File]): Unit = {
     val localFileNames = localFiles.map(_.getName).toSet
     val deletedFiles = 
localFileMappings.keys.filterNot(localFileNames.contains)
@@ -1209,7 +1234,7 @@ class RocksDBFileMapping {
     val dfsFilesSuffix = UUID.randomUUID().toString
     val snapshotFileMapping = localImmutableFiles.map { f =>
       val localFileName = f.getName
-      val existingDfsFile = getDfsFile(fileManager, localFileName)
+      val existingDfsFile = getDfsFileForSave(fileManager, localFileName, 
version)
       val dfsFile = existingDfsFile.getOrElse {
         val newDfsFileName = fileManager.newDFSFileName(localFileName, 
dfsFilesSuffix)
         val newDfsFile = RocksDBImmutableFile(localFileName, newDfsFileName, 
sizeBytes = f.length())
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index e503ea1737c0..6b13ff31c9d5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -288,7 +288,7 @@ class RocksDBFileManager(
       val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
       logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, 
version)}:\n" +
         log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")
-      loadImmutableFilesFromDfs(metadata.immutableFiles, localDir, 
rocksDBFileMapping)
+      loadImmutableFilesFromDfs(metadata.immutableFiles, localDir, 
rocksDBFileMapping, version)
       versionToRocksDBFiles.put(version, metadata.immutableFiles)
       metadataFile.delete()
       metadata
@@ -628,7 +628,8 @@ class RocksDBFileManager(
   private def loadImmutableFilesFromDfs(
       immutableFiles: Seq[RocksDBImmutableFile],
       localDir: File,
-      rocksDBFileMapping: RocksDBFileMapping): Unit = {
+      rocksDBFileMapping: RocksDBFileMapping,
+      version: Long): Unit = {
     val requiredFileNameToFileDetails = immutableFiles.map(f => 
f.localFileName -> f).toMap
 
     val localImmutableFiles = listRocksDBFiles(localDir)._1
@@ -637,7 +638,7 @@ class RocksDBFileManager(
     localImmutableFiles.foreach { existingFile =>
       val existingFileSize = existingFile.length()
       val requiredFile = 
requiredFileNameToFileDetails.get(existingFile.getName)
-      val prevDfsFile = rocksDBFileMapping.getDfsFile(this, 
existingFile.getName)
+      val prevDfsFile = rocksDBFileMapping.getDfsFileForLoad(this, 
existingFile.getName, version)
       val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
         requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
           existingFile.length() == requiredFile.get.sizeBytes
@@ -679,7 +680,7 @@ class RocksDBFileManager(
         }
         filesCopied += 1
         bytesCopied += localFileSize
-        rocksDBFileMapping.mapToDfsFileForCurrentVersion(localFileName, file)
+        rocksDBFileMapping.mapToDfsFile(localFileName, file, version)
         logInfo(log"Copied ${MDC(LogKeys.DFS_FILE, dfsFile)} to " +
           log"${MDC(LogKeys.FILE_NAME, localFile)} - " +
           log"${MDC(LogKeys.NUM_BYTES, localFileSize)} bytes")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 3455e4f8387c..637eb4913030 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -916,7 +916,6 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
         "archive/00002.log" -> 2000
       )
 
-      rocksDBFileMapping.currentVersion = 1
       saveCheckpointFiles(fileManager, cpFiles1, version = 1,
         numKeys = 101, rocksDBFileMapping)
       assert(fileManager.getLatestVersion() === 1)
@@ -2472,7 +2471,6 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
       fileMapping: RocksDBFileMapping): Unit = {
     val checkpointDir = Utils.createTempDir().getAbsolutePath // local dir to 
create checkpoints
     generateFiles(checkpointDir, fileToLengths)
-    fileMapping.currentVersion = version - 1
     val (dfsFileSuffix, immutableFileMapping) = 
fileMapping.createSnapshotFileMapping(
       fileManager, checkpointDir, version)
     fileManager.saveCheckpointToDfs(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to