This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 105ad9a81b5c [SPARK-57183][4.2][SS] Close LRUCache on RocksDB.close() in unbounded memory mode
105ad9a81b5c is described below
commit 105ad9a81b5c108922f4ae8be9b646e4650a3ebe
Author: Iván Morales <[email protected]>
AuthorDate: Wed Jun 3 10:37:54 2026 +0800
[SPARK-57183][4.2][SS] Close LRUCache on RocksDB.close() in unbounded memory mode
### What changes were proposed in this pull request?
Backport of #56234 to branch-4.2.
### Why are the changes needed?
See #56234.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
See #56234.
### Was this patch authored or co-authored using generative AI tooling?
Co-authored with Claude (Anthropic), used for analysis, code generation and
review assistance.
Generated-by: Claude Sonnet 4.6
Closes #56281 from kete1987/SPARK-57183-rocksdb-lrucache-leak-branch-4.2.
Authored-by: Iván Morales <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/execution/streaming/state/RocksDB.scala | 9 +++++
.../execution/streaming/state/RocksDBSuite.scala | 43 ++++++++++++++++++++++
2 files changed, 52 insertions(+)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 89e7a3058f3d..bd479ffc6d2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -2133,6 +2133,15 @@ class RocksDB(
nativeStats.close()
rocksDbOptions.close()
dbLogger.close()
+ // In unbounded memory mode each RocksDB instance owns its LRUCache. Without explicit
+ // close() the native C++ cache object is only freed when the JVM GC finalizes the Java
+ // wrapper -- which rarely happens under low heap pressure. Closing explicitly here
+ // ensures native memory is reclaimed deterministically when the instance is released.
+ // In bounded mode the cache is a shared singleton managed by RocksDBMemoryManager
+ // and must not be closed here.
+ if (!conf.boundedMemoryUsage && lruCache != null) {
+ lruCache.close()
+ }
var snapshot = snapshotsToUploadQueue.poll()
while (snapshot != null) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 87560d174956..dc697f5b99dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -3276,6 +3276,49 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
}
}
+ Seq(true, false).foreach { boundedMemoryUsage =>
+ testWithColumnFamilies(
+ s"SPARK-57183: LRUCache is handled correctly on RocksDB.close() " +
+ s"with boundedMemoryUsage=$boundedMemoryUsage",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) {
colFamiliesEnabled =>
+ withTempDir { dir =>
+ try {
+ val sqlConf = new SQLConf
+ sqlConf.setConfString(
+ RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + "." +
+ RocksDBConf.BOUNDED_MEMORY_USAGE_CONF_KEY,
boundedMemoryUsage.toString)
+ if (boundedMemoryUsage) {
+ sqlConf.setConfString(
+ RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + "." +
+ RocksDBConf.MAX_MEMORY_USAGE_MB_CONF_KEY, "128")
+ }
+ val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+
+ val (_, cache) = withDB(dir.getCanonicalPath, conf = dbConf,
+ useColumnFamilies = colFamiliesEnabled) { db =>
+ db.load(0)
+ db.put("k", "v")
+ db.commit()
+ db.getWriteBufferManagerAndCache()
+ }
+ if (boundedMemoryUsage) {
+ // Shared singleton -- must remain open after a single instance closes
+ assert(cache.isOwningHandle,
+ "Shared LRUCache handle must not be released after a single
RocksDB.close() " +
+ "in bounded mode")
+ } else {
+ // Per-instance cache -- must be released deterministically on close()
+ assert(!cache.isOwningHandle,
+ "LRUCache native handle should be released after RocksDB.close()
" +
+ "in unbounded mode")
+ }
+ } finally {
+ RocksDBMemoryManager.resetWriteBufferManagerAndCache
+ }
+ }
+ }
+ }
+
Seq("100", "1000", "100000").foreach { totalMemorySizeMB =>
testWithColumnFamilies(s"Memory mgmt - valid config " +
s"with totalMemorySizeMB=$totalMemorySizeMB",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]