This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 783b3e351d51 [SPARK-50622][SS][MINOR] RocksDB Refactor
783b3e351d51 is described below
commit 783b3e351d510a2805e1c85592446e64412a2c2d
Author: Wei Liu <[email protected]>
AuthorDate: Fri Dec 27 11:42:00 2024 +0900
[SPARK-50622][SS][MINOR] RocksDB Refactor
### What changes were proposed in this pull request?
Refactor the RocksDB.scala file for style improvements and future maintainability.
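
At a high level, the snapshot-creation steps in `commit()` are pulled out into a private `createSnapshot(version, checkpointUniqueId)` helper that returns the snapshot together with its per-step latencies, a thin `uploadSnapshot(snapshot)` wrapper is added, and the `RocksDBSnapshot` case class moves into the `RocksDB` companion object. The minimal, self-contained sketch below only illustrates the extraction pattern; names like `RefactorSketch` and `doCheckpoint` are hypothetical and not part of this change (see the diff for the real code):

```scala
import scala.collection.mutable

object RefactorSketch {
  // Same idea as RocksDB.timeTakenMs: run a block and report its duration in ms.
  private def timeTakenMs(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000L
  }

  // Extracted helper (hypothetical): performs the timed steps and returns the
  // result together with a latency map the caller can merge into its own metrics.
  private def doCheckpoint(version: Long): (Option[String], Map[String, Long]) = {
    var checkpoint: Option[String] = None
    val flushMs = timeTakenMs { /* flush in-memory state to disk */ }
    val checkpointMs = timeTakenMs { checkpoint = Some(s"checkpoint-$version") }
    (checkpoint, Map("flush" -> flushMs, "checkpoint" -> checkpointMs))
  }

  def main(args: Array[String]): Unit = {
    val commitLatencyMs = mutable.Map[String, Long]()
    val (checkpoint, latency) = doCheckpoint(version = 42L)
    commitLatencyMs ++= latency // caller folds the helper's latencies into its metrics
    println(s"checkpoint=$checkpoint latencies=$commitLatencyMs")
  }
}
```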
### Why are the changes needed?
Code improvement
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing CI
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49242 from WweiL/rocksdb-improvement.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/execution/streaming/state/RocksDB.scala | 206 ++++++++++-----------
1 file changed, 103 insertions(+), 103 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 795d29c16bcf..56f253b52335 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -77,20 +77,6 @@ class RocksDB(
import RocksDB._
- case class RocksDBSnapshot(
- checkpointDir: File,
- version: Long,
- numKeys: Long,
- columnFamilyMapping: Map[String, Short],
- maxColumnFamilyId: Short,
- dfsFileSuffix: String,
- fileMapping: Map[String, RocksDBSnapshotFile],
- uniqueId: Option[String] = None) {
- def close(): Unit = {
- silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
- }
- }
-
@volatile private var lastSnapshotVersion = 0L
RocksDBLoader.loadLibrary()
@@ -785,50 +771,11 @@ class RocksDB(
try {
logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM,
newVersion)}")
- var compactTimeMs = 0L
- var flushTimeMs = 0L
- var checkpointTimeMs = 0L
var snapshot: Option[RocksDBSnapshot] = None
-
if (shouldCreateSnapshot() || shouldForceSnapshot.get()) {
- // Need to flush the change to disk before creating a checkpoint
- // because rocksdb wal is disabled.
- logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM,
newVersion)}")
- flushTimeMs = timeTakenMs {
- db.flush(flushOptions)
- }
-
- if (conf.compactOnCommit) {
- logInfo("Compacting")
- compactTimeMs = timeTakenMs {
- db.compactRange()
- }
- }
-
- checkpointTimeMs = timeTakenMs {
- val checkpointDir = createTempDir("checkpoint")
- logInfo(log"Creating checkpoint for ${MDC(LogKeys.VERSION_NUM,
newVersion)} " +
- log"in ${MDC(LogKeys.PATH, checkpointDir)}")
- // Make sure the directory does not exist. Native RocksDB fails if the directory to
- // checkpoint exists.
- Utils.deleteRecursively(checkpointDir)
- // We no longer pause background operation before creating a RocksDB checkpoint because
- // it is unnecessary. The captured snapshot will still be consistent with ongoing
- // background operations.
- val cp = Checkpoint.create(db)
- cp.createCheckpoint(checkpointDir.toString)
- // if changelog checkpointing is disabled, the snapshot is uploaded synchronously
- // inside the uploadSnapshot() called below.
- // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
- // during state store maintenance.
- snapshot = Some(createSnapshot(
- checkpointDir,
- newVersion,
- colFamilyNameToIdMap.asScala.toMap,
- maxColumnFamilyId.get().toShort,
- sessionStateStoreCkptId))
- lastSnapshotVersion = newVersion
- }
+ val (newSnapshot, snapshotLatency) = createSnapshot(newVersion, sessionStateStoreCkptId)
+ snapshot = newSnapshot
+ commitLatencyMs ++= snapshotLatency
}
logInfo(log"Syncing checkpoint for ${MDC(LogKeys.VERSION_NUM,
newVersion)} to DFS")
@@ -840,12 +787,7 @@ class RocksDB(
var isUploaded = false
if (shouldForceSnapshot.get()) {
assert(snapshot.isDefined)
- fileManagerMetrics = uploadSnapshot(
- snapshot.get,
- fileManager,
- rocksDBFileMapping.snapshotsPendingUpload,
- loggingId
- )
+ uploadSnapshot(snapshot.get)
isUploaded = true
shouldForceSnapshot.set(false)
}
@@ -863,12 +805,7 @@ class RocksDB(
} else {
assert(changelogWriter.isEmpty)
assert(snapshot.isDefined)
- fileManagerMetrics = uploadSnapshot(
- snapshot.get,
- fileManager,
- rocksDBFileMapping.snapshotsPendingUpload,
- loggingId
- )
+ uploadSnapshot(snapshot.get)
}
}
@@ -892,9 +829,6 @@ class RocksDB(
numKeysOnLoadedVersion = numKeysOnWritingVersion
loadedVersion = newVersion
commitLatencyMs ++= Map(
- "flush" -> flushTimeMs,
- "compact" -> compactTimeMs,
- "checkpoint" -> checkpointTimeMs,
"fileSync" -> fileSyncTimeMs
)
recordedMetrics = Some(metrics)
@@ -920,6 +854,69 @@ class RocksDB(
} else true
}
+ private def createSnapshot(
+ version: Long,
+ checkpointUniqueId: Option[String]): (Option[RocksDBSnapshot], Map[String, Long]) = {
+ // Need to flush the change to disk before creating a checkpoint
+ // because rocksdb wal is disabled.
+ logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, version)}")
+ val flushTimeMs = timeTakenMs {
+ db.flush(flushOptions)
+ }
+ val compactTimeMs = if (conf.compactOnCommit) {
+ logInfo(log"Compacting")
+ timeTakenMs { db.compactRange() }
+ } else 0L
+
+ var snapshot: Option[RocksDBSnapshot] = None
+
+ val checkpointTimeMs = timeTakenMs {
+ val checkpointDir = createTempDir("checkpoint")
+ logInfo(log"Creating checkpoint for ${MDC(LogKeys.VERSION_NUM, version)}
in " +
+ log"${MDC(LogKeys.CHECKPOINT_PATH, checkpointDir)}")
+ // Make sure the directory does not exist. Native RocksDB fails if the directory to
+ // checkpoint exists.
+ Utils.deleteRecursively(checkpointDir)
+ // We no longer pause background operation before creating a RocksDB checkpoint because
+ // it is unnecessary. The captured snapshot will still be consistent with ongoing
+ // background operations.
+ val cp = Checkpoint.create(db)
+ cp.createCheckpoint(checkpointDir.toString)
+
+ val (dfsFileSuffix, immutableFileMapping) = rocksDBFileMapping.createSnapshotFileMapping(
+ fileManager, checkpointDir, version)
+ val newSnapshot = Some(RocksDBSnapshot(
+ checkpointDir,
+ version,
+ numKeysOnWritingVersion,
+ colFamilyNameToIdMap.asScala.toMap,
+ maxColumnFamilyId.get().toShort,
+ dfsFileSuffix,
+ immutableFileMapping,
+ checkpointUniqueId))
+
+ snapshot = newSnapshot
+ lastSnapshotVersion = version
+ }
+
+ (snapshot,
+ Map(
+ "flush" -> flushTimeMs,
+ "compact" -> compactTimeMs,
+ "checkpoint" -> checkpointTimeMs
+ )
+ )
+ }
+
+ private[sql] def uploadSnapshot(snapshot: RocksDBSnapshot): Unit = {
+ fileManagerMetrics = uploadSnapshot(
+ snapshot,
+ fileManager,
+ rocksDBFileMapping.snapshotsPendingUpload,
+ loggingId
+ )
+ }
+
/**
* Drop uncommitted changes, and roll back to previous version.
*/
@@ -957,16 +954,13 @@ class RocksDB(
}
if (mostRecentSnapshot.isDefined) {
- fileManagerMetrics = uploadSnapshot(
- mostRecentSnapshot.get,
- fileManager,
- rocksDBFileMapping.snapshotsPendingUpload,
- loggingId
- )
+ uploadSnapshot(mostRecentSnapshot.get)
}
}
val cleanupTime = timeTakenMs {
- fileManager.deleteOldVersions(conf.minVersionsToRetain, conf.minVersionsToDelete)
+ fileManager.deleteOldVersions(
+ numVersionsToRetain = conf.minVersionsToRetain,
+ minVersionsToDelete = conf.minVersionsToDelete)
}
logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS,
cleanupTime)} ms")
}
@@ -1101,19 +1095,6 @@ class RocksDB(
rocksDBMetricsOpt
}
- private def createSnapshot(
- checkpointDir: File,
- version: Long,
- columnFamilyMapping: Map[String, Short],
- maxColumnFamilyId: Short,
- uniqueId: Option[String] = None): RocksDBSnapshot = {
- val (dfsFileSuffix, immutableFileMapping) = rocksDBFileMapping.createSnapshotFileMapping(
- fileManager, checkpointDir, version)
-
- RocksDBSnapshot(checkpointDir, version, numKeysOnWritingVersion,
- columnFamilyMapping, maxColumnFamilyId, dfsFileSuffix, immutableFileMapping, uniqueId)
- }
-
/**
* Function to acquire RocksDB instance lock that allows for synchronized access to the state
* store instance
@@ -1224,16 +1205,22 @@ class RocksDB(
/** Upload the snapshot to DFS and remove it from snapshots pending */
private def uploadSnapshot(
- snapshot: RocksDB#RocksDBSnapshot,
- fileManager: RocksDBFileManager,
- snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
- loggingId: String): RocksDBFileManagerMetrics = {
+ snapshot: RocksDBSnapshot,
+ fileManager: RocksDBFileManager,
+ snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
+ loggingId: String): RocksDBFileManagerMetrics = {
var fileManagerMetrics: RocksDBFileManagerMetrics = null
try {
val uploadTime = timeTakenMs {
- fileManager.saveCheckpointToDfs(snapshot.checkpointDir,
- snapshot.version, snapshot.numKeys, snapshot.fileMapping,
- Some(snapshot.columnFamilyMapping), Some(snapshot.maxColumnFamilyId), snapshot.uniqueId)
+ fileManager.saveCheckpointToDfs(
+ snapshot.checkpointDir,
+ snapshot.version,
+ snapshot.numKeys,
+ snapshot.fileMapping,
+ Some(snapshot.columnFamilyMapping),
+ Some(snapshot.maxColumnFamilyId),
+ snapshot.uniqueId
+ )
fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
val snapshotInfo = RocksDBVersionSnapshotInfo(snapshot.version, snapshot.dfsFileSuffix)
@@ -1295,6 +1282,24 @@ class RocksDB(
Utils.createDirectory(localRootDir.getAbsolutePath, prefix)
}
+ override protected def logName: String = s"${super.logName} $loggingId"
+}
+
+object RocksDB extends Logging {
+ case class RocksDBSnapshot(
+ checkpointDir: File,
+ version: Long,
+ numKeys: Long,
+ columnFamilyMapping: Map[String, Short],
+ maxColumnFamilyId: Short,
+ dfsFileSuffix: String,
+ fileMapping: Map[String, RocksDBSnapshotFile],
+ uniqueId: Option[String] = None) {
+ def close(): Unit = {
+ silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
+ }
+ }
+
/** Attempt to delete recursively, and log the error if any */
private def silentDeleteRecursively(file: File, msg: String): Unit = {
try {
@@ -1306,15 +1311,10 @@ class RocksDB(
}
}
- override protected def logName: String = s"${super.logName} $loggingId"
-}
-
-object RocksDB extends Logging {
private def printLineageItems(lineage: Array[LineageItem]): String =
lineage.map {
case LineageItem(l, optStr) => s"$l:$optStr"
}.mkString(" ")
-
/** Records the duration of running `body` for the next query progress update. */
private def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]