This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 783b3e351d51 [SPARK-50622][SS][MINOR] RocksDB Refactor
783b3e351d51 is described below

commit 783b3e351d510a2805e1c85592446e64412a2c2d
Author: Wei Liu <[email protected]>
AuthorDate: Fri Dec 27 11:42:00 2024 +0900

    [SPARK-50622][SS][MINOR] RocksDB Refactor
    
    ### What changes were proposed in this pull request?
    
    Refactor the RocksDB file for style improvements and future maintainability
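    
    A minimal, standalone sketch of the shape this refactor aims at: snapshot creation and
    upload are pulled out of commit() into private helpers, with the creation helper returning
    its per-phase timings alongside the snapshot. All names below (RefactorSketch, Store,
    Snapshot) are illustrative placeholders, not the real RocksDB.scala APIs.
    
    ```scala
    object RefactorSketch {
      // Stand-in for RocksDBSnapshot: a local checkpoint that can be freed later.
      final case class Snapshot(version: Long, dir: String) {
        def close(): Unit = println(s"deleting local checkpoint dir $dir")
      }
    
      class Store {
        // commit() now only orchestrates: create the snapshot, merge its latencies, upload it.
        def commit(version: Long): Map[String, Long] = {
          val (snapshot, latency) = createSnapshot(version)
          snapshot.foreach(uploadSnapshot)
          latency
        }
    
        // Returns both the snapshot and its timing map, mirroring the
        // (Option[RocksDBSnapshot], Map[String, Long]) shape used in the refactor.
        private def createSnapshot(version: Long): (Option[Snapshot], Map[String, Long]) = {
          val (snap, checkpointMs) = timeTakenMs(Snapshot(version, s"/tmp/checkpoint-$version"))
          (Some(snap), Map("checkpoint" -> checkpointMs))
        }
    
        // Thin wrapper so call sites no longer repeat the full upload argument list.
        private def uploadSnapshot(snapshot: Snapshot): Unit =
          println(s"uploading snapshot v${snapshot.version} from ${snapshot.dir}")
    
        private def timeTakenMs[T](body: => T): (T, Long) = {
          val start = System.currentTimeMillis()
          val result = body
          (result, System.currentTimeMillis() - start)
        }
      }
    
      def main(args: Array[String]): Unit =
        println(new Store().commit(version = 42L))
    }
    ```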
    
    ### Why are the changes needed?
    
    Code improvement
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #49242 from WweiL/rocksdb-improvement.
    
    Authored-by: Wei Liu <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../sql/execution/streaming/state/RocksDB.scala    | 206 ++++++++++-----------
 1 file changed, 103 insertions(+), 103 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 795d29c16bcf..56f253b52335 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -77,20 +77,6 @@ class RocksDB(
 
   import RocksDB._
 
-  case class RocksDBSnapshot(
-      checkpointDir: File,
-      version: Long,
-      numKeys: Long,
-      columnFamilyMapping: Map[String, Short],
-      maxColumnFamilyId: Short,
-      dfsFileSuffix: String,
-      fileMapping: Map[String, RocksDBSnapshotFile],
-      uniqueId: Option[String] = None) {
-    def close(): Unit = {
-      silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
-    }
-  }
-
   @volatile private var lastSnapshotVersion = 0L
 
   RocksDBLoader.loadLibrary()
@@ -785,50 +771,11 @@ class RocksDB(
     try {
       logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, 
newVersion)}")
 
-      var compactTimeMs = 0L
-      var flushTimeMs = 0L
-      var checkpointTimeMs = 0L
       var snapshot: Option[RocksDBSnapshot] = None
-
       if (shouldCreateSnapshot() || shouldForceSnapshot.get()) {
-        // Need to flush the change to disk before creating a checkpoint
-        // because rocksdb wal is disabled.
-        logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, newVersion)}")
-        flushTimeMs = timeTakenMs {
-          db.flush(flushOptions)
-        }
-
-        if (conf.compactOnCommit) {
-          logInfo("Compacting")
-          compactTimeMs = timeTakenMs {
-            db.compactRange()
-          }
-        }
-
-        checkpointTimeMs = timeTakenMs {
-          val checkpointDir = createTempDir("checkpoint")
-          logInfo(log"Creating checkpoint for ${MDC(LogKeys.VERSION_NUM, newVersion)} " +
-            log"in ${MDC(LogKeys.PATH, checkpointDir)}")
-          // Make sure the directory does not exist. Native RocksDB fails if the directory to
-          // checkpoint exists.
-          Utils.deleteRecursively(checkpointDir)
-          // We no longer pause background operation before creating a RocksDB checkpoint because
-          // it is unnecessary. The captured snapshot will still be consistent with ongoing
-          // background operations.
-          val cp = Checkpoint.create(db)
-          cp.createCheckpoint(checkpointDir.toString)
-          // if changelog checkpointing is disabled, the snapshot is uploaded synchronously
-          // inside the uploadSnapshot() called below.
-          // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
-          // during state store maintenance.
-          snapshot = Some(createSnapshot(
-            checkpointDir,
-            newVersion,
-            colFamilyNameToIdMap.asScala.toMap,
-            maxColumnFamilyId.get().toShort,
-            sessionStateStoreCkptId))
-          lastSnapshotVersion = newVersion
-        }
+        val (newSnapshot, snapshotLatency) = createSnapshot(newVersion, sessionStateStoreCkptId)
+        snapshot = newSnapshot
+        commitLatencyMs ++= snapshotLatency
       }
 
       logInfo(log"Syncing checkpoint for ${MDC(LogKeys.VERSION_NUM, 
newVersion)} to DFS")
@@ -840,12 +787,7 @@ class RocksDB(
           var isUploaded = false
           if (shouldForceSnapshot.get()) {
             assert(snapshot.isDefined)
-            fileManagerMetrics = uploadSnapshot(
-              snapshot.get,
-              fileManager,
-              rocksDBFileMapping.snapshotsPendingUpload,
-              loggingId
-            )
+            uploadSnapshot(snapshot.get)
             isUploaded = true
             shouldForceSnapshot.set(false)
           }
@@ -863,12 +805,7 @@ class RocksDB(
         } else {
           assert(changelogWriter.isEmpty)
           assert(snapshot.isDefined)
-          fileManagerMetrics = uploadSnapshot(
-            snapshot.get,
-            fileManager,
-            rocksDBFileMapping.snapshotsPendingUpload,
-            loggingId
-          )
+          uploadSnapshot(snapshot.get)
         }
       }
 
@@ -892,9 +829,6 @@ class RocksDB(
       numKeysOnLoadedVersion = numKeysOnWritingVersion
       loadedVersion = newVersion
       commitLatencyMs ++= Map(
-        "flush" -> flushTimeMs,
-        "compact" -> compactTimeMs,
-        "checkpoint" -> checkpointTimeMs,
         "fileSync" -> fileSyncTimeMs
       )
       recordedMetrics = Some(metrics)
@@ -920,6 +854,69 @@ class RocksDB(
     } else true
   }
 
+  private def createSnapshot(
+      version: Long,
+      checkpointUniqueId: Option[String]): (Option[RocksDBSnapshot], Map[String, Long]) = {
+    // Need to flush the change to disk before creating a checkpoint
+    // because rocksdb wal is disabled.
+    logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, version)}")
+    val flushTimeMs = timeTakenMs {
+      db.flush(flushOptions)
+    }
+    val compactTimeMs = if (conf.compactOnCommit) {
+      logInfo(log"Compacting")
+      timeTakenMs { db.compactRange() }
+    } else 0L
+
+    var snapshot: Option[RocksDBSnapshot] = None
+
+    val checkpointTimeMs = timeTakenMs {
+      val checkpointDir = createTempDir("checkpoint")
+      logInfo(log"Creating checkpoint for ${MDC(LogKeys.VERSION_NUM, version)} 
in " +
+        log"${MDC(LogKeys.CHECKPOINT_PATH, checkpointDir)}")
+      // Make sure the directory does not exist. Native RocksDB fails if the directory to
+      // checkpoint exists.
+      Utils.deleteRecursively(checkpointDir)
+      // We no longer pause background operation before creating a RocksDB checkpoint because
+      // it is unnecessary. The captured snapshot will still be consistent with ongoing
+      // background operations.
+      val cp = Checkpoint.create(db)
+      cp.createCheckpoint(checkpointDir.toString)
+
+      val (dfsFileSuffix, immutableFileMapping) = rocksDBFileMapping.createSnapshotFileMapping(
+        fileManager, checkpointDir, version)
+      val newSnapshot = Some(RocksDBSnapshot(
+        checkpointDir,
+        version,
+        numKeysOnWritingVersion,
+        colFamilyNameToIdMap.asScala.toMap,
+        maxColumnFamilyId.get().toShort,
+        dfsFileSuffix,
+        immutableFileMapping,
+        checkpointUniqueId))
+
+        snapshot = newSnapshot
+        lastSnapshotVersion = version
+      }
+
+    (snapshot,
+      Map(
+        "flush" -> flushTimeMs,
+        "compact" -> compactTimeMs,
+        "checkpoint" -> checkpointTimeMs
+      )
+    )
+  }
+
+  private[sql] def uploadSnapshot(snapshot: RocksDBSnapshot): Unit = {
+    fileManagerMetrics = uploadSnapshot(
+      snapshot,
+      fileManager,
+      rocksDBFileMapping.snapshotsPendingUpload,
+      loggingId
+    )
+  }
+
   /**
    * Drop uncommitted changes, and roll back to previous version.
    */
@@ -957,16 +954,13 @@ class RocksDB(
       }
 
       if (mostRecentSnapshot.isDefined) {
-        fileManagerMetrics = uploadSnapshot(
-          mostRecentSnapshot.get,
-          fileManager,
-          rocksDBFileMapping.snapshotsPendingUpload,
-          loggingId
-        )
+        uploadSnapshot(mostRecentSnapshot.get)
       }
     }
     val cleanupTime = timeTakenMs {
-      fileManager.deleteOldVersions(conf.minVersionsToRetain, conf.minVersionsToDelete)
+      fileManager.deleteOldVersions(
+        numVersionsToRetain = conf.minVersionsToRetain,
+        minVersionsToDelete = conf.minVersionsToDelete)
     }
     logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS, 
cleanupTime)} ms")
   }
@@ -1101,19 +1095,6 @@ class RocksDB(
     rocksDBMetricsOpt
   }
 
-  private def createSnapshot(
-      checkpointDir: File,
-      version: Long,
-      columnFamilyMapping: Map[String, Short],
-      maxColumnFamilyId: Short,
-      uniqueId: Option[String] = None): RocksDBSnapshot = {
-    val (dfsFileSuffix, immutableFileMapping) = rocksDBFileMapping.createSnapshotFileMapping(
-      fileManager, checkpointDir, version)
-
-    RocksDBSnapshot(checkpointDir, version, numKeysOnWritingVersion,
-      columnFamilyMapping, maxColumnFamilyId, dfsFileSuffix, immutableFileMapping, uniqueId)
-  }
-
   /**
    * Function to acquire RocksDB instance lock that allows for synchronized access to the state
    * store instance
@@ -1224,16 +1205,22 @@ class RocksDB(
 
   /** Upload the snapshot to DFS and remove it from snapshots pending */
   private def uploadSnapshot(
-    snapshot: RocksDB#RocksDBSnapshot,
-    fileManager: RocksDBFileManager,
-    snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
-    loggingId: String): RocksDBFileManagerMetrics = {
+      snapshot: RocksDBSnapshot,
+      fileManager: RocksDBFileManager,
+      snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
+      loggingId: String): RocksDBFileManagerMetrics = {
     var fileManagerMetrics: RocksDBFileManagerMetrics = null
     try {
       val uploadTime = timeTakenMs {
-        fileManager.saveCheckpointToDfs(snapshot.checkpointDir,
-          snapshot.version, snapshot.numKeys, snapshot.fileMapping,
-          Some(snapshot.columnFamilyMapping), Some(snapshot.maxColumnFamilyId), snapshot.uniqueId)
+        fileManager.saveCheckpointToDfs(
+          snapshot.checkpointDir,
+          snapshot.version,
+          snapshot.numKeys,
+          snapshot.fileMapping,
+          Some(snapshot.columnFamilyMapping),
+          Some(snapshot.maxColumnFamilyId),
+          snapshot.uniqueId
+        )
         fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
 
         val snapshotInfo = RocksDBVersionSnapshotInfo(snapshot.version, snapshot.dfsFileSuffix)
@@ -1295,6 +1282,24 @@ class RocksDB(
     Utils.createDirectory(localRootDir.getAbsolutePath, prefix)
   }
 
+  override protected def logName: String = s"${super.logName} $loggingId"
+}
+
+object RocksDB extends Logging {
+  case class RocksDBSnapshot(
+      checkpointDir: File,
+      version: Long,
+      numKeys: Long,
+      columnFamilyMapping: Map[String, Short],
+      maxColumnFamilyId: Short,
+      dfsFileSuffix: String,
+      fileMapping: Map[String, RocksDBSnapshotFile],
+      uniqueId: Option[String] = None) {
+    def close(): Unit = {
+      silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
+    }
+  }
+
   /** Attempt to delete recursively, and log the error if any */
   private def silentDeleteRecursively(file: File, msg: String): Unit = {
     try {
@@ -1306,15 +1311,10 @@ class RocksDB(
     }
   }
 
-  override protected def logName: String = s"${super.logName} $loggingId"
-}
-
-object RocksDB extends Logging {
   private def printLineageItems(lineage: Array[LineageItem]): String = lineage.map {
     case LineageItem(l, optStr) => s"$l:$optStr"
   }.mkString(" ")
 
-
   /** Records the duration of running `body` for the next query progress update. */
   private def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
