This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e20a2a6319e [SPARK-48931][SS] Reduce Cloud Store List API cost for state store maintenance task
4e20a2a6319e is described below

commit 4e20a2a6319eb53bd2b52f4fb4e498bbcbc1f96c
Author: Riya Verma <[email protected]>
AuthorDate: Wed Jul 24 14:03:17 2024 +0900

    [SPARK-48931][SS] Reduce Cloud Store List API cost for state store maintenance task
    
    ### What changes were proposed in this pull request?
    
    Currently, during the state store maintenance process, we find which old version files of the **RocksDB** state store to delete by listing all existing snapshotted version files in the checkpoint directory, every minute by default. These frequent list calls against the cloud object store can result in high costs. To address this concern and reduce the cost associated with state store maintenance, we should minimize how often the maintenance task lists the object store. To minimize th [...]
    
    The changes include:
    1. Adding a new configuration `ratioExtraSpaceAllowedInCheckpoint` to **SQLConf**. This determines the ratio of extra version files to retain in the checkpoint directory, relative to the number of versions retained for rollbacks (`minBatchesToRetain`).
    2. Using this config and `minBatchesToRetain`, setting the `minVersionsToDelete` config inside **StateStoreConf** to `minVersionsToDelete = ratioExtraSpaceAllowedInCheckpoint * minBatchesToRetain`.
    3. Using `minSeenVersion` and `maxSeenVersion` variables in **RocksDBFileManager** to estimate the minimum and maximum versions present in the directory and to control deletion frequency, by ensuring that the number of stale versions to delete is at least `minVersionsToDelete`. A minimal sketch of this gating arithmetic is shown below.
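
    The following is a minimal, self-contained Scala sketch of the gating arithmetic described above. It mirrors the logic this patch adds to **StateStoreConf** and **RocksDBFileManager**, but the object name, constants, and `main` driver are illustrative only:

    ```scala
    object MaintenanceGatingExample {
      // Illustrative values; the real ones come from SQLConf.
      val minBatchesToRetain: Int = 100
      val ratioExtraSpaceAllowedInCheckpoint: Double = 0.3

      // Mirrors StateStoreConf.minVersionsToDelete in this patch.
      val minVersionsToDelete: Long =
        Math.round(ratioExtraSpaceAllowedInCheckpoint * minBatchesToRetain) // 30

      // Mirrors the gate in RocksDBFileManager.shouldSkipDeletion: skip the
      // expensive list + delete pass until enough stale versions accumulate.
      def shouldSkipDeletion(maxSeenVersion: Option[Long], minSeenVersion: Long): Boolean = {
        if (minVersionsToDelete <= 0) return false // original behaviour: always list
        maxSeenVersion.exists { maxVersion =>
          val staleVersions = maxVersion - minSeenVersion + 1 - minBatchesToRetain
          staleVersions < minVersionsToDelete
        }
      }

      def main(args: Array[String]): Unit = {
        println(shouldSkipDeletion(Some(120L), 1L)) // 20 stale < 30  -> true (skip)
        println(shouldSkipDeletion(Some(140L), 1L)) // 40 stale >= 30 -> false (delete)
      }
    }
    ```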
    
    ### Why are the changes needed?
    
    Currently, maintenance operations such as snapshotting, purging, deletion, and file management are done asynchronously for each data partition. We want to shift away from periodic deletion and instead rely on the estimated number of files in the checkpoint directory, which reduces list calls and enables batch deletion. An illustrative example of tuning this behaviour is sketched below.
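
    For illustration, here is a minimal sketch of tuning the new knob in a local session; the session setup is hypothetical and not part of this patch. Setting the ratio to 0 restores the original list-on-every-maintenance behaviour:

    ```scala
    import org.apache.spark.sql.SparkSession

    object TuneStateStoreMaintenance {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("state-store-maintenance-tuning")
          // Internal conf added by this patch.
          .config("spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint", "0.3")
          // Existing conf; together these give minVersionsToDelete = round(0.3 * 100) = 30.
          .config("spark.sql.streaming.minBatchesToRetain", "100")
          .getOrCreate()

        println(spark.conf.get("spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint"))
        spark.stop()
      }
    }
    ```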
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47393 from riyaverm-db/reduce-cloud-store-list-api-cost-in-maintenance.
    
    Authored-by: Riya Verma <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  4 ++
 .../org/apache/spark/sql/internal/SQLConf.scala    | 13 +++++
 .../sql/execution/streaming/state/RocksDB.scala    | 11 +++-
 .../streaming/state/RocksDBFileManager.scala       | 64 +++++++++++++++++++++-
 .../execution/streaming/state/StateStoreConf.scala |  8 +++
 .../execution/streaming/state/RocksDBSuite.scala   | 49 ++++++++++++++++-
 6 files changed, 143 insertions(+), 6 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index a6184038b523..b84f7d839c2a 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -396,6 +396,7 @@ private[spark] object LogKeys {
   case object MAX_NUM_PARTITIONS extends LogKey
   case object MAX_NUM_POSSIBLE_BINS extends LogKey
   case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
+  case object MAX_SEEN_VERSION extends LogKey
   case object MAX_SERVICE_NAME_LENGTH extends LogKey
   case object MAX_SIZE extends LogKey
   case object MAX_SLOTS extends LogKey
@@ -420,9 +421,11 @@ private[spark] object LogKeys {
   case object MIN_NUM_FREQUENT_PATTERN extends LogKey
   case object MIN_POINT_PER_CLUSTER extends LogKey
   case object MIN_RATE extends LogKey
+  case object MIN_SEEN_VERSION extends LogKey
   case object MIN_SHARE extends LogKey
   case object MIN_SIZE extends LogKey
   case object MIN_TIME extends LogKey
+  case object MIN_VERSIONS_TO_DELETE extends LogKey
   case object MIN_VERSION_NUM extends LogKey
   case object MISSING_PARENT_STAGES extends LogKey
   case object MODEL_WEIGHTS extends LogKey
@@ -850,6 +853,7 @@ private[spark] object LogKeys {
   case object USER_NAME extends LogKey
   case object UUID extends LogKey
   case object VALUE extends LogKey
+  case object VERSIONS_TO_DELETE extends LogKey
   case object VERSION_NUM extends LogKey
   case object VIEW_ACLS extends LogKey
   case object VIEW_ACLS_GROUPS extends LogKey
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 04fdc7655bb3..a7110bc9feca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2129,6 +2129,17 @@ object SQLConf {
     .intConf
     .createWithDefault(100)
 
+  val RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT =
+    buildConf("spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint")
+    .internal()
+    .doc("The ratio of extra space allowed for batch deletion of files when 
maintenance is" +
+      "invoked. When value > 0, it optimizes the cost of discovering and 
deleting old checkpoint " +
+      "versions. The minimum number of stale versions we retain in checkpoint 
location for batch " +
+      "deletion is calculated by minBatchesToRetain * 
ratioExtraSpaceAllowedInCheckpoint.")
+    .version("4.0.0")
+    .doubleConf
+    .createWithDefault(0.3)
+
   val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory")
     .internal()
     .doc("The maximum number of batches which will be retained in memory to avoid " +
@@ -5405,6 +5416,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
 
+  def ratioExtraSpaceAllowedInCheckpoint: Double = getConf(RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT)
+
   def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
 
   def streamingMaintenanceInterval: Long = getConf(STREAMING_MAINTENANCE_INTERVAL)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 2f3f5a57261f..b454e0ba5c93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -185,6 +185,8 @@ class RocksDB(
         val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
         val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
         loadedVersion = latestSnapshotVersion
+        // Initialize maxSeenVersion upon successful load from DFS
+        fileManager.setMaxSeenVersion(version)
 
         // reset last snapshot version
         if (lastSnapshotVersion > latestSnapshotVersion) {
@@ -554,6 +556,11 @@ class RocksDB(
         }
       }
 
+      // Set maxSeenVersion when checkpoint files are synced to DFS successfully.
+      // We need to handle this explicitly in RocksDB because different
+      // changeLogWriter instances may be used in the fileManager when committing.
+      fileManager.setMaxSeenVersion(newVersion)
+
       numKeysOnLoadedVersion = numKeysOnWritingVersion
       loadedVersion = newVersion
       commitLatencyMs ++= Map(
@@ -640,7 +647,7 @@ class RocksDB(
       uploadSnapshot()
     }
     val cleanupTime = timeTakenMs {
-      fileManager.deleteOldVersions(conf.minVersionsToRetain)
+      fileManager.deleteOldVersions(conf.minVersionsToRetain, conf.minVersionsToDelete)
     }
     logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS, cleanupTime)} ms")
   }
@@ -896,6 +903,7 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
  */
 case class RocksDBConf(
     minVersionsToRetain: Int,
+    minVersionsToDelete: Long,
     minDeltasForSnapshot: Int,
     compactOnCommit: Boolean,
     enableChangelogCheckpointing: Boolean,
@@ -1078,6 +1086,7 @@ object RocksDBConf {
 
     RocksDBConf(
       storeConf.minVersionsToRetain,
+      storeConf.minVersionsToDelete,
       storeConf.minDeltasForSnapshot,
       getBooleanConf(COMPACT_ON_COMMIT_CONF),
       getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 6c8db12635fd..7e570bcfea7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -146,6 +146,9 @@ class RocksDBFileManager(
 
   private def codec = CompressionCodec.createCodec(sparkConf, codecName)
 
+  private var maxSeenVersion: Option[Long] = None
+  private var minSeenVersion = 1L
+
   @volatile private var rootDirChecked: Boolean = false
   @volatile private var fileMappings = RocksDBFileMappings(
     new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
@@ -398,15 +401,63 @@ class RocksDBFileManager(
     }
   }
 
+  /**
+   * Set maxSeenVersion to the max of itself and the version being uploaded.
+   * This ensures accuracy in the case the query has restarted from a particular version.
+   */
+  def setMaxSeenVersion(version: Long): Unit = {
+    if (maxSeenVersion.isDefined) {
+      maxSeenVersion = Some(Math.max(maxSeenVersion.get, version))
+    } else {
+      maxSeenVersion = Some(version)
+    }
+  }
+
+  /**
+   * Determines whether batch deletion of stale version files should be skipped
+   * based on the following parameters and estimates of maximum and minimum
+   * versions present in the checkpoint directory.
+   *
+   * @param numVersionsToRetain Number of versions to retain for rollbacks.
+   * @param minVersionsToDelete Minimum number of stale versions required to trigger deletion.
+   * @return `true` if there are insufficient stale versions to delete, otherwise `false`.
+   */
+  private def shouldSkipDeletion(numVersionsToRetain: Int, minVersionsToDelete: Long): Boolean = {
+    // If minVersionsToDelete <= 0, we call list every time maintenance is invoked.
+    // This is the original behaviour, without the list API call optimization.
+    if (minVersionsToDelete > 0) {
+      // When maxSeenVersion is defined, we check whether the number of stale version files
+      // is at least minVersionsToDelete before batch deletion of files.
+      // We still proceed with deletion if maxSeenVersion isn't set, so that the fallback
+      // is to clean up files if maxSeenVersion fails to be initialized.
+      if (maxSeenVersion.isDefined) {
+        logInfo(log"Estimated maximum version is " +
+          log"${MDC(LogKeys.MAX_SEEN_VERSION, maxSeenVersion.get)}" +
+          log" and minimum version is ${MDC(LogKeys.MIN_SEEN_VERSION, minSeenVersion)}")
+        val versionsToDelete = maxSeenVersion.get - minSeenVersion + 1 - numVersionsToRetain
+        if (versionsToDelete < minVersionsToDelete) {
+          logInfo(log"Skipping deleting files." +
+            log" Need at least ${MDC(LogKeys.MIN_VERSIONS_TO_DELETE, minVersionsToDelete)}" +
+            log" stale versions for batch deletion but found only" +
+            log" ${MDC(LogKeys.VERSIONS_TO_DELETE, versionsToDelete)}.")
+          return true
+        }
+      }
+    }
+    false
+  }
+
   /**
    * Delete old versions by deleting the associated version and SST files.
-   * At a high-level, this method finds which versions to delete, and which SST files that were
+   * At a high-level, when enough stale version files are present for batch deletion,
+   * this method finds which versions to delete, and which SST files that were
    * last used in those versions. It's safe to delete these SST files because a SST file can
    * be reused only in successive versions. Therefore, if a SST file F was last used in version
    * V, then it won't be used in version V+1 or later, and if version V can be deleted, then
    * F can safely be deleted as well.
    *
-   * To find old files, it does the following.
+   * First, it checks whether enough stale version files are present for batch deletion.
+   * If true, it does the following to find old files.
    * - List all the existing [version].zip files
    * - Find the min version that needs to be retained based on the given `numVersionsToRetain`.
    * - Accordingly decide which versions should be deleted.
@@ -426,7 +477,10 @@ class RocksDBFileManager(
    * - SST files that were used in a version, but that version got overwritten with a different
    *   set of SST files.
    */
-  def deleteOldVersions(numVersionsToRetain: Int): Unit = {
+  def deleteOldVersions(numVersionsToRetain: Int, minVersionsToDelete: Long = 0): Unit = {
+    // Check if enough stale version files are present
+    if (shouldSkipDeletion(numVersionsToRetain, minVersionsToDelete)) return
+
     val path = new Path(dfsRootDir)
     val allFiles = fm.list(path).map(_.getPath)
     val snapshotFiles = allFiles.filter(file => onlyZipFiles.accept(file))
@@ -526,6 +580,10 @@ class RocksDBFileManager(
       .map(_.getName.stripSuffix(".changelog")).map(_.toLong)
       .filter(_ < minVersionToRetain)
     deleteChangelogFiles(changelogVersionsToDelete)
+
+    // Always set minSeenVersion for regular deletion frequency, even if deletion fails.
+    // This is safe because subsequent calls retry deleting old version files.
+    minSeenVersion = minVersionToRetain
   }
 
   /** Save immutable files to DFS directory */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index c7004524097a..e199e1a4765e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -41,6 +41,14 @@ class StateStoreConf(
   /** Minimum versions a State Store implementation should retain to allow rollbacks */
   val minVersionsToRetain: Int = sqlConf.minBatchesToRetain
 
+  /**
+   * Minimum number of stale checkpoint versions that need to be present in the DFS
+   * checkpoint directory for old state checkpoint version deletion to be invoked.
+   * This is to amortize the cost of discovering and deleting old checkpoint versions.
+   */
+  val minVersionsToDelete: Long =
+    Math.round(sqlConf.ratioExtraSpaceAllowedInCheckpoint * sqlConf.minBatchesToRetain)
+
   /** Maximum count of versions a State Store implementation should retain in memory */
   val maxVersionsToRetainInMemory: Int = sqlConf.maxBatchesToRetainInMemory
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index b09e562d566b..181ee53d6bb9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -230,12 +230,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
   }
 
   testWithColumnFamilies(
-    "RocksDB: purge changelog and snapshots",
+    "RocksDB: purge changelog and snapshots with minVersionsToDelete = 0",
     TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>
     val remoteDir = Utils.createTempDir().toString
     new File(remoteDir).delete() // to make sure that the directory gets created
     val conf = dbConf.copy(enableChangelogCheckpointing = true,
-      minVersionsToRetain = 3, minDeltasForSnapshot = 1)
+      minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 0)
     withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
       db.load(0)
       db.commit()
@@ -271,6 +271,51 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithColumnFamilies(
+    "RocksDB: purge version files with minVersionsToDelete > 0",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    val conf = dbConf.copy(
+      minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 3)
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      // Commit 5 versions
+      // stale versions: (1, 2)
+      // keep versions: (3, 4, 5)
+      for (version <- 0 to 4) {
+        // Should upload latest snapshot but not delete any files
+        // since number of stale versions < minVersionsToDelete
+        db.load(version)
+        db.commit()
+        db.doMaintenance()
+      }
+
+      // Commit 1 more version
+      // stale versions: (1, 2, 3)
+      // keep versions: (4, 5, 6)
+      db.load(5)
+      db.commit()
+
+      // Checkpoint directory before maintenance
+      if (isChangelogCheckpointingEnabled) {
+        assert(snapshotVersionsPresent(remoteDir) == (1 to 5))
+        assert(changelogVersionsPresent(remoteDir) == (1 to 6))
+      } else {
+        assert(snapshotVersionsPresent(remoteDir) == (1 to 6))
+      }
+
+      // Should delete stale versions for zip files and change log files
+      // since number of stale versions >= minVersionsToDelete
+      db.doMaintenance()
+
+      // Checkpoint directory after maintenance
+      assert(snapshotVersionsPresent(remoteDir) == Seq(4, 5, 6))
+      if (isChangelogCheckpointingEnabled) {
+        assert(changelogVersionsPresent(remoteDir) == Seq(4, 5, 6))
+      }
+    }
+  }
+
   testWithColumnFamilies(
     "RocksDB: minDeltasForSnapshot",
     TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
