This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4e20a2a6319e [SPARK-48931][SS] Reduce Cloud Store List API cost for state store maintenance task
4e20a2a6319e is described below
commit 4e20a2a6319eb53bd2b52f4fb4e498bbcbc1f96c
Author: Riya Verma <[email protected]>
AuthorDate: Wed Jul 24 14:03:17 2024 +0900
[SPARK-48931][SS] Reduce Cloud Store List API cost for state store maintenance task
### What changes were proposed in this pull request?
Currently, during the state store maintenance process, we find which old version files of the **RocksDB** state store to delete by listing all existing snapshotted version files in the checkpoint directory, every minute by default. Such frequent list calls against cloud stores can result in high costs. To address this concern and reduce the cost associated with state store maintenance, we should minimize how often the maintenance task lists the object store. To minimize th [...]
The changes include:
1. Adding a new configuration `ratioExtraSpaceAllowedInCheckpoint` in **SQLConf**. This determines the ratio of extra version files we want to retain in the checkpoint directory relative to the number of versions retained for rollbacks (`minBatchesToRetain`).
2. Using this config and `minBatchesToRetain` to set `minVersionsToDelete` inside **StateStoreConf**: `minVersionsToDelete = ratioExtraSpaceAllowedInCheckpoint * minBatchesToRetain`.
3. Using the `minSeenVersion` and `maxSeenVersion` variables in **RocksDBFileManager** to estimate the min/max versions present in the directory and control deletion frequency, by ensuring the number of stale versions to delete is at least `minVersionsToDelete` (see the sketch after this list).
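
To make the gating concrete, here is a minimal, hypothetical sketch of the decision (standalone code with made-up names, not the actual `RocksDBFileManager` method). With the default `minBatchesToRetain = 100` and a ratio of `0.3`, `minVersionsToDelete = round(0.3 * 100) = 30`, so the maintenance task pays for a list call only once roughly 30 stale versions have accumulated, instead of on every run:

```scala
// Minimal sketch of the batch-deletion gating described above; the real
// logic lives in RocksDBFileManager.shouldSkipDeletion.
object DeletionGateSketch {

  /** Returns true when too few stale versions have accumulated to justify a list call. */
  def shouldSkipDeletion(
      maxSeenVersion: Option[Long],
      minSeenVersion: Long,
      numVersionsToRetain: Int,
      minVersionsToDelete: Long): Boolean = {
    if (minVersionsToDelete <= 0) return false // original behavior: list every time
    maxSeenVersion match {
      case Some(maxVersion) =>
        // Estimated number of stale versions sitting in the checkpoint directory.
        val versionsToDelete = maxVersion - minSeenVersion + 1 - numVersionsToRetain
        versionsToDelete < minVersionsToDelete
      case None =>
        false // fall back to cleaning up if the estimate was never initialized
    }
  }

  def main(args: Array[String]): Unit = {
    // minVersionsToDelete = round(ratio * minBatchesToRetain) = round(0.3 * 100) = 30
    val minVersionsToDelete = Math.round(0.3 * 100)
    // 120 versions seen, min retained is 1, keep 100 => 20 stale < 30: skip listing
    println(shouldSkipDeletion(Some(120L), 1L, 100, minVersionsToDelete)) // true
    // 135 versions seen => 35 stale >= 30: proceed with the list and batch delete
    println(shouldSkipDeletion(Some(135L), 1L, 100, minVersionsToDelete)) // false
  }
}
```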
### Why are the changes needed?
Currently, maintenance operations like snapshotting, purging, deletion, and file management are performed asynchronously for each data partition. We want to shift away from periodic deletion and instead rely on the estimated number of files in the checkpoint directory, reducing list calls and introducing batch deletion.
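
Since any `minVersionsToDelete <= 0` falls back to listing on every maintenance run, the optimization can effectively be turned off through the new conf. A sketch (note the conf is marked internal):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// A ratio of 0 yields minVersionsToDelete = round(0 * minBatchesToRetain) = 0,
// which keeps the pre-patch behavior of listing the store on every maintenance run.
spark.conf.set("spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint", "0.0")
```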
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47393 from riyaverm-db/reduce-cloud-store-list-api-cost-in-maintenance.
Authored-by: Riya Verma <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../scala/org/apache/spark/internal/LogKey.scala | 4 ++
.../org/apache/spark/sql/internal/SQLConf.scala | 13 +++++
.../sql/execution/streaming/state/RocksDB.scala | 11 +++-
.../streaming/state/RocksDBFileManager.scala | 64 +++++++++++++++++++++-
.../execution/streaming/state/StateStoreConf.scala | 8 +++
.../execution/streaming/state/RocksDBSuite.scala | 49 ++++++++++++++++-
6 files changed, 143 insertions(+), 6 deletions(-)
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index a6184038b523..b84f7d839c2a 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -396,6 +396,7 @@ private[spark] object LogKeys {
case object MAX_NUM_PARTITIONS extends LogKey
case object MAX_NUM_POSSIBLE_BINS extends LogKey
case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
+ case object MAX_SEEN_VERSION extends LogKey
case object MAX_SERVICE_NAME_LENGTH extends LogKey
case object MAX_SIZE extends LogKey
case object MAX_SLOTS extends LogKey
@@ -420,9 +421,11 @@ private[spark] object LogKeys {
case object MIN_NUM_FREQUENT_PATTERN extends LogKey
case object MIN_POINT_PER_CLUSTER extends LogKey
case object MIN_RATE extends LogKey
+ case object MIN_SEEN_VERSION extends LogKey
case object MIN_SHARE extends LogKey
case object MIN_SIZE extends LogKey
case object MIN_TIME extends LogKey
+ case object MIN_VERSIONS_TO_DELETE extends LogKey
case object MIN_VERSION_NUM extends LogKey
case object MISSING_PARENT_STAGES extends LogKey
case object MODEL_WEIGHTS extends LogKey
@@ -850,6 +853,7 @@ private[spark] object LogKeys {
case object USER_NAME extends LogKey
case object UUID extends LogKey
case object VALUE extends LogKey
+ case object VERSIONS_TO_DELETE extends LogKey
case object VERSION_NUM extends LogKey
case object VIEW_ACLS extends LogKey
case object VIEW_ACLS_GROUPS extends LogKey
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 04fdc7655bb3..a7110bc9feca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2129,6 +2129,17 @@ object SQLConf {
.intConf
.createWithDefault(100)
+ val RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT =
+ buildConf("spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint")
+ .internal()
+ .doc("The ratio of extra space allowed for batch deletion of files when
maintenance is" +
+ "invoked. When value > 0, it optimizes the cost of discovering and
deleting old checkpoint " +
+ "versions. The minimum number of stale versions we retain in checkpoint
location for batch " +
+ "deletion is calculated by minBatchesToRetain *
ratioExtraSpaceAllowedInCheckpoint.")
+ .version("4.0.0")
+ .doubleConf
+ .createWithDefault(0.3)
+
val MAX_BATCHES_TO_RETAIN_IN_MEMORY =
buildConf("spark.sql.streaming.maxBatchesToRetainInMemory")
.internal()
.doc("The maximum number of batches which will be retained in memory to
avoid " +
@@ -5405,6 +5416,8 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
+ def ratioExtraSpaceAllowedInCheckpoint: Double =
getConf(RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT)
+
def maxBatchesToRetainInMemory: Int =
getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
def streamingMaintenanceInterval: Long =
getConf(STREAMING_MAINTENANCE_INTERVAL)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 2f3f5a57261f..b454e0ba5c93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -185,6 +185,8 @@ class RocksDB(
val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
loadedVersion = latestSnapshotVersion
+ // Initialize maxVersion upon successful load from DFS
+ fileManager.setMaxSeenVersion(version)
// reset last snapshot version
if (lastSnapshotVersion > latestSnapshotVersion) {
@@ -554,6 +556,11 @@ class RocksDB(
}
}
+ // Set maxVersion when checkpoint files are synced to DFS successfully
+ // We need to handle this explicitly in RocksDB, as we could use different
+ // changeLogWriter instances in the fileManager instance when committing
+ fileManager.setMaxSeenVersion(newVersion)
+
numKeysOnLoadedVersion = numKeysOnWritingVersion
loadedVersion = newVersion
commitLatencyMs ++= Map(
@@ -640,7 +647,7 @@ class RocksDB(
uploadSnapshot()
}
val cleanupTime = timeTakenMs {
- fileManager.deleteOldVersions(conf.minVersionsToRetain)
+ fileManager.deleteOldVersions(conf.minVersionsToRetain, conf.minVersionsToDelete)
}
logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS, cleanupTime)} ms")
}
@@ -896,6 +903,7 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
*/
case class RocksDBConf(
minVersionsToRetain: Int,
+ minVersionsToDelete: Long,
minDeltasForSnapshot: Int,
compactOnCommit: Boolean,
enableChangelogCheckpointing: Boolean,
@@ -1078,6 +1086,7 @@ object RocksDBConf {
RocksDBConf(
storeConf.minVersionsToRetain,
+ storeConf.minVersionsToDelete,
storeConf.minDeltasForSnapshot,
getBooleanConf(COMPACT_ON_COMMIT_CONF),
getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 6c8db12635fd..7e570bcfea7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -146,6 +146,9 @@ class RocksDBFileManager(
private def codec = CompressionCodec.createCodec(sparkConf, codecName)
+ private var maxSeenVersion: Option[Long] = None
+ private var minSeenVersion = 1L
+
@volatile private var rootDirChecked: Boolean = false
@volatile private var fileMappings = RocksDBFileMappings(
new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
@@ -398,15 +401,63 @@ class RocksDBFileManager(
}
}
+ /**
+ * Set maxSeenVersion to max of itself and version we are uploading.
+ * This is to ensure accuracy in the case that the query has restarted from a particular version.
+ */
+ def setMaxSeenVersion(version: Long): Unit = {
+ if (maxSeenVersion.isDefined) {
+ maxSeenVersion = Some(Math.max(maxSeenVersion.get, version))
+ } else {
+ maxSeenVersion = Some(version)
+ }
+ }
+
+ /**
+ * Determines whether batch deletion of stale version files should be skipped
+ * based on the following parameters and estimates of maximum and minimum
+ * versions present in the checkpoint directory.
+ *
+ * @param numVersionsToRetain Number of versions to retain for rollbacks.
+ * @param minVersionsToDelete Minimum number of stale versions required to trigger deletion.
+ * @return `true` if insufficient stale versions are present, otherwise `false`.
+ */
+ private def shouldSkipDeletion(numVersionsToRetain: Int, minVersionsToDelete: Long): Boolean = {
+ // If minVersionsToDelete <= 0, we call list every time maintenance is invoked.
+ // This is the original behaviour, without the list API call optimization.
+ if (minVersionsToDelete > 0) {
+ // When maxSeenVersion is defined, we check whether the number of stale version files
+ // is at least minVersionsToDelete before triggering batch deletion of files.
+ // We still proceed with deletion when maxSeenVersion isn't set, so that the fallback
+ // is to clean up files if maxSeenVersion fails to be initialized.
+ if (maxSeenVersion.isDefined) {
+ logInfo(log"Estimated maximum version is " +
+ log"${MDC(LogKeys.MAX_SEEN_VERSION, maxSeenVersion.get)}" +
+ log" and minimum version is ${MDC(LogKeys.MIN_SEEN_VERSION,
minSeenVersion)}")
+ val versionsToDelete = maxSeenVersion.get - minSeenVersion + 1 -
numVersionsToRetain
+ if (versionsToDelete < minVersionsToDelete) {
+ logInfo(log"Skipping deleting files." +
+ log" Need at least ${MDC(LogKeys.MIN_VERSIONS_TO_DELETE,
minVersionsToDelete)}" +
+ log" stale versions for batch deletion but found only" +
+ log" ${MDC(LogKeys.VERSIONS_TO_DELETE, versionsToDelete)}.")
+ return true
+ }
+ }
+ }
+ false
+ }
+
/**
* Delete old versions by deleting the associated version and SST files.
- * At a high-level, this method finds which versions to delete, and which SST files that were
+ * At a high-level, when enough stale version files are present for batch deletion,
+ * this method finds which versions to delete, and which SST files were
* last used in those versions. It's safe to delete these SST files because a SST file can
* be reused only in successive versions. Therefore, if a SST file F was last used in version
* V, then it won't be used in version V+1 or later, and if version V can be deleted, then
* F can safely be deleted as well.
*
- * To find old files, it does the following.
+ * First, it checks whether enough stale version files are present for batch deletion.
+ * If true, it does the following to find old files.
* - List all the existing [version].zip files
* - Find the min version that needs to be retained based on the given `numVersionsToRetain`.
* - Accordingly decide which versions should be deleted.
@@ -426,7 +477,10 @@ class RocksDBFileManager(
* - SST files that were used in a version, but that version got overwritten with a different
* set of SST files.
*/
- def deleteOldVersions(numVersionsToRetain: Int): Unit = {
+ def deleteOldVersions(numVersionsToRetain: Int, minVersionsToDelete: Long = 0): Unit = {
+ // Check if enough stale version files are present
+ if (shouldSkipDeletion(numVersionsToRetain, minVersionsToDelete)) return
+
val path = new Path(dfsRootDir)
val allFiles = fm.list(path).map(_.getPath)
val snapshotFiles = allFiles.filter(file => onlyZipFiles.accept(file))
@@ -526,6 +580,10 @@ class RocksDBFileManager(
.map(_.getName.stripSuffix(".changelog")).map(_.toLong)
.filter(_ < minVersionToRetain)
deleteChangelogFiles(changelogVersionsToDelete)
+
+ // Always set minSeenVersion to keep the deletion frequency regular, even if deletion fails.
+ // This is safe because subsequent calls retry deleting old version files.
+ minSeenVersion = minVersionToRetain
}
/** Save immutable files to DFS directory */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index c7004524097a..e199e1a4765e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -41,6 +41,14 @@ class StateStoreConf(
/** Minimum versions a State Store implementation should retain to allow rollbacks */
val minVersionsToRetain: Int = sqlConf.minBatchesToRetain
+ /**
+ * Minimum number of stale checkpoint versions that need to be present in the DFS
+ * checkpoint directory for old state checkpoint version deletion to be invoked.
+ * This is to amortize the cost of discovering and deleting old checkpoint versions.
+ */
+ val minVersionsToDelete: Long =
+ Math.round(sqlConf.ratioExtraSpaceAllowedInCheckpoint * sqlConf.minBatchesToRetain)
+
/** Maximum count of versions a State Store implementation should retain in memory */
val maxVersionsToRetainInMemory: Int = sqlConf.maxBatchesToRetainInMemory
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index b09e562d566b..181ee53d6bb9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -230,12 +230,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
testWithColumnFamilies(
- "RocksDB: purge changelog and snapshots",
+ "RocksDB: purge changelog and snapshots with minVersionsToDelete = 0",
TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>
val remoteDir = Utils.createTempDir().toString
new File(remoteDir).delete() // to make sure that the directory gets created
val conf = dbConf.copy(enableChangelogCheckpointing = true,
- minVersionsToRetain = 3, minDeltasForSnapshot = 1)
+ minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 0)
withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
db.load(0)
db.commit()
@@ -271,6 +271,51 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithColumnFamilies(
+ "RocksDB: purge version files with minVersionsToDelete > 0",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+ val remoteDir = Utils.createTempDir().toString
+ new File(remoteDir).delete() // to make sure that the directory gets created
+ val conf = dbConf.copy(
+ minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 3)
+ withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+ // Commit 5 versions
+ // stale versions: (1, 2)
+ // keep versions: (3, 4, 5)
+ for (version <- 0 to 4) {
+ // Should upload latest snapshot but not delete any files
+ // since number of stale versions < minVersionsToDelete
+ db.load(version)
+ db.commit()
+ db.doMaintenance()
+ }
+
+ // Commit 1 more version
+ // stale versions: (1, 2, 3)
+ // keep versions: (4, 5, 6)
+ db.load(5)
+ db.commit()
+
+ // Checkpoint directory before maintenance
+ if (isChangelogCheckpointingEnabled) {
+ assert(snapshotVersionsPresent(remoteDir) == (1 to 5))
+ assert(changelogVersionsPresent(remoteDir) == (1 to 6))
+ } else {
+ assert(snapshotVersionsPresent(remoteDir) == (1 to 6))
+ }
+
+ // Should delete stale versions for zip files and change log files
+ // since number of stale versions >= minVersionsToDelete
+ db.doMaintenance()
+
+ // Checkpoint directory after maintenance
+ assert(snapshotVersionsPresent(remoteDir) == Seq(4, 5, 6))
+ if (isChangelogCheckpointingEnabled) {
+ assert(changelogVersionsPresent(remoteDir) == Seq(4, 5, 6))
+ }
+ }
+ }
+
testWithColumnFamilies(
"RocksDB: minDeltasForSnapshot",
TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>