Repository: kafka Updated Branches: refs/heads/0.11.0 3bc5ee7dd -> d05112125
MINOR: streams memory management docs update streams memory management docs Author: Damian Guy <damian....@gmail.com> Reviewers: Bill Bejeck <bbej...@gmail.com>, Eno Thereska <eno.there...@gmail.com>, Guozhang Wang <wangg...@gmail.com> Closes #3633 from dguy/mem-doc-011 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0511212 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0511212 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0511212 Branch: refs/heads/0.11.0 Commit: d05112125614ded3febe7fe7ffcdfefd0dd2eb6f Parents: 3bc5ee7 Author: Damian Guy <damian....@gmail.com> Authored: Wed Aug 9 11:55:57 2017 +0100 Committer: Damian Guy <damian....@gmail.com> Committed: Wed Aug 9 11:55:57 2017 +0100 ---------------------------------------------------------------------- .../streams-cache-and-commit-interval.png | Bin 0 -> 38648 bytes docs/streams/developer-guide.html | 175 +++++++++++++++++++ 2 files changed, 175 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d0511212/docs/images/streams-cache-and-commit-interval.png ---------------------------------------------------------------------- diff --git a/docs/images/streams-cache-and-commit-interval.png b/docs/images/streams-cache-and-commit-interval.png new file mode 100644 index 0000000..a663bc6 Binary files /dev/null and b/docs/images/streams-cache-and-commit-interval.png differ http://git-wip-us.apache.org/repos/asf/kafka/blob/d0511212/docs/streams/developer-guide.html ---------------------------------------------------------------------- diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html index 6f08f38..15298a7 100644 --- a/docs/streams/developer-guide.html +++ b/docs/streams/developer-guide.html @@ -924,6 +924,181 @@ <li>Collectively, this allows us to query the full state of the entire application</li> </ul> + <h3><a 
id="streams_developer-guide_memory-management" href="#streams_developer-guide_memory-management">Memory Management</a></h3>
+
+
+ <h4><a id="streams_developer-guide_memory-management_record-cache" href="#streams_developer-guide_memory-management_record-cache">Record caches in the DSL</a></h4>
+ <p>
+ Developers of an application using the DSL have the option to specify, for an instance of a processing topology, the
+ total memory (RAM) size of a record cache that is leveraged by the following <code>KTable</code> instances:
+ </p>
+
+ <ol>
+ <li>Source <code>KTable</code>, i.e. <code>KTable</code> instances that are created via <code>KStreamBuilder#table()</code> or <code>KStreamBuilder#globalTable()</code>.</li>
+ <li>Aggregation <code>KTable</code>, i.e. instances of <code>KTable</code> that are created as a result of aggregations.</li>
+ </ol>
+ <p>
+ For such <code>KTable</code> instances, the record cache is used for:
+ </p>
+ <ol>
+ <li>Internal caching and compacting of output records before they are written by the underlying stateful processor node to its internal state store.</li>
+ <li>Internal caching and compacting of output records before they are forwarded from the underlying stateful processor node to any of its downstream processor nodes.</li>
+ </ol>
+ <p>
+ Here is a motivating example:
+ </p>
+
+ <ul>
+ <li>Imagine the input is a <code>KStream<String, Integer></code> with the records <code><A, 1>, <D, 5>, <A, 20>, <A, 300></code>.
+ Note that the focus in this example is on the records with key == <code>A</code>.
+ </li>
+ <li>
+ An aggregation computes the sum of record values, grouped by key, for the input above and returns a <code>KTable<String, Integer></code>.
+ <ul>
+ <li><b>Without caching</b>, what is emitted for key <code>A</code> is a sequence of output records that represent changes in the
+ resulting aggregation table (here, the parentheses denote changes, where the left and right numbers denote the new
+ aggregate value and the previous aggregate value, respectively):
+ <code><A, (1, null)>, <A, (21, 1)>, <A, (321, 21)></code>.</li>
+ <li>
+ <b>With caching</b>, the aforementioned three output records for key <code>A</code> would likely be compacted in the cache,
+ leading to a single output record <code><A, (321, null)></code> that is written to the aggregation's internal state store
+ and forwarded to any downstream operations.
+ </li>
+ </ul>
+ </li>
+ </ul>
+
+ <p>
+ The cache size is specified through the <code>cache.max.bytes.buffering</code> parameter, which is a global setting per processing topology:
+ </p>
+
+ <pre class="brush: java;">
+ // Enable record cache of size 10 MB.
+ Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
+ </pre>
+
+ <p>
+ This parameter controls the number of bytes allocated for caching.
+ Specifically, for a processor topology instance with <code>T</code> threads and <code>C</code> bytes allocated for caching,
+ each thread gets an even share of <code>C/T</code> bytes to construct its own cache and use as it sees fit among its tasks.
+ That is, there are as many caches as there are threads, and caches are never shared across threads.
+ The basic API for the cache is made of <code>put()</code> and <code>get()</code> calls.
+ Records are evicted using a simple LRU scheme once the cache size limit is reached.
+ The first time a keyed record <code>R1 = <K1, V1></code> finishes processing at a node, it is marked as dirty in the cache.
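</p>
<p>
To make the compaction behavior concrete, here is a small standalone sketch (plain Java; the class name <code>SimpleRecordCache</code> is illustrative only, this is not the actual Streams cache implementation) of an LRU cache in which a new record for a key overwrites the previous entry, and entries are emitted either on eviction under cache pressure or on an explicit flush:
</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: an LRU cache where a new record for the same key
// overwrites ("compacts") the previous dirty entry; only the latest value
// per key survives until it is evicted or flushed.
public class SimpleRecordCache {
    private final int maxEntries;
    private final List<String> flushed = new ArrayList<>();
    private final LinkedHashMap<String, Integer> cache;

    public SimpleRecordCache(int maxEntries) {
        this.maxEntries = maxEntries;
        // accessOrder = true gives LRU iteration order
        this.cache = new LinkedHashMap<String, Integer>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
                if (size() > SimpleRecordCache.this.maxEntries) {
                    // evicted under cache pressure: emit downstream / write to store
                    flushed.add(eldest.getKey() + "=" + eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    public void put(String key, int value) {
        cache.put(key, value); // same key overwrites: the earlier dirty entry is compacted
    }

    public List<String> flush() { // e.g. triggered by the commit interval
        for (Map.Entry<String, Integer> e : cache.entrySet()) {
            flushed.add(e.getKey() + "=" + e.getValue());
        }
        cache.clear();
        return flushed;
    }

    public static void main(String[] args) {
        SimpleRecordCache cache = new SimpleRecordCache(3);
        cache.put("A", 1);
        cache.put("D", 5);
        cache.put("A", 21);
        cache.put("A", 321); // the three updates for key A collapse into one entry
        System.out.println(cache.flush()); // prints [D=5, A=321]
    }
}
```

<p>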
+ Any other keyed record <code>R2 = <K1, V2></code> with the same key <code>K1</code> that is processed on that node during that time will overwrite <code><K1, V1></code>, which we also refer to as "being compacted".
+ Note that this has the same effect as <a href="https://kafka.apache.org/documentation.html#compaction">Kafka's log compaction</a>, but happens (a) earlier, while the
+ records are still in memory, and (b) within your client-side application rather than on the server side, i.e., the Kafka broker.
+ Upon flushing, <code>R2</code> is (1) forwarded to the next processing node and (2) written to the local state store.
+ </p>
+
+ <p>
+ The semantics of caching are that data is flushed to the state store and forwarded to the next downstream processor node
+ whenever the earlier of <code>commit.interval.ms</code> or <code>cache.max.bytes.buffering</code> (cache pressure) is reached.
+ Both <code>commit.interval.ms</code> and <code>cache.max.bytes.buffering</code> are <b>global</b> parameters: they apply to all processor nodes in
+ the topology, i.e., it is not possible to specify different parameters for each node.
+ Below we provide some example settings for both parameters based on desired scenarios.
+ </p>
+
+ <p>To turn off caching, the cache size can be set to zero:</p>
+ <pre class="brush: java;">
+ // Disable record cache
+ Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ </pre>
+
+ <p>
+ Turning off caching might result in high write traffic for the underlying RocksDB store.
+ With default settings caching is enabled within Kafka Streams but RocksDB caching is disabled.
+ Thus, to avoid high write traffic it is recommended to enable RocksDB caching if Kafka Streams caching is turned off.
+ </p>
+
+ <p>
+ For example, the RocksDB Block Cache could be set to 100 MB and the Write Buffer size to 32 MB.
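</p>
<p>
One way to apply such RocksDB settings is through a custom <code>RocksDBConfigSetter</code>, registered via the <code>rocksdb.config.setter</code> config. The sketch below is illustrative; the exact option methods may vary across RocksDB versions:
</p>

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Illustrative sketch: set a 100 MB block cache and a 32 MB write buffer
// for the RocksDB stores used by Kafka Streams.
public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(100 * 1024 * 1024L); // 100 MB block cache
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(32 * 1024 * 1024L);     // 32 MB write buffer
    }
}
```

The class would then be registered on the Streams configuration, e.g. <code>streamsConfiguration.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);</code>.

<p>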
+ </p>
+ <p>
+ To enable caching but still have an upper bound on how long records will be cached, the commit interval can be set
+ appropriately (in this example, it is set to 1000 milliseconds):
+ </p>
+ <pre class="brush: java;">
+ Properties streamsConfiguration = new Properties();
+ // Enable record cache of size 10 MB.
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
+ // Set commit interval to 1 second.
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ </pre>
+
+ <p>
+ The illustration below shows the effect of these two configurations visually.
+ For simplicity we have records with 4 keys: blue, red, yellow and green. Without loss of generality, let's assume the cache has space for only 3 keys.
+ When the cache is disabled, we observe that all the input records will be output. With the cache enabled, we make the following observations.
+ First, most records are output at the end of a commit interval (e.g., at <code>t1</code> one blue record is output, which is the final over-write of the blue key up to that time).
+ Second, some records are output because of cache pressure, i.e. before the end of a commit interval (cf. the red record right before <code>t2</code>).
+ With smaller cache sizes we expect cache pressure to be the primary factor that dictates when records are output. With large cache sizes, the commit interval will be the primary factor.
+ Third, the number of records output has been reduced (here: from 15 to 8).
+ </p>
+
+ <img class="centered" src="/{{version}}/images/streams-cache-and-commit-interval.png" style="width:500pt;height:400pt;">
+ <h4><a id="streams_developer-guide_memory-management_state-store-cache" href="#streams_developer-guide_memory-management_state-store-cache">State store caches in the Processor API</a></h4>
+
+ <p>
+ Developers of a Kafka Streams application using the Processor API have the option to specify, for an instance of a
+ processing topology, the total memory (RAM) size of the <i>state store cache</i> that is used for:
+ </p>
+
+ <ul><li>Internal <i>caching and compacting</i> of output records before they are written from a <b>stateful</b> processor node to its state stores.</li></ul>
+
+ <p>
+ Note that, unlike <a href="#streams_developer-guide_memory-management_record-cache">record caches</a> in the DSL, the state
+ store cache in the Processor API <i>will not cache or compact</i> any output records that are being forwarded downstream.
+ In other words, downstream processor nodes see all records, whereas the state stores see a reduced number of records.
+ It is important to note that this does not impact correctness of the system but is merely a performance optimization
+ for the state stores.
+ </p>
+ <p>
+ A note on terminology: we use the narrower term <i>state store caches</i> when we refer to the Processor API and the
+ broader term <i>record caches</i> when we are writing about the DSL.
+ We made a conscious choice not to expose the more general record caches to the Processor API so that we keep it simple and flexible.
+ For example, developers of the Processor API might choose to store a record in a state store while forwarding a different value downstream, i.e., they
+ might not want to use the unified record cache for both the state store and forwarding downstream.
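</p>
<p>
The behavioral difference can be illustrated with a tiny standalone simulation (plain Java; not the Streams API): with a state store cache, every record is still forwarded downstream, while writes to the state store are compacted per key.
</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative simulation of a Processor API state store cache:
// downstream processors see every record, but the state store only
// sees the latest value per key once the cache is flushed.
public class StateStoreCacheSketch {
    public static List<String> processAll(List<String[]> input, Map<String, String> storeCache) {
        List<String> forwarded = new ArrayList<>();
        for (String[] record : input) {
            forwarded.add(record[0] + "=" + record[1]); // forwarded downstream unconditionally
            storeCache.put(record[0], record[1]);       // store write is compacted per key
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
                new String[]{"A", "1"}, new String[]{"D", "5"},
                new String[]{"A", "21"}, new String[]{"A", "321"});
        Map<String, String> storeCache = new LinkedHashMap<>();
        List<String> forwarded = processAll(input, storeCache);
        System.out.println(forwarded.size()); // 4: downstream sees all records
        System.out.println(storeCache);       // {A=321, D=5}: the store sees only the latest value per key
    }
}
```

<p>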
+ </p>
+ <p>
+ Following from the example first shown in section <a href="#streams_processor_statestore">State Stores</a>, to enable caching, you can
+ add the <code>enableCaching</code> call (note that caches are disabled by default and there is no explicit <code>disableCaching</code>
+ call):
+ </p>
+ <pre class="brush: java;">
+ StateStoreSupplier countStoreSupplier =
+ Stores.create("Counts")
+ .withKeys(Serdes.String())
+ .withValues(Serdes.Long())
+ .persistent()
+ .enableCaching()
+ .build();
+ </pre>
+
+ <h4><a id="streams_developer-guide_memory-management_other_memory_usage" href="#streams_developer-guide_memory-management_other_memory_usage">Other memory usage</a></h4>
+ <p>
+ There are other modules inside Apache Kafka that allocate memory during runtime. They include the following:
+ </p>
+ <ul>
+ <li>Producer buffering, managed by the producer config <code>buffer.memory</code>.</li>
+
+ <li>Consumer buffering, currently not strictly managed, but can be indirectly controlled by fetch size, i.e.,
+ <code>fetch.max.bytes</code> and <code>fetch.max.wait.ms</code>.</li>
+
+ <li>Both producer and consumer also have separate TCP send / receive buffers that are not counted as the buffering memory.
+ These are controlled by the <code>send.buffer.bytes</code> / <code>receive.buffer.bytes</code> configs.</li>
+
+ <li>Deserialized objects buffering: after <code>consumer.poll()</code> returns records, they are deserialized to extract
+ the timestamp and buffered within Streams.
+ Currently this is only indirectly controlled by <code>buffered.records.per.partition</code>.</li>
+
+ <li>RocksDB's own memory usage, both on-heap and off-heap; critical configs (for RocksDB version 4.1.0) include
+ <code>block_cache_size</code>, <code>write_buffer_size</code> and <code>max_write_buffer_number</code>.
+ These can be specified through the <code>rocksdb.config.setter</code> configuration.</li>
+ </ul>
 <h3><a id="streams_configure_execute" href="#streams_configure_execute">Application Configuration and Execution</a></h3> <p>