This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 35c299a1e3e3 [SPARK-53002][SS] Show accurate memory usage statistics 
when bounded memory is enabled for RocksDB
35c299a1e3e3 is described below

commit 35c299a1e3e373e20ae45d7604df51c83ff1dbe2
Author: Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Mon Aug 4 13:07:21 2025 -0700

    [SPARK-53002][SS] Show accurate memory usage statistics when bounded memory 
is enabled for RocksDB
    
    ### What changes were proposed in this pull request?
    
    Currently, RocksDB metrics show 0 bytes used if bounded memory is enabled. 
This PR provides an approximation of the memory used by fetching the total 
memory used by RocksDB per executor, then dividing it by the number of open 
RocksDB instances per executor.
    
    ### Why are the changes needed?
    
    To show more accurate memory usage when bounded memory is enabled.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51709 from ericm-db/bounded-mem-fix.
    
    Authored-by: Eric Marnadi <eric.marn...@databricks.com>
    Signed-off-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
---
 .../sql/execution/streaming/state/RocksDB.scala    | 48 +++++++----------
 .../streaming/state/RocksDBMemoryManager.scala     | 23 ++++++++
 .../state/RocksDBStateStoreProvider.scala          |  4 +-
 .../FailureInjectionCheckpointFileManager.scala    |  4 +-
 .../RocksDBCheckpointFailureInjectionSuite.scala   |  2 +-
 .../state/RocksDBStateStoreIntegrationSuite.scala  | 63 ++++++++++++++++++++++
 .../execution/streaming/state/RocksDBSuite.scala   | 14 ++---
 7 files changed, 112 insertions(+), 46 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index dc07351e7914..b47b2db3f9cb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -75,7 +75,7 @@ class RocksDB(
     enableStateStoreCheckpointIds: Boolean = false,
     partitionId: Int = 0,
     eventForwarder: Option[RocksDBEventForwarder] = None,
-    uniqueId: String = "") extends Logging {
+    uniqueId: Option[String] = None) extends Logging {
 
   import RocksDB._
 
@@ -197,11 +197,12 @@ class RocksDB(
   // This prevents performance impact from querying RocksDB memory too 
frequently.
   private val memoryUpdateIntervalMs = conf.memoryUpdateIntervalMs
 
-  // Register with RocksDBMemoryManager if we have a unique ID
-  if (uniqueId.nonEmpty) {
-    // Initial registration with zero memory usage
-    RocksDBMemoryManager.updateMemoryUsage(uniqueId, 0L, 
conf.boundedMemoryUsage)
-  }
+  // Generate a unique ID if not provided to ensure proper memory tracking
+  private val instanceUniqueId = uniqueId.getOrElse(UUID.randomUUID().toString)
+
+  // Register with RocksDBMemoryManager
+  // Initial registration with zero memory usage
+  RocksDBMemoryManager.updateMemoryUsage(instanceUniqueId, 0L, 
conf.boundedMemoryUsage)
 
   @volatile private var numKeysOnLoadedVersion = 0L
   @volatile private var numKeysOnWritingVersion = 0L
@@ -1309,14 +1310,12 @@ class RocksDB(
       }
 
       // Unregister from RocksDBMemoryManager
-      if (uniqueId.nonEmpty) {
-        try {
-          RocksDBMemoryManager.unregisterInstance(uniqueId)
-        } catch {
-          case NonFatal(e) =>
-            logWarning(log"Failed to unregister from RocksDBMemoryManager " +
-              log"${MDC(LogKeys.EXCEPTION, e)}")
-        }
+      try {
+        RocksDBMemoryManager.unregisterInstance(instanceUniqueId)
+      } catch {
+        case NonFatal(e) =>
+          logWarning(log"Failed to unregister from RocksDBMemoryManager " +
+            log"${MDC(LogKeys.EXCEPTION, e)}")
       }
 
       silentDeleteRecursively(localRootDir, "closing RocksDB")
@@ -1351,9 +1350,6 @@ class RocksDB(
   private def metrics: RocksDBMetrics = {
     import HistogramType._
     val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
-    val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem")
-    val memTableMemUsage = getDBProperty("rocksdb.size-all-mem-tables")
-    val blockCacheUsage = getDBProperty("rocksdb.block-cache-usage")
     val pinnedBlocksMemUsage = 
getDBProperty("rocksdb.block-cache-pinned-usage")
     val nativeOpsHistograms = Seq(
       "get" -> DB_GET,
@@ -1387,14 +1383,8 @@ class RocksDB(
       nativeStats.getTickerCount(typ)
     }
 
-    // if bounded memory usage is enabled, we share the block cache across all 
state providers
-    // running on the same node and account the usage to this single cache. In 
this case, its not
-    // possible to provide partition level or query level memory usage.
-    val memoryUsage = if (conf.boundedMemoryUsage) {
-      0L
-    } else {
-      readerMemUsage + memTableMemUsage + blockCacheUsage
-    }
+    // Use RocksDBMemoryManager to calculate the memory usage accounting
+    val memoryUsage = 
RocksDBMemoryManager.getInstanceMemoryUsage(instanceUniqueId, getMemoryUsage)
 
     RocksDBMetrics(
       numKeysOnLoadedVersion,
@@ -1463,7 +1453,6 @@ class RocksDB(
    * This is called from task thread operations, so it's already thread-safe.
    */
   def updateMemoryUsageIfNeeded(): Unit = {
-    if (uniqueId.isEmpty) return // No tracking without unique ID
 
     val currentTime = System.currentTimeMillis()
     val timeSinceLastUpdate = currentTime - lastMemoryUpdateTime.get()
@@ -1474,7 +1463,7 @@ class RocksDB(
         lastMemoryUpdateTime.set(currentTime)
         // Report usage to RocksDBMemoryManager
         RocksDBMemoryManager.updateMemoryUsage(
-          uniqueId,
+          instanceUniqueId,
           usage,
           conf.boundedMemoryUsage)
       } catch {
@@ -1606,9 +1595,8 @@ object RocksDB extends Logging {
 
   val mainMemorySources: Seq[String] = Seq(
     "rocksdb.estimate-table-readers-mem",
-    "rocksdb.cur-size-all-mem-tables",
-    "rocksdb.block-cache-usage",
-    "rocksdb.block-cache-pinned-usage")
+    "rocksdb.size-all-mem-tables",
+    "rocksdb.block-cache-usage")
 
   case class RocksDBSnapshot(
       checkpointDir: File,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
index 2fc5c37814a4..5b0c7d6eacdd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
@@ -97,6 +97,29 @@ object RocksDBMemoryManager extends Logging with 
UnmanagedMemoryConsumer {
     logDebug(s"Unregistered instance $uniqueId")
   }
 
+  def getNumRocksDBInstances(boundedMemory: Boolean): Long = {
+    instanceMemoryMap.values().asScala.count(_.isBoundedMemory == 
boundedMemory)
+  }
+
+  /**
+   * Get the memory usage for a specific instance, accounting for bounded 
memory sharing.
+   * @param uniqueId The instance's unique identifier
+   * @param totalMemoryUsage The total memory usage of this instance
+   * @return The adjusted memory usage accounting for sharing in bounded 
memory mode
+   */
+  def getInstanceMemoryUsage(uniqueId: String, totalMemoryUsage: Long): Long = 
{
+    val instanceInfo = instanceMemoryMap.get(uniqueId)
+    if (instanceInfo.isBoundedMemory) {
+      // In bounded memory mode, divide by the number of bounded instances
+      // since they share the same memory pool
+      val numBoundedInstances = getNumRocksDBInstances(true)
+      totalMemoryUsage / numBoundedInstances
+    } else {
+      // In unbounded memory mode, each instance has its own memory
+      totalMemoryUsage
+    }
+  }
+
   def getOrCreateRocksDBMemoryManagerAndCache(conf: RocksDBConf): 
(WriteBufferManager, Cache)
     = synchronized {
     // Register with UnifiedMemoryManager (idempotent operation)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index f044d9301597..0cf32244fc65 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -787,7 +787,7 @@ private[sql] class RocksDBStateStoreProvider
       enableStateStoreCheckpointIds: Boolean,
       partitionId: Int = 0,
       eventForwarder: Option[RocksDBEventForwarder] = None,
-      uniqueId: String = ""): RocksDB = {
+      uniqueId: Option[String] = None): RocksDB = {
     new RocksDB(
       dfsRootDir,
       conf,
@@ -810,7 +810,7 @@ private[sql] class RocksDBStateStoreProvider
     val localRootDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), 
storeIdStr)
     createRocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, 
hadoopConf, loggingId,
       useColumnFamilies, storeConf.enableStateStoreCheckpointIds, 
stateStoreId.partitionId,
-      rocksDBEventForwarder, stateStoreProviderId.toString)
+      rocksDBEventForwarder, Some(stateStoreProviderId.toString))
   }
 
   private val keyValueEncoderMap = new 
java.util.concurrent.ConcurrentHashMap[String,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala
index fb698c89ff8e..28e10affbbf2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala
@@ -259,7 +259,7 @@ class FailureInjectionRocksDBStateStoreProvider extends 
RocksDBStateStoreProvide
       enableStateStoreCheckpointIds: Boolean,
       partitionId: Int,
       eventForwarder: Option[RocksDBEventForwarder] = None,
-      uniqueId: String): RocksDB = {
+      uniqueId: Option[String]): RocksDB = {
     FailureInjectionRocksDBStateStoreProvider.createRocksDBWithFaultInjection(
       dfsRootDir,
       conf,
@@ -289,7 +289,7 @@ object FailureInjectionRocksDBStateStoreProvider {
       enableStateStoreCheckpointIds: Boolean,
       partitionId: Int,
       eventForwarder: Option[RocksDBEventForwarder],
-      uniqueId: String): RocksDB = {
+      uniqueId: Option[String]): RocksDB = {
     new RocksDB(
       dfsRootDir,
       conf = conf,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala
index 4018971d20f4..cb80a77be39a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala
@@ -603,7 +603,7 @@ class RocksDBCheckpointFailureInjectionSuite extends 
StreamTest
         enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
         partitionId = 0,
         eventForwarder = None,
-        uniqueId = "")
+        uniqueId = None)
       db.load(version, checkpointId)
       func(db)
     } finally {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index 9c7b03ac06f5..d9de78dc0430 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -382,4 +382,67 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
       }
     }
   }
+
+  testWithColumnFamilies("bounded memory usage calculation",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    withTempDir { dir =>
+      withSQLConf(
+        (SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName),
+        (SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath),
+        (SQLConf.SHUFFLE_PARTITIONS.key -> "2"), // Use 2 partitions to test 
multiple providers
+        (s"${RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX}.boundedMemoryUsage" -> 
"true")) {
+
+        // Clear any existing providers from previous tests
+        RocksDBMemoryManager.resetWriteBufferManagerAndCache
+
+        val inputData = MemoryStream[Int]
+
+        val query = inputData.toDS().toDF("value")
+          .select($"value")
+          .groupBy($"value")
+          .agg(count("*"))
+          .writeStream
+          .format("console")
+          .outputMode("complete")
+          .start()
+
+        try {
+          // Initially no providers should be registered
+          assert(RocksDBMemoryManager.getNumRocksDBInstances(true) == 0)
+
+          // Add data to trigger state store creation
+          inputData.addData(1, 2, 3, 4)
+          query.processAllAvailable()
+
+          // With 2 partitions, we should have 2 bounded memory providers 
registered
+          assert(RocksDBMemoryManager.getNumRocksDBInstances(true) == 2)
+
+          assert(RocksDBMemoryManager.getNumRocksDBInstances(false) == 0)
+
+          // Add more data and check providers remain registered
+          inputData.addData(5, 6, 7, 8)
+          query.processAllAvailable()
+
+          // Should still have 2 instances
+          assert(RocksDBMemoryManager.getNumRocksDBInstances(true) == 2)
+
+          // Verify that the progress contains reasonable memory usage values
+          // With bounded memory, each provider should report its share of 
total memory
+          // (not 0L as in the old implementation)
+          val progress = query.lastProgress
+          val stateOperators = progress.stateOperators
+          assert(stateOperators.nonEmpty)
+
+          // Check that memory usage is reported at the operator level
+          stateOperators.foreach { op =>
+            // Memory usage is reported in memoryUsedBytes, not in 
customMetrics
+            val memUsage = op.memoryUsedBytes
+            assert(memUsage > 0L, s"Memory usage should be greater than 0, but 
was $memUsage")
+          }
+        } finally {
+          query.stop()
+        }
+      }
+    }
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 69373641e850..8aa3edafa8fc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -2597,11 +2597,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
               db.load(0)
               db.put("a", "1")
               db.commit()
-              if (boundedMemoryUsage == "true") {
-                assert(db.metricsOpt.get.totalMemUsageBytes === 0)
-              } else {
-                assert(db.metricsOpt.get.totalMemUsageBytes > 0)
-              }
+              assert(db.metricsOpt.get.totalMemUsageBytes > 0)
               db.getWriteBufferManagerAndCache()
             }
 
@@ -2612,11 +2608,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
               db.load(0)
               db.put("a", "1")
               db.commit()
-              if (boundedMemoryUsage == "true") {
-                assert(db.metricsOpt.get.totalMemUsageBytes === 0)
-              } else {
-                assert(db.metricsOpt.get.totalMemUsageBytes > 0)
-              }
+              assert(db.metricsOpt.get.totalMemUsageBytes > 0)
               db.getWriteBufferManagerAndCache()
             }
 
@@ -2666,7 +2658,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
             db.remove("a")
             db.put("c", "3")
             db.commit()
-            assert(db.metricsOpt.get.totalMemUsageBytes === 0)
+            assert(db.metricsOpt.get.totalMemUsageBytes > 0)
           }
         } finally {
           RocksDBMemoryManager.resetWriteBufferManagerAndCache


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to