[ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717393#comment-17717393 ]

Anish Shrigondekar commented on SPARK-43244:
--------------------------------------------

[~kimahriman] - we did some investigation here and we believe it might not be 
ideal to close the instance on task completion and reload it later. We think a 
better idea would be to remove the writeBatch-related changes, since they are 
not required, and rely on the native DB operations themselves with the WAL 
disabled. Here is the ticket that I created: https://issues.apache.org/jira/browse/SPARK-43311
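
For context, here is a minimal RocksJava sketch (Scala) of what relying on 
native DB operations with the WAL disabled looks like, as opposed to staging 
changes in a WriteBatch. The path, keys, and explicit flush are illustrative 
only, not the actual state store code:

{code:scala}
import org.rocksdb.{FlushOptions, Options, RocksDB, WriteOptions}

object NativeWritesSketch {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()
    val options = new Options().setCreateIfMissing(true)
    val db = RocksDB.open(options, "/tmp/state-store-sketch") // illustrative path

    // Writes go straight to the memtable instead of being buffered in a
    // WriteBatch; disabling the WAL skips the write-ahead log, with
    // durability instead coming from checkpointed SST files.
    val writeOpts = new WriteOptions().setDisableWAL(true)
    db.put(writeOpts, "key".getBytes, "value".getBytes)
    db.delete(writeOpts, "stale-key".getBytes)

    // With the WAL off, an explicit flush persists the memtable to SST files.
    db.flush(new FlushOptions().setWaitForFlush(true))

    writeOpts.close()
    db.close()
    options.close()
  }
}
{code}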

Here is the PR for the change: [https://github.com/apache/spark/pull/40981] 

cc - [~kabhwan] 

Please let us know if you have any questions or concerns.

> RocksDB State Store can accumulate unbounded native memory
> ----------------------------------------------------------
>
>                 Key: SPARK-43244
>                 URL: https://issues.apache.org/jira/browse/SPARK-43244
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 3.3.2
>            Reporter: Adam Binford
>            Priority: Major
>
> We noticed in one of our production stateful streaming jobs using RocksDB 
> that an executor with 20 GiB of heap was using around 40 GiB of resident 
> memory. I noticed a single RocksDB instance was using around 150 MiB of 
> memory, and only 5 MiB or so of this was from the write batch (which is now 
> cleared after committing).
> After reading about RocksDB memory usage (this link was helpful: 
> [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
>  I realized a lot of this was likely the "Index and Filters" memory usage. 
> This job is doing a streaming deduplication with a lot of unique keys, so it 
> makes sense that these block usages would be high. The problem is that, as 
> things stand now, the underlying RocksDB instance stays open on an executor 
> for as long as that executor is assigned that stateful partition (so it can 
> be reused across batches). A single executor can therefore accumulate a 
> large number of open RocksDB instances, each using a certain amount of 
> native memory. In the worst case, a single executor could need to keep 
> every single partition's RocksDB instance open at once. 
> There are a couple of ways to control the amount of memory used, such as 
> limiting the max open files or adding the option to use the block cache for 
> the indices and filters, but neither of these solves the underlying problem 
> of accumulating native memory from multiple partitions on an executor.
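> For illustration, a rough sketch of what those two knobs look like at the 
> RocksJava options level (the cache size and file count are made up, and 
> this is not the state store's actual configuration path):
> {code:scala}
> import org.rocksdb.{BlockBasedTableConfig, LRUCache, Options, RocksDB}
>
> RocksDB.loadLibrary()
>
> val tableConfig = new BlockBasedTableConfig()
>   .setBlockCache(new LRUCache(64L * 1024 * 1024)) // 64 MiB shared block cache (illustrative)
>   .setCacheIndexAndFilterBlocks(true)             // charge index/filter blocks to the cache
>
> val options = new Options()
>   .setCreateIfMissing(true)
>   .setMaxOpenFiles(100)                           // bound open table-reader handles (illustrative)
>   .setTableFormatConfig(tableConfig)
> {code}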
> The real fix needs to be a mechanism, and a corresponding option, to close 
> the underlying RocksDB instance at the end of each task, so that only one 
> RocksDB instance ever needs to be open at a time, giving predictable memory 
> usage no matter the size of your data or the number of shuffle partitions. 
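> As a rough illustration of such task-scoped closing, using Spark's 
> TaskContext completion-listener API (the helper name is hypothetical):
> {code:scala}
> import org.apache.spark.TaskContext
> import org.rocksdb.RocksDB
>
> // Hypothetical helper: register the partition's RocksDB instance to be
> // closed when the running task completes, so at most one instance per
> // task slot stays open instead of one per assigned stateful partition.
> def closeOnTaskCompletion(db: RocksDB): RocksDB = {
>   Option(TaskContext.get()).foreach { ctx =>
>     ctx.addTaskCompletionListener[Unit](_ => db.close())
>   }
>   db
> }
> {code}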
> We are running this on Spark 3.3, but we have just kicked off a test to see 
> if things are any different in Spark 3.4.



