[ https://issues.apache.org/jira/browse/KAFKA-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15984467#comment-15984467 ]
Jon Buffington commented on KAFKA-5122:
---------------------------------------

There was not a leak; I was miscalculating expected memory usage. My suggestion is to expand the documentation on estimating RocksDB off-heap memory usage for stores in the capacity planning guide. In my case, the guidance "The default parameters may mean that each state store consumes anywhere between 50-100 MB of memory." did not apply. The guide could instead provide a formula along the lines of:

{noformat}
write_buffer_size_mb = 32    # 0.10.2 default
write_buffer_count   = 3     # 0.10.2 default
block_cache_size_mb  = 100   # 0.10.2 default
estimate_per_segment = (write_buffer_size_mb * write_buffer_count) + block_cache_size_mb
estimate_per_segment = 196m
{noformat}

Then, estimating the application's RocksDB memory requirements:

{noformat}
store_partitions = 40  # in my application
segments         = 3   # the default for windowed stores in 0.10.2
estimate = estimate_per_segment * store_partitions * segments
estimate = 23520m
{noformat}

References:
* [https://github.com/facebook/rocksdb/issues/706]
* [https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB]

Based on my application's requirements and the above calculations, I ended up creating a RocksDBConfigSetter class:

{code}
import java.util.{Map => JMap}

import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.Options

class AppRocksDBConfigSetter extends RocksDBConfigSetter {
  override def setConfig(storeName: String, options: Options, configs: JMap[String, AnyRef]): Unit = {
    // Improve write throughput by increasing compaction throughput.
    val n = Runtime.getRuntime.availableProcessors
    options.setMaxBackgroundCompactions(n)

    val tableConfig = new org.rocksdb.BlockBasedTableConfig()
    // Reduce the block cache size from <https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L75>
    // as the total number of store RocksDB databases is partitions (40) * segments (3) = 120.
    tableConfig.setBlockCacheSize(16 * 1024 * 1024L)
    // Modify the default <https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L76>
    // per <https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks>.
    tableConfig.setBlockSize(16 * 1024L)
    // Do not let index and filter blocks grow unbounded. See
    // <https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks>.
    tableConfig.setCacheIndexAndFilterBlocks(true)
    options.setTableFormatConfig(tableConfig)

    // See <https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103>.
    // The default Kafka Streams 0.10.2 write_buffer_size is 32m per
    // <https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L74>.
    options.setMaxWriteBufferNumber(2)

    // Per RocksDB instance, the memory usage estimate is (32m * 2) + 16m = 80m.
    // The page-view application's estimated RocksDB off-heap memory is 80m * 120 = 9600m.
  }
}
{code}
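For completeness, this is roughly how such a setter gets registered with the Streams configuration (a minimal sketch; the application id, bootstrap servers, and properties object below are placeholders rather than my actual settings):

{code}
import java.util.Properties

import org.apache.kafka.streams.StreamsConfig

// Placeholder application id and bootstrap servers.
val streamsProps = new Properties()
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "page-view-app")
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// Point Kafka Streams at the custom RocksDB config setter defined above.
streamsProps.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[AppRocksDBConfigSetter].getName)

val streamsConfig = new StreamsConfig(streamsProps)
{code}
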
> Kafka Streams unexpected off-heap memory growth
> -----------------------------------------------
>
>                 Key: KAFKA-5122
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5122
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>         Environment: Linux 64-bit
>                      Oracle JVM version "1.8.0_121"
>            Reporter: Jon Buffington
>            Priority: Minor
>
> I have a Kafka Streams application that leaks off-heap memory at a rate of 20MB per commit interval. The application is configured with a 1G heap; the heap memory does not show signs of leaking. The application reaches 16g of system memory usage before terminating and restarting.
> Application facts:
> * The data pipeline is source -> map -> groupByKey -> reduce -> to.
> * The reduce operation uses a tumbling time window TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.HOURS.toMillis(168)).
> * The commit interval is five minutes (300000ms).
> * The application links to v0.10.2.0-cp1 of the Kafka libraries. When I link to the current 0.10.2.1 RC3, the leak rate changes to ~10MB per commit interval.
> * The application uses the schema registry for two pairs of serdes. One serde pair is used to read from a source topic that has 40 partitions. The other serde pair is used by the internal changelog and repartition topics created by the groupByKey/reduce operations.
> * The source input rate varies between 500-1500 records/sec. The source rate variation does not change the size or frequency of the leak.
> * The application heap has been configured using both 1024m and 2048m. The only observed difference between the two JVM heap sizes is more old gen collections at 1024m, although there is little difference in throughput. JVM settings are {-server -Djava.awt.headless=true -Xss256k -XX:MaxMetaspaceSize=128m -XX:ReservedCodeCacheSize=64m -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=128m -XX:+AlwaysPreTouch -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:InitiatingHeapOccupancyPercent=35 -XX:+PerfDisableSharedMem -XX:+UseStringDeduplication -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80}
> * We configure a custom RocksDBConfigSetter to set options.setMaxBackgroundCompactions(Runtime.getRuntime.availableProcessors).
> * Per <http://mail-archives.apache.org/mod_mbox/kafka-users/201702.mbox/%3ccahwhrrxxpwgyvr1ctwgoudkr7cqkaq+52phfpuzs4j-wv7k...@mail.gmail.com%3e>, the SSTables are being compacted. Total disk usage for the state files (RocksDB) is ~2.5g. Per partition and window, there are 3-4 SSTables.
> * The application is written in Scala and compiled using version 2.12.1.
> * Oracle JVM version "1.8.0_121"
> Various experiments that had no effect on the leak rate:
> * Tried different RocksDB block sizes (4k, 16k, and 32k).
> * Different numbers of instances (1, 2, and 4).
> * Different numbers of threads (1, 4, 10, 40).
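For readers following along, the pipeline described in the report above corresponds roughly to the following 0.10.2 DSL shape (a sketch only: the topic names, store name, String key/value types, and the mapper/reducer bodies are placeholders; only the operator chain and window settings come from the report):

{code}
import java.util.concurrent.TimeUnit

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStreamBuilder, KeyValueMapper, Reducer, TimeWindows}

val builder = new KStreamBuilder()

builder
  .stream[String, String]("source-topic")    // source topic (40 partitions in the report)
  .map[String, String](new KeyValueMapper[String, String, KeyValue[String, String]] {
    // Placeholder mapping; the real application's map logic is not shown in the report.
    override def apply(key: String, value: String): KeyValue[String, String] =
      new KeyValue(key, value)
  })
  .groupByKey()
  .reduce(
    new Reducer[String] {
      // Placeholder reducer.
      override def apply(aggregate: String, value: String): String = value
    },
    TimeWindows.of(TimeUnit.HOURS.toMillis(1))     // 1-hour tumbling windows...
      .until(TimeUnit.HOURS.toMillis(168)),        // ...retained for 168 hours
    "windowed-store")                              // windowed state store name
  .toStream()
  .to("output-topic")                              // to
{code}

Each partition of such a windowed store is backed by a handful of segmented RocksDB databases, which is where the store_partitions * segments multiplier in the comment above comes from.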