This is an automated email from the ASF dual-hosted git repository. ashrigondekar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 35c299a1e3e3 [SPARK-53002][SS] Show accurate memory usage statistics when bounded memory is enabled for RocksDB 35c299a1e3e3 is described below commit 35c299a1e3e373e20ae45d7604df51c83ff1dbe2 Author: Eric Marnadi <eric.marn...@databricks.com> AuthorDate: Mon Aug 4 13:07:21 2025 -0700 [SPARK-53002][SS] Show accurate memory usage statistics when bounded memory is enabled for RocksDB ### What changes were proposed in this pull request? Currently, RocksDB metrics report 0 bytes of memory used when bounded memory is enabled. This PR provides an approximate per-instance memory usage by taking the memory used by RocksDB on each executor and dividing it by the number of RocksDB instances open on that executor. ### Why are the changes needed? To show more accurate memory usage when bounded memory is enabled. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #51709 from ericm-db/bounded-mem-fix. 
Authored-by: Eric Marnadi <eric.marn...@databricks.com> Signed-off-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> --- .../sql/execution/streaming/state/RocksDB.scala | 48 +++++++---------- .../streaming/state/RocksDBMemoryManager.scala | 23 ++++++++ .../state/RocksDBStateStoreProvider.scala | 4 +- .../FailureInjectionCheckpointFileManager.scala | 4 +- .../RocksDBCheckpointFailureInjectionSuite.scala | 2 +- .../state/RocksDBStateStoreIntegrationSuite.scala | 63 ++++++++++++++++++++++ .../execution/streaming/state/RocksDBSuite.scala | 14 ++--- 7 files changed, 112 insertions(+), 46 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index dc07351e7914..b47b2db3f9cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -75,7 +75,7 @@ class RocksDB( enableStateStoreCheckpointIds: Boolean = false, partitionId: Int = 0, eventForwarder: Option[RocksDBEventForwarder] = None, - uniqueId: String = "") extends Logging { + uniqueId: Option[String] = None) extends Logging { import RocksDB._ @@ -197,11 +197,12 @@ class RocksDB( // This prevents performance impact from querying RocksDB memory too frequently. 
private val memoryUpdateIntervalMs = conf.memoryUpdateIntervalMs - // Register with RocksDBMemoryManager if we have a unique ID - if (uniqueId.nonEmpty) { - // Initial registration with zero memory usage - RocksDBMemoryManager.updateMemoryUsage(uniqueId, 0L, conf.boundedMemoryUsage) - } + // Generate a unique ID if not provided to ensure proper memory tracking + private val instanceUniqueId = uniqueId.getOrElse(UUID.randomUUID().toString) + + // Register with RocksDBMemoryManager + // Initial registration with zero memory usage + RocksDBMemoryManager.updateMemoryUsage(instanceUniqueId, 0L, conf.boundedMemoryUsage) @volatile private var numKeysOnLoadedVersion = 0L @volatile private var numKeysOnWritingVersion = 0L @@ -1309,14 +1310,12 @@ class RocksDB( } // Unregister from RocksDBMemoryManager - if (uniqueId.nonEmpty) { - try { - RocksDBMemoryManager.unregisterInstance(uniqueId) - } catch { - case NonFatal(e) => - logWarning(log"Failed to unregister from RocksDBMemoryManager " + - log"${MDC(LogKeys.EXCEPTION, e)}") - } + try { + RocksDBMemoryManager.unregisterInstance(instanceUniqueId) + } catch { + case NonFatal(e) => + logWarning(log"Failed to unregister from RocksDBMemoryManager " + + log"${MDC(LogKeys.EXCEPTION, e)}") } silentDeleteRecursively(localRootDir, "closing RocksDB") @@ -1351,9 +1350,6 @@ class RocksDB( private def metrics: RocksDBMetrics = { import HistogramType._ val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size") - val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem") - val memTableMemUsage = getDBProperty("rocksdb.size-all-mem-tables") - val blockCacheUsage = getDBProperty("rocksdb.block-cache-usage") val pinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage") val nativeOpsHistograms = Seq( "get" -> DB_GET, @@ -1387,14 +1383,8 @@ class RocksDB( nativeStats.getTickerCount(typ) } - // if bounded memory usage is enabled, we share the block cache across all state providers - // running on the same 
node and account the usage to this single cache. In this case, its not - // possible to provide partition level or query level memory usage. - val memoryUsage = if (conf.boundedMemoryUsage) { - 0L - } else { - readerMemUsage + memTableMemUsage + blockCacheUsage - } + // Use RocksDBMemoryManager to calculate the memory usage accounting + val memoryUsage = RocksDBMemoryManager.getInstanceMemoryUsage(instanceUniqueId, getMemoryUsage) RocksDBMetrics( numKeysOnLoadedVersion, @@ -1463,7 +1453,6 @@ class RocksDB( * This is called from task thread operations, so it's already thread-safe. */ def updateMemoryUsageIfNeeded(): Unit = { - if (uniqueId.isEmpty) return // No tracking without unique ID val currentTime = System.currentTimeMillis() val timeSinceLastUpdate = currentTime - lastMemoryUpdateTime.get() @@ -1474,7 +1463,7 @@ class RocksDB( lastMemoryUpdateTime.set(currentTime) // Report usage to RocksDBMemoryManager RocksDBMemoryManager.updateMemoryUsage( - uniqueId, + instanceUniqueId, usage, conf.boundedMemoryUsage) } catch { @@ -1606,9 +1595,8 @@ object RocksDB extends Logging { val mainMemorySources: Seq[String] = Seq( "rocksdb.estimate-table-readers-mem", - "rocksdb.cur-size-all-mem-tables", - "rocksdb.block-cache-usage", - "rocksdb.block-cache-pinned-usage") + "rocksdb.size-all-mem-tables", + "rocksdb.block-cache-usage") case class RocksDBSnapshot( checkpointDir: File, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala index 2fc5c37814a4..5b0c7d6eacdd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala @@ -97,6 +97,29 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer { logDebug(s"Unregistered instance $uniqueId") } + def 
getNumRocksDBInstances(boundedMemory: Boolean): Long = { + instanceMemoryMap.values().asScala.count(_.isBoundedMemory == boundedMemory) + } + + /** + * Get the memory usage for a specific instance, accounting for bounded memory sharing. + * @param uniqueId The instance's unique identifier + * @param totalMemoryUsage The total memory usage of this instance + * @return The adjusted memory usage accounting for sharing in bounded memory mode + */ + def getInstanceMemoryUsage(uniqueId: String, totalMemoryUsage: Long): Long = { + val instanceInfo = instanceMemoryMap.get(uniqueId) + if (instanceInfo.isBoundedMemory) { + // In bounded memory mode, divide by the number of bounded instances + // since they share the same memory pool + val numBoundedInstances = getNumRocksDBInstances(true) + totalMemoryUsage / numBoundedInstances + } else { + // In unbounded memory mode, each instance has its own memory + totalMemoryUsage + } + } + def getOrCreateRocksDBMemoryManagerAndCache(conf: RocksDBConf): (WriteBufferManager, Cache) = synchronized { // Register with UnifiedMemoryManager (idempotent operation) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index f044d9301597..0cf32244fc65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -787,7 +787,7 @@ private[sql] class RocksDBStateStoreProvider enableStateStoreCheckpointIds: Boolean, partitionId: Int = 0, eventForwarder: Option[RocksDBEventForwarder] = None, - uniqueId: String = ""): RocksDB = { + uniqueId: Option[String] = None): RocksDB = { new RocksDB( dfsRootDir, conf, @@ -810,7 +810,7 @@ private[sql] class RocksDBStateStoreProvider val localRootDir = 
Utils.createTempDir(Utils.getLocalDir(sparkConf), storeIdStr) createRocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, loggingId, useColumnFamilies, storeConf.enableStateStoreCheckpointIds, stateStoreId.partitionId, - rocksDBEventForwarder, stateStoreProviderId.toString) + rocksDBEventForwarder, Some(stateStoreProviderId.toString)) } private val keyValueEncoderMap = new java.util.concurrent.ConcurrentHashMap[String, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala index fb698c89ff8e..28e10affbbf2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala @@ -259,7 +259,7 @@ class FailureInjectionRocksDBStateStoreProvider extends RocksDBStateStoreProvide enableStateStoreCheckpointIds: Boolean, partitionId: Int, eventForwarder: Option[RocksDBEventForwarder] = None, - uniqueId: String): RocksDB = { + uniqueId: Option[String]): RocksDB = { FailureInjectionRocksDBStateStoreProvider.createRocksDBWithFaultInjection( dfsRootDir, conf, @@ -289,7 +289,7 @@ object FailureInjectionRocksDBStateStoreProvider { enableStateStoreCheckpointIds: Boolean, partitionId: Int, eventForwarder: Option[RocksDBEventForwarder], - uniqueId: String): RocksDB = { + uniqueId: Option[String]): RocksDB = { new RocksDB( dfsRootDir, conf = conf, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala index 4018971d20f4..cb80a77be39a 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala @@ -603,7 +603,7 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, partitionId = 0, eventForwarder = None, - uniqueId = "") + uniqueId = None) db.load(version, checkpointId) func(db) } finally { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala index 9c7b03ac06f5..d9de78dc0430 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala @@ -382,4 +382,67 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest } } } + + testWithColumnFamilies("bounded memory usage calculation", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + withTempDir { dir => + withSQLConf( + (SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName), + (SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath), + (SQLConf.SHUFFLE_PARTITIONS.key -> "2"), // Use 2 partitions to test multiple providers + (s"${RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX}.boundedMemoryUsage" -> "true")) { + + // Clear any existing providers from previous tests + RocksDBMemoryManager.resetWriteBufferManagerAndCache + + val inputData = MemoryStream[Int] + + val query = inputData.toDS().toDF("value") + .select($"value") + .groupBy($"value") + .agg(count("*")) + .writeStream + .format("console") + .outputMode("complete") + .start() + + try { + // Initially no providers should be 
registered + assert(RocksDBMemoryManager.getNumRocksDBInstances(true) == 0) + + // Add data to trigger state store creation + inputData.addData(1, 2, 3, 4) + query.processAllAvailable() + + // With 2 partitions, we should have 2 bounded memory providers registered + assert(RocksDBMemoryManager.getNumRocksDBInstances(true) == 2) + + assert(RocksDBMemoryManager.getNumRocksDBInstances(false) == 0) + + // Add more data and check providers remain registered + inputData.addData(5, 6, 7, 8) + query.processAllAvailable() + + // Should still have 2 instances + assert(RocksDBMemoryManager.getNumRocksDBInstances(true) == 2) + + // Verify that the progress contains reasonable memory usage values + // With bounded memory, each provider should report its share of total memory + // (not 0L as in the old implementation) + val progress = query.lastProgress + val stateOperators = progress.stateOperators + assert(stateOperators.nonEmpty) + + // Check that memory usage is reported at the operator level + stateOperators.foreach { op => + // Memory usage is reported in memoryUsedBytes, not in customMetrics + val memUsage = op.memoryUsedBytes + assert(memUsage > 0L, s"Memory usage should be greater than 0, but was $memUsage") + } + } finally { + query.stop() + } + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 69373641e850..8aa3edafa8fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -2597,11 +2597,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.load(0) db.put("a", "1") db.commit() - if (boundedMemoryUsage == "true") { - assert(db.metricsOpt.get.totalMemUsageBytes === 0) - } else { - assert(db.metricsOpt.get.totalMemUsageBytes > 0) - } 
+ assert(db.metricsOpt.get.totalMemUsageBytes > 0) db.getWriteBufferManagerAndCache() } @@ -2612,11 +2608,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.load(0) db.put("a", "1") db.commit() - if (boundedMemoryUsage == "true") { - assert(db.metricsOpt.get.totalMemUsageBytes === 0) - } else { - assert(db.metricsOpt.get.totalMemUsageBytes > 0) - } + assert(db.metricsOpt.get.totalMemUsageBytes > 0) db.getWriteBufferManagerAndCache() } @@ -2666,7 +2658,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.remove("a") db.put("c", "3") db.commit() - assert(db.metricsOpt.get.totalMemUsageBytes === 0) + assert(db.metricsOpt.get.totalMemUsageBytes > 0) } } finally { RocksDBMemoryManager.resetWriteBufferManagerAndCache --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org