[ 
https://issues.apache.org/jira/browse/KAFKA-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15984467#comment-15984467
 ] 

Jon Buffington commented on KAFKA-5122:
---------------------------------------

There was not a leak; I was miscalculating expected memory usage.

My suggestion is to expand the capacity planning guide's documentation on 
estimating RocksDB off-heap memory usage for stores. In my case, the guidance, 
"The default parameters may mean that each state store consumes anywhere 
between 50-100 MB of memory." did not apply. It would be more useful to provide 
a formula along the lines of:

{noformat}
write_buffer_size_mb = 32   # 0.10.2 default
write_buffer_count = 3      # 0.10.2 default
block_cache_size_mb = 16    # 0.10.2 default
estimate_per_segment = (write_buffer_size_mb * write_buffer_count) + block_cache_size_mb
estimate_per_segment = 112m
{noformat}

Then, estimating the application's RocksDB memory requirements:
{noformat}
store_partitions = 40   # in my application
segments = 3            # the default for windowed stores in 0.10.2
estimate = estimate_per_segment * store_partitions * segments
estimate = 13440m
{noformat}
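The two estimates above can be collapsed into a small Scala sketch. The object and method names here are mine for illustration; only the default values (32m write buffers, 3 buffers, 16m block cache) come from the 0.10.2 sources linked below:

```scala
// Sketch of the off-heap estimate: per-instance cost times the number of
// RocksDB instances (one per store partition per segment).
object RocksDbMemoryEstimate {
  def perInstanceMb(writeBufferSizeMb: Int, writeBufferCount: Int, blockCacheSizeMb: Int): Int =
    writeBufferSizeMb * writeBufferCount + blockCacheSizeMb

  def totalMb(perInstanceMb: Int, storePartitions: Int, segments: Int): Int =
    perInstanceMb * storePartitions * segments

  def main(args: Array[String]): Unit = {
    val per = perInstanceMb(32, 3, 16) // 0.10.2 defaults
    val total = totalMb(per, 40, 3)    // 40 partitions, 3 segments
    println(s"per-instance: ${per}m, total: ${total}m")
  }
}
```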

References:
* [https://github.com/facebook/rocksdb/issues/706]
* [https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB]

Based on my application's requirements and the above calculations, I ended up 
creating a RocksDBConfigSetter class:
{code}
import java.util.{Map => JMap}
import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.{BlockBasedTableConfig, Options}

class AppRocksDBConfigSetter extends RocksDBConfigSetter {
  override def setConfig(storeName: String, options: Options, configs: JMap[String, AnyRef]): Unit = {
    val n = Runtime.getRuntime.availableProcessors
    // Improve write throughput by increasing compaction throughput.
    options.setMaxBackgroundCompactions(n)

    val tableConfig = new BlockBasedTableConfig()
    // Reduce the block cache size from the 0.10.2 default
    // <https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L75>
    // as the total number of store RocksDB databases is partitions (40) * segments (3) = 120.
    tableConfig.setBlockCacheSize(16 * 1024 * 1024L)
    // Modify the default block size
    // <https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L76>
    // per <https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks>.
    tableConfig.setBlockSize(16 * 1024L)
    // Do not let index and filter blocks grow unbounded. See
    // <https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks>
    tableConfig.setCacheIndexAndFilterBlocks(true)
    options.setTableFormatConfig(tableConfig)

    // See <https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103>
    options.setMaxWriteBufferNumber(2)

    // The default Kafka Streams 0.10.2 write_buffer_size is 32m per
    // <https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L74>.
    // Per RocksDB instance, the memory usage estimate is (32m * 2) + 16m = 80m.
    // The page-view application's estimated RocksDB off-heap memory is 9600m.
  }
}
{code}
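For completeness, a config setter only takes effect once it is registered in the application's Streams configuration via the rocksdb.config.setter property. A minimal sketch (the application id and broker address are placeholders, not from this issue):

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "page-view-app")      // hypothetical app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // hypothetical broker
// Point Streams at the custom setter so every store's RocksDB instance uses it.
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[AppRocksDBConfigSetter].getName)
```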

> Kafka Streams unexpected off-heap memory growth
> -----------------------------------------------
>
>                 Key: KAFKA-5122
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5122
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>         Environment: Linux 64-bit
> Oracle JVM version "1.8.0_121"
>            Reporter: Jon Buffington
>            Priority: Minor
>
> I have a Kafka Streams application that leaks off-heap memory at a rate of 
> 20MB per commit interval. The application is configured with a 1G heap; the 
> heap memory does not show signs of leaking. The application reaches 16g of 
> system memory usage before terminating and restarting.
> Application facts:
> * The data pipeline is source -> map -> groupByKey -> reduce -> to.
> * The reduce operation uses a tumbling time window 
> TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.HOURS.toMillis(168)).
> * The commit interval is five minutes (300000ms).
> * The application links to v0.10.2.0-cp1 of the Kafka libraries. When I link 
> to the current 0.10.2.1 RC3, the leak rate changes to ~10MB per commit 
> interval.
> * The application uses the schema registry for two pairs of serdes. One serde 
> pair is used to read from a source topic that has 40 partitions. The other 
> serde pair is used by the internal changelog and repartition topics created 
> by the groupByKey/reduce operations.
> * The source input rate varies between 500-1500 records/sec. The source rate 
> variation does not change the size or frequency of the leak.
> * The application heap has been configured using both 1024m and 2048m. The 
> only observed difference between the two JVM heap sizes is more old gen 
> collections at 1024m although there is little difference in throughput. JVM 
> settings are {-server -Djava.awt.headless=true -Xss256k 
> -XX:MaxMetaspaceSize=128m -XX:ReservedCodeCacheSize=64m 
> -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=128m 
> -XX:+AlwaysPreTouch -XX:+UseG1GC -XX:MaxGCPauseMillis=50 
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+PerfDisableSharedMem 
> -XX:+UseStringDeduplication -XX:MinMetaspaceFreeRatio=50 
> -XX:MaxMetaspaceFreeRatio=80}
> * We configure a custom RocksDBConfigSetter to set 
> options.setMaxBackgroundCompactions(Runtime.getRuntime.availableProcessors)
> * Per 
> <http://mail-archives.apache.org/mod_mbox/kafka-users/201702.mbox/%3ccahwhrrxxpwgyvr1ctwgoudkr7cqkaq+52phfpuzs4j-wv7k...@mail.gmail.com%3e>,
>  the SSTables are being compacted. Total disk usage for the state files 
> (RocksDB) is ~2.5g. Per partition and window, there are 3-4 SSTables.
> * The application is written in Scala and compiled using version 2.12.1.
> Various experiments that had no effect on the leak rate:
> * Tried different RocksDB block sizes (4k, 16k, and 32k).
> * Different numbers of instances (1, 2, and 4).
> * Different numbers of threads (1, 4, 10, 40).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
