This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 105ad9a81b5c [SPARK-57183][4.2][SS] Close LRUCache on RocksDB.close() in unbounded memory mode
105ad9a81b5c is described below

commit 105ad9a81b5c108922f4ae8be9b646e4650a3ebe
Author: Iván Morales <[email protected]>
AuthorDate: Wed Jun 3 10:37:54 2026 +0800

    [SPARK-57183][4.2][SS] Close LRUCache on RocksDB.close() in unbounded memory mode
    
    ### What changes were proposed in this pull request?
    
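    Backport of #56234 to branch-4.2. `RocksDB.close()` now closes the instance-owned
    `LRUCache` when bounded memory usage is disabled. In bounded mode the cache is a
    shared singleton managed by `RocksDBMemoryManager`, so it is left open.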
    
    ### Why are the changes needed?
    
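    In unbounded memory mode each RocksDB instance owns its `LRUCache`, but
    `RocksDB.close()` did not close it. The native cache memory was therefore freed
    only when the JVM garbage collector finalized the Java wrapper, which rarely
    happens under low heap pressure. See #56234.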
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
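    A new test in `RocksDBSuite` (SPARK-57183) covers both memory modes. It checks
    that the `LRUCache` native handle is released after `RocksDB.close()` in
    unbounded mode and stays open in bounded mode. See #56234.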
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Co-authored with Claude (Anthropic), used for analysis, code generation and review assistance.
    Generated-by: Claude Sonnet 4.6
    
    Closes #56281 from kete1987/SPARK-57183-rocksdb-lrucache-leak-branch-4.2.
    
    Authored-by: Iván Morales <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/execution/streaming/state/RocksDB.scala    |  9 +++++
 .../execution/streaming/state/RocksDBSuite.scala   | 43 ++++++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 89e7a3058f3d..bd479ffc6d2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -2133,6 +2133,15 @@ class RocksDB(
       nativeStats.close()
       rocksDbOptions.close()
       dbLogger.close()
+      // In unbounded memory mode each RocksDB instance owns its LRUCache. Without explicit
+      // close() the native C++ cache object is only freed when the JVM GC finalizes the Java
+      // wrapper -- which rarely happens under low heap pressure. Closing explicitly here
+      // ensures native memory is reclaimed deterministically when the instance is released.
+      // In bounded mode the cache is a shared singleton managed by RocksDBMemoryManager
+      // and must not be closed here.
+      if (!conf.boundedMemoryUsage && lruCache != null) {
+        lruCache.close()
+      }
 
       var snapshot = snapshotsToUploadQueue.poll()
       while (snapshot != null) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 87560d174956..dc697f5b99dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -3276,6 +3276,49 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     }
   }
 
+  Seq(true, false).foreach { boundedMemoryUsage =>
+    testWithColumnFamilies(
+      s"SPARK-57183: LRUCache is handled correctly on RocksDB.close() " +
+        s"with boundedMemoryUsage=$boundedMemoryUsage",
+      TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+      withTempDir { dir =>
+        try {
+          val sqlConf = new SQLConf
+          sqlConf.setConfString(
+            RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + "." +
+              RocksDBConf.BOUNDED_MEMORY_USAGE_CONF_KEY, boundedMemoryUsage.toString)
+          if (boundedMemoryUsage) {
+            sqlConf.setConfString(
+              RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + "." +
+                RocksDBConf.MAX_MEMORY_USAGE_MB_CONF_KEY, "128")
+          }
+          val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+
+          val (_, cache) = withDB(dir.getCanonicalPath, conf = dbConf,
+            useColumnFamilies = colFamiliesEnabled) { db =>
+            db.load(0)
+            db.put("k", "v")
+            db.commit()
+            db.getWriteBufferManagerAndCache()
+          }
+          if (boundedMemoryUsage) {
+            // Shared singleton -- must remain open after a single instance closes
+            assert(cache.isOwningHandle,
+              "Shared LRUCache handle must not be released after a single RocksDB.close() " +
+                "in bounded mode")
+          } else {
+            // Per-instance cache -- must be released deterministically on close()
+            assert(!cache.isOwningHandle,
+              "LRUCache native handle should be released after RocksDB.close() " +
+                "in unbounded mode")
+          }
+        } finally {
+          RocksDBMemoryManager.resetWriteBufferManagerAndCache
+        }
+      }
+    }
+  }
+
   Seq("100", "1000", "100000").foreach { totalMemorySizeMB =>
     testWithColumnFamilies(s"Memory mgmt - valid config " +
       s"with totalMemorySizeMB=$totalMemorySizeMB",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to