[ 
https://issues.apache.org/jira/browse/SPARK-57183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anish Shrigondekar reassigned SPARK-57183:
------------------------------------------

    Assignee: Iván Morales

> RocksDB state store leaks native memory when LRUCache is not closed in 
> unbounded mode
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-57183
>                 URL: https://issues.apache.org/jira/browse/SPARK-57183
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.2.0
>            Reporter: Iván Morales
>            Assignee: Iván Morales
>            Priority: Major
>              Labels: pull-request-available
>
> In unbounded memory mode (the default, {{{}boundedMemoryUsage = false{}}}), 
> Spark's RocksDB state store wrapper creates a new {{LRUCache}} per instance 
> in {{RocksDBMemoryManager }}but never calls {{lruCache.close()}} in 
> {{{}RocksDB.close(){}}}. The Java {{LRUCache}} wrapper holds a C++ 
> {{{}shared_ptr<Cache>{}}}, preventing native memory from being freed until 
> the JVM GC finalizes the wrapper. Under low heap pressure (e.g., a 4 GB JVM 
> running streaming tests), GC rarely runs, so unclosed caches accumulate. Over 
> a CI run with ~49 RocksDB-heavy test suites this accumulates 4-8 GB of native 
> memory and causes OOM kills.
> *Affected file*
> {{sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala}}
> *Related:* SPARK-56523 (Statistics native memory leak – already fixed). This 
> is a separate, unfixed issue.
> h2. Root cause
> In {{{}RocksDBMemoryManager.scala{}}}, unbounded mode creates a new cache per 
> instance:
> {code:scala}
> } else {
>   (null, new LRUCache(conf.blockCacheSizeMB * 1024 * 1024))  // new instance 
> per RocksDB
> }
> {code}
> But {{RocksDB.close()}} never closes it:
> {code:scala}
> def close(): Unit = {
>   closeDB()
>   readOptions.close()
>   writeOptions.close()
>   flushOptions.close()
>   nativeStats.close()    // added by SPARK-56523 fix
>   rocksDbOptions.close()
>   dbLogger.close()
>   // lruCache.close()  <-- MISSING
> }
> {code}
> In bounded mode ({{{}boundedMemoryUsage = true{}}}) the cache is a shared 
> singleton in {{RocksDBMemoryManager}} and must not be closed per instance. In 
> unbounded mode (the default) each instance owns its cache and must close it.
> h2. Fix
> Add the following in {{{}RocksDB.close(){}}}, after {{{}dbLogger.close(){}}}:
> {code:scala}
> if (!conf.boundedMemoryUsage && lruCache != null) {
>   lruCache.close()
> }
> {code}
> h2. Evidence
> Standalone reproducer tool: [https://github.com/kete1987/rocksdb-leak-tool]
> Results (60 iterations, --no-gc, --cache-mb 8, Windows 11 / Java 11.0.28):
> ||Mode||Total growth||Per-iter (post-warmup)||Pattern||
> |leak|+578 MB|~8.5 MB|Linear|
> |fixed|+128 MB|~0 KB|Flat|
> The ~8.5 MB/iter matches exactly {{blockCacheSizeMB = 8}} (Spark default), 
> confirming {{LRUCache}} as the sole leak source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to