Repository: kafka
Updated Branches:
  refs/heads/trunk 1cd86284e -> 2010aa067


MINOR: add memory management section to streams docs

Author: Damian Guy <damian....@gmail.com>

Reviewers: Guozhang Wang <wangg...@gmail.com>, Eno Thereska 
<eno.there...@gmail.com>

Closes #3604 from dguy/memory-management-docs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2010aa06
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2010aa06
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2010aa06

Branch: refs/heads/trunk
Commit: 2010aa067ffb3adba1afd392d7da10f67e231eb8
Parents: 1cd8628
Author: Damian Guy <damian....@gmail.com>
Authored: Mon Aug 7 10:17:17 2017 +0100
Committer: Damian Guy <damian....@gmail.com>
Committed: Mon Aug 7 10:17:17 2017 +0100

----------------------------------------------------------------------
 .../streams-cache-and-commit-interval.png       | Bin 0 -> 38648 bytes
 docs/streams/developer-guide.html               | 175 +++++++++++++++++++
 2 files changed, 175 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2010aa06/docs/images/streams-cache-and-commit-interval.png
----------------------------------------------------------------------
diff --git a/docs/images/streams-cache-and-commit-interval.png 
b/docs/images/streams-cache-and-commit-interval.png
new file mode 100644
index 0000000..a663bc6
Binary files /dev/null and b/docs/images/streams-cache-and-commit-interval.png 
differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/2010aa06/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html 
b/docs/streams/developer-guide.html
index 529261f..97de11c 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -958,6 +958,181 @@
         <li>Collectively, this allows us to query the full state of the entire 
application</li>
     </ul>
 
+    <h3><a id="streams_developer-guide_memory-management" 
href="#streams_developer-guide_memory-management">Memory Management</a></h3>
+
+
+    <h4><a id="streams_developer-guide_memory-management_record-cache" 
href="#streams_developer-guide_memory-management_record-cache">Record caches in 
the DSL</a></h4>
+    <p>
+    Developers of an application using the DSL have the option to specify, for 
an instance of a processing topology, the
+    total memory (RAM) size of a record cache that is leveraged by the 
following <code>KTable</code> instances:
+    </p>
+
+    <ol>
+        <li>Source <code>KTable</code>, i.e. <code>KTable</code> instances that are created via <code>StreamsBuilder#table()</code> or <code>StreamsBuilder#globalTable()</code>.</li>
+        <li>Aggregation <code>KTable</code>, i.e. instances of <code>KTable</code> that are created as a result of aggregations.</li>
+    </ol>
+    <p>
+        For such <code>KTable</code> instances, the record cache is used for:
+    </p>
+    <ol>
+        <li>Internal caching and compacting of output records before they are 
written by the underlying stateful processor node to its internal state 
store.</li>
+        <li>Internal caching and compacting of output records before they are forwarded from the underlying stateful processor node to any of its downstream processor nodes.</li>
+    </ol>
+    <p>
+        Here is a motivating example:
+    </p>
+
+    <ul>
+        <li>Imagine the input is a <code>KStream&lt;String, Integer&gt;</code> 
with the records <code>&lt;A, 1&gt;, &lt;D, 5&gt;, &lt;A, 20&gt;, &lt;A, 
300&gt;</code>.
+            Note that the focus in this example is on the records with key <code>A</code>.
+        </li>
+        <li>
+            An aggregation computes the sum of record values, grouped by key, 
for the input above and returns a <code>KTable&lt;String, Integer&gt;</code>.
+            <ul>
+                <li><b>Without caching</b>, what is emitted for key 
<code>A</code> is a sequence of output records that represent changes in the
+                    resulting aggregation table (here, the parentheses denote 
changes, where the left and right numbers denote the new
+                    aggregate value and the previous aggregate value, 
respectively):
+                    <code>&lt;A, (1, null)&gt;, &lt;A, (21, 1)&gt;, &lt;A, 
(321, 21)&gt;</code>.</li>
+                <li>
+                    <b>With caching</b>, the aforementioned three output records for key <code>A</code> would likely be compacted in the cache,
+                    leading to a single output record <code>&lt;A, (321, null)&gt;</code> that is written to the aggregation's internal state store
+                    and forwarded to any downstream operations.
+                </li>
+            </ul>
+        </li>
+    </ul>
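+    <p>
+        The compaction in this example can be sketched in plain Java. This is a minimal illustration, not the Kafka Streams implementation:
+        the class <code>CompactionSketch</code> and its methods are hypothetical, and two in-memory maps stand in for the state store and the record cache.
+    </p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: sums values per key and reports "key=(new, old)" changes.
final class CompactionSketch {

    /** No cache: every input record produces one change record. */
    static List<String> withoutCache(String[] keys, int[] values) {
        Map<String, Integer> store = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            Integer old = store.get(keys[i]);
            int updated = (old == null ? 0 : old) + values[i];
            store.put(keys[i], updated);
            out.add(keys[i] + "=(" + updated + ", " + old + ")");
        }
        return out;
    }

    /** With a cache: updates are compacted per key and emitted once on flush. */
    static List<String> withCache(String[] keys, int[] values) {
        Map<String, Integer> store = new LinkedHashMap<>(); // flushed state
        Map<String, Integer> dirty = new LinkedHashMap<>(); // cached, not yet flushed
        for (int i = 0; i < keys.length; i++) {
            Integer current = dirty.containsKey(keys[i]) ? dirty.get(keys[i]) : store.get(keys[i]);
            dirty.put(keys[i], (current == null ? 0 : current) + values[i]);
        }
        // Flush: write each dirty entry to the store and emit a single change for it.
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : dirty.entrySet()) {
            Integer old = store.put(e.getKey(), e.getValue());
            out.add(e.getKey() + "=(" + e.getValue() + ", " + old + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"A", "D", "A", "A"};
        int[] values = {1, 5, 20, 300};
        System.out.println(withoutCache(keys, values));
        System.out.println(withCache(keys, values));
    }
}
```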
+
+    <p>
+        The cache size is specified through the 
<code>cache.max.bytes.buffering</code> parameter, which is a global setting per 
processing topology:
+    </p>
+
+    <pre class="brush: java;">
+        // Enable record cache of size 10 MB.
+        Properties streamsConfiguration = new Properties();
+        
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 
1024 * 1024L);
+    </pre>
+
+    <p>
+        This parameter controls the number of bytes allocated for caching.
+        Specifically, for a processor topology instance with <code>T</code> threads and <code>C</code> bytes allocated for caching,
+        each thread receives an equal share of <code>C/T</code> bytes to construct its own cache and use as it sees fit among its tasks.
+        That is, there are as many caches as there are threads, and caches are not shared across threads.
+        The basic API for the cache is made of <code>put()</code> and 
<code>get()</code> calls.
+        Records are evicted using a simple LRU scheme once the cache size is 
reached.
+        The first time a keyed record <code>R1 = &lt;K1, V1&gt;</code> 
finishes processing at a node, it is marked as dirty in the cache.
+        Any other keyed record <code>R2 = &lt;K1, V2&gt;</code> with the same 
key <code>K1</code> that is processed on that node during that time will 
overwrite <code>&lt;K1, V1&gt;</code>, which we also refer to as "being 
compacted".
+        Note that this has the same effect as <a href="https://kafka.apache.org/documentation.html#compaction">Kafka's log compaction</a>, but happens (a) earlier, while the
+        records are still in memory, and (b) within your client-side application rather than on the server side (that is, on the Kafka broker).
+        Upon flushing, <code>R2</code> is (1) forwarded to the next processing node and (2) written to the local state store.
+    </p>
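+    <p>
+        The behaviour described above (an even <code>C/T</code> split, LRU eviction, overwriting by key, and flushing) can be sketched as follows.
+        This is a simplified model under stated assumptions, not the actual Kafka Streams cache: the class <code>LruRecordCacheSketch</code> is hypothetical,
+        capacity is counted in entries rather than bytes, and "flushing" just appends to a list.
+    </p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a per-thread record cache with LRU eviction.
final class LruRecordCacheSketch {
    // Records that left the cache, in the order they were flushed.
    private final List<String> flushed = new ArrayList<>();
    private final Map<String, Long> cache;

    LruRecordCacheSketch(final int maxEntries) {
        // accessOrder = true keeps the least recently used entry first.
        this.cache = new LinkedHashMap<String, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                if (size() > maxEntries) {
                    // Cache pressure: the evicted record is flushed before removal.
                    flushed.add(eldest.getKey() + "=" + eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    /** A later value for the same key overwrites the earlier one ("compaction"). */
    void put(String key, long value) {
        cache.put(key, value);
    }

    Long get(String key) {
        return cache.get(key);
    }

    /** Models the end of a commit interval: every cached record is flushed. */
    void commit() {
        for (Map.Entry<String, Long> e : cache.entrySet()) {
            flushed.add(e.getKey() + "=" + e.getValue());
        }
        cache.clear();
    }

    List<String> flushed() {
        return flushed;
    }

    /** Even split of C bytes across T threads, as described in the text. */
    static long bytesPerThread(long totalCacheBytes, int threads) {
        return totalCacheBytes / threads;
    }
}
```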
+
+    <p>
+        The semantics of caching are that data is flushed to the state store and forwarded to the next downstream processor node
+        whenever the commit interval (<code>commit.interval.ms</code>) elapses or the cache size limit (<code>cache.max.bytes.buffering</code>) is reached (cache pressure), whichever happens first.
+        Both <code>commit.interval.ms</code> and 
<code>cache.max.bytes.buffering</code> are <b>global</b> parameters:  they 
apply to all processor nodes in
+        the topology, i.e., it is not possible to specify different parameters 
for each node.
+        Below we provide some example settings for both parameters based on 
desired scenarios.
+    </p>
+
+    <p>To turn off caching, the cache size can be set to zero:</p>
+    <pre class="brush: java;">
+        // Disable record cache
+        Properties streamsConfiguration = new Properties();
+        
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+    </pre>
+
+    <p>
+        Turning off caching might result in high write traffic for the underlying RocksDB store.
+        With default settings, caching is enabled within Kafka Streams but RocksDB caching is disabled.
+        Thus, to avoid high write traffic, it is recommended to enable RocksDB caching if Kafka Streams caching is turned off.
+    </p>
+
+    <p>
+        For example, the RocksDB block cache could be set to 100 MB and the write buffer size to 32 MB.
+    </p>
+    <p>
+        To enable caching but still have an upper bound on how long records 
will be cached, the commit interval can be set
+        appropriately (in this example, it is set to 1000 milliseconds):
+    </p>
+    <pre class="brush: java;">
+        Properties streamsConfiguration = new Properties();
+        // Enable record cache of size 10 MB.
+        
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 
1024 * 1024L);
+        // Set commit interval to 1 second.
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000);
+    </pre>
+
+    <p>
+        The illustration below shows the effect of these two configurations 
visually.
+        For simplicity we have records with 4 keys: blue, red, yellow and green. Without loss of generality, let's assume the cache has space for only 3 keys.
+        When the cache is disabled, we observe that all the input records will be output. With the cache enabled, we make the following observations.
+        First, most records are output at the end of a commit interval (e.g., at <code>t1</code> one blue record is output, which is the final overwrite of the blue key up to that time).
+        Second, some records are output because of cache pressure, i.e. before the end of a commit interval (cf. the red record right before <code>t2</code>).
+        With smaller cache sizes we expect cache pressure to be the primary 
factor that dictates when records are output. With large cache sizes, the 
commit interval will be the primary factor.
+        Third, the number of records output has been reduced (here: from 15 to 
8).
+    </p>
+
+    <img class="centered" 
src="/{{version}}/images/streams-cache-and-commit-interval.png" 
style="width:500pt;height:400pt;">
+    <h4><a id="streams_developer-guide_memory-management_state-store-cache" 
href="#streams_developer-guide_memory-management_state-store-cache">State store 
caches in the Processor API</a></h4>
+
+    <p>
+        Developers of a Kafka Streams application using the Processor API have 
the option to specify, for an instance of a
+        processing topology, the total memory (RAM) size of the <i>state store 
cache</i> that is used for:
+    </p>
+
+    <ul><li>Internal <i>caching and compacting</i> of output records before 
they are written from a <b>stateful</b> processor node to its state 
stores.</li></ul>
+
+    <p>
+        Note that, unlike <a 
href="#streams_developer-guide_memory-management_record-cache">record 
caches</a> in the DSL, the state
+        store cache in the Processor API <i>will not cache or compact</i> any 
output records that are being forwarded downstream.
+        In other words, downstream processor nodes see all records, whereas 
the state stores see a reduced number of records.
+        It is important to note that this does not impact correctness of the 
system but is merely a performance optimization
+        for the state stores.
+    </p>
+    <p>
+        A note on terminology: we use the narrower term <i>state store 
caches</i> when we refer to the Processor API and the
+        broader term <i>record caches</i> when we are writing about the DSL.
+        We made a conscious choice to not expose the more general record 
caches to the Processor API so that we keep it simple and flexible.
+        For example, developers of the Processor API might choose to store a record in a state store while forwarding a different value downstream, i.e., they
+        might not want to use the unified record cache for both state store and forwarding downstream.
+    </p>
+    <p>
+        Following from the example first shown in section <a 
href="#streams_processor_statestore">State Stores</a>, to enable caching, you 
can
+        add the <code>enableCaching</code> call (note that caches are disabled by default and there is no explicit <code>disableCaching</code>
+        call):
+    </p>
+    <pre class="brush: java;">
+        StateStoreSupplier countStoreSupplier =
+            Stores.create("Counts")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.Long())
+                .persistent()
+                .enableCaching()
+                .build();
+    </pre>
+
+    <h4><a id="streams_developer-guide_memory-management_other_memory_usage" 
href="#streams_developer-guide_memory-management_other_memory_usage">Other 
memory usage</a></h4>
+    <p>
+    There are other modules inside Apache Kafka that allocate memory during 
runtime. They include the following:
+    </p>
+    <ul>
+        <li>Producer buffering, managed by the producer config 
<code>buffer.memory</code></li>
+
+        <li>Consumer buffering, currently not strictly managed, but can be 
indirectly controlled by fetch size, i.e.,
+            <code>fetch.max.bytes</code> and 
<code>fetch.max.wait.ms</code>.</li>
+
+        <li>Both producer and consumer also have separate TCP send / receive 
buffers that are not counted as the buffering memory.
+            These are controlled by the <code>send.buffer.bytes</code> / 
<code>receive.buffer.bytes</code> configs.</li>
+
+        <li>Deserialized objects buffering: after <code>consumer.poll()</code> returns records, they are deserialized to extract
+            their timestamps and buffered inside Kafka Streams.
+            Currently this is only indirectly controlled by 
<code>buffered.records.per.partition</code>.</li>
+
+        <li>RocksDB's own memory usage, both on-heap and off-heap; critical 
configs (for RocksDB version 4.1.0) include
+            <code>block_cache_size</code>, <code>write_buffer_size</code> and 
<code>max_write_buffer_number</code>.
+            These can be specified through the <code>rocksdb.config.setter</code> configuration.</li>
+    </ul>
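+    <p>
+        As a rough sketch, the client-side settings from the list above could be collected in one <code>Properties</code> object.
+        The class name <code>OtherMemorySettingsSketch</code> is hypothetical and all values are illustrative assumptions, not recommendations;
+        the <code>producer.</code> and <code>consumer.</code> prefixes are used here to route a setting to the embedded producer or consumer.
+    </p>

```java
import java.util.Properties;

// Hypothetical sketch: plain string keys, illustrative values only.
final class OtherMemorySettingsSketch {

    static Properties exampleSettings() {
        Properties props = new Properties();
        // Producer buffering.
        props.put("producer.buffer.memory", 32L * 1024 * 1024);
        // Consumer buffering, controlled indirectly through the fetch settings.
        props.put("consumer.fetch.max.bytes", 50 * 1024 * 1024);
        props.put("consumer.fetch.max.wait.ms", 500);
        // TCP send / receive buffers, not counted as buffering memory.
        props.put("send.buffer.bytes", 128 * 1024);
        props.put("receive.buffer.bytes", 64 * 1024);
        // Deserialized records buffered per partition inside Kafka Streams.
        props.put("buffered.records.per.partition", 500);
        return props;
    }

    public static void main(String[] args) {
        exampleSettings().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```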
+
     <h3><a id="streams_configure_execute" 
href="#streams_configure_execute">Application Configuration and 
Execution</a></h3>
 
     <p>
