http://git-wip-us.apache.org/repos/asf/kafka/blob/3e2fe17c/docs/streams/developer-guide/app-reset-tool.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide/app-reset-tool.html 
b/docs/streams/developer-guide/app-reset-tool.html
new file mode 100644
index 0000000..096d37a
--- /dev/null
+++ b/docs/streams/developer-guide/app-reset-tool.html
@@ -0,0 +1,173 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+  <!-- h1>Developer Guide for Kafka Streams</h1 -->
+  <div class="sub-nav-sticky">
+    <div class="sticky-top">
+      <!-- div style="height:35px">
+        <a href="/{{version}}/documentation/streams/">Introduction</a>
+        <a class="active-menu-item" 
href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+        <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+        <a href="/{{version}}/documentation/streams/quickstart">Run Demo 
App</a>
+        <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write 
App</a>
+      </div -->
+    </div>
+  </div>
+
+    <div class="section" id="application-reset-tool">
+        <span id="streams-developer-guide-app-reset"></span><h1>Application 
Reset Tool<a class="headerlink" href="#application-reset-tool" title="Permalink 
to this headline"></a></h1>
+        <p>You can reset an application and force it to reprocess its data 
from scratch by using the application reset tool.
+            This can be useful for development and testing, or when fixing 
bugs.</p>
+        <p>The application reset tool handles the Kafka Streams <a 
class="reference internal" 
href="manage-topics.html#streams-developer-guide-topics-user"><span class="std 
std-ref">user topics</span></a> (input,
+            output, and intermediate topics) and <a class="reference internal" 
href="manage-topics.html#streams-developer-guide-topics-internal"><span 
class="std std-ref">internal topics</span></a> differently
+            when resetting the application.</p>
+        <p>Here&#8217;s what the application reset tool does for each topic 
type:</p>
+        <ul class="simple">
+            <li>Input topics: Reset to the beginning of the topic. This means 
that it sets the application&#8217;s committed consumer offsets for all 
partitions to each partition&#8217;s <code class="docutils literal"><span 
class="pre">earliest</span></code> offset (for consumer group <code 
class="docutils literal"><span class="pre">application.id</span></code>).</li>
+            <li>Intermediate topics: Skip to the end of the topic, i.e., set 
the application&#8217;s committed consumer offsets for all partitions to each 
partition&#8217;s <code class="docutils literal"><span 
class="pre">logSize</span></code> (for consumer group <code class="docutils 
literal"><span class="pre">application.id</span></code>).</li>
+            <li>Internal topics: Delete the internal topic (this automatically 
deletes any committed offsets).</li>
+        </ul>
+        <p>The application reset tool does not:</p>
+        <ul class="simple">
+            <li>Reset output topics of an application. If any output (or 
intermediate) topics are consumed by downstream
+                applications, it is your responsibility to adjust those 
downstream applications as appropriate when you reset the
+                upstream application.</li>
+            <li>Reset the local environment of your application instances.  It 
is your responsibility to delete the local
+                state on any machine on which an application instance was run. 
 See the instructions in section
+                <a class="reference internal" 
href="#streams-developer-guide-reset-local-environment"><span class="std 
std-ref">Step 2: Reset the local environments of your application 
instances</span></a> on how to do this.</li>
+        </ul>
+        <dl class="docutils">
+            <dt>Prerequisites</dt>
+            <dd><ul class="first last">
+                <li><p class="first">All instances of your application must be 
stopped. Otherwise, the application may enter an invalid state, crash, or 
produce incorrect results. You can verify whether the consumer group with ID 
<code class="docutils literal"><span class="pre">application.id</span></code> 
is still active by using <code class="docutils literal"><span 
class="pre">bin/kafka-consumer-groups</span></code>.</p>
+                </li>
+                <li><p class="first">Use this tool with care and double-check 
its parameters: If you provide wrong parameter values (e.g., typos in <code 
class="docutils literal"><span class="pre">application.id</span></code>) or 
specify parameters inconsistently (e.g., specify the wrong input topics for the 
application), this tool might invalidate the application&#8217;s state or even 
impact other applications, consumer groups, or your Kafka topics.</p>
+                </li>
+                <li><p class="first">You should manually delete and re-create 
any intermediate topics before running the application reset tool, which 
frees up disk space on the Kafka brokers, unless one of the following applies:</p>
+                    <blockquote>
+                        <div><ul class="simple">
+                            <li>You have external downstream consumers for the 
application&#8217;s intermediate topics.</li>
+                            <li>You are in a development environment where 
manually deleting and re-creating intermediate topics is unnecessary.</li>
+                        </ul>
+                        </div></blockquote>
+                </li>
+            </ul>
+            </dd>
+        </dl>
+        <div class="section" id="step-1-run-the-application-reset-tool">
+            <h2>Step 1: Run the application reset tool<a class="headerlink" 
href="#step-1-run-the-application-reset-tool" title="Permalink to this 
headline"></a></h2>
+            <p>Invoke the application reset tool from the command line:</p>
+            <div class="highlight-bash"><div 
class="highlight"><pre><span></span>&lt;path-to-kafka&gt;/bin/kafka-streams-application-reset
+</pre></div>
+            </div>
+            <p>The tool accepts the following parameters:</p>
+            <div class="highlight-bash"><div 
class="highlight"><pre><span></span>Option <span class="o">(</span>* <span 
class="o">=</span> required<span class="o">)</span>                 Description
+---------------------                 -----------
+* --application-id &lt;String: id&gt;       The Kafka Streams application ID
+                                        <span 
class="o">(</span>application.id<span class="o">)</span>.
+--bootstrap-servers &lt;String: urls&gt;    Comma-separated list of broker 
urls with
+                                        format: HOST1:PORT1,HOST2:PORT2
+                                        <span class="o">(</span>default: 
localhost:9092<span class="o">)</span>
+--config-file &lt;String: file name&gt;     Property file containing configs 
to be
+                                        passed to admin clients and embedded
+                                        consumer.
+--dry-run                             Display the actions that would be
+                                        performed without executing the reset
+                                        commands.
+--input-topics &lt;String: list&gt;         Comma-separated list of user input
+                                        topics. For these topics, the tool will
+                                        reset the offset to the earliest
+                                        available offset.
+--intermediate-topics &lt;String: list&gt;  Comma-separated list of 
intermediate user
+                                        topics <span class="o">(</span>topics 
used in the through<span class="o">()</span>
+                                        method<span class="o">)</span>. For 
these topics, the tool
+                                        will skip to the end.
+--zookeeper                           Zookeeper option is deprecated by
+                                        bootstrap.servers, as the reset tool
+                                        would no longer access Zookeeper
+                                        directly.
+</pre></div>
+            </div>
+            <p>Parameters can be combined as needed.  For example, if you want 
to restart an application from an
+                empty internal state, but not reprocess previous data, simply 
omit the parameters <code class="docutils literal"><span 
class="pre">--input-topics</span></code> and
+                <code class="docutils literal"><span 
class="pre">--intermediate-topics</span></code>.</p>
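+            <p>As a rough illustration of combining the parameters above, the following Java sketch assembles the argument list for the tool. The class name <code>ResetCommand</code>, its method, and the install path passed in are hypothetical and not part of Kafka; only the option names come from the parameter list above. The script name follows the invocation shown earlier, and some distributions may ship it with a <code>.sh</code> suffix.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Kafka): builds the command line for the reset tool.
final class ResetCommand {

    static List<String> build(String kafkaHome,
                              String applicationId,
                              String bootstrapServers,
                              List<String> inputTopics,
                              List<String> intermediateTopics,
                              boolean dryRun) {
        List<String> command = new ArrayList<>();
        // Script location as written in this guide; adjust for your installation.
        command.add(kafkaHome + "/bin/kafka-streams-application-reset");
        command.add("--application-id");
        command.add(applicationId);
        command.add("--bootstrap-servers");
        command.add(bootstrapServers);
        // Omitting both topic options restarts from empty internal state
        // without reprocessing previous data.
        if (!inputTopics.isEmpty()) {
            command.add("--input-topics");
            command.add(String.join(",", inputTopics));
        }
        if (!intermediateTopics.isEmpty()) {
            command.add("--intermediate-topics");
            command.add(String.join(",", intermediateTopics));
        }
        if (dryRun) {
            // Only display the actions, do not execute them.
            command.add("--dry-run");
        }
        return command;
    }
}
```

+            <p>The resulting list could be handed to <code>new ProcessBuilder(command).inheritIO().start()</code>. Passing <code>--dry-run</code> first shows what the tool would do without changing anything.</p>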
+        </div>
+        <div class="section" 
id="step-2-reset-the-local-environments-of-your-application-instances">
+            <span 
id="streams-developer-guide-reset-local-environment"></span><h2>Step 2: Reset 
the local environments of your application instances<a class="headerlink" 
href="#step-2-reset-the-local-environments-of-your-application-instances" 
title="Permalink to this headline"></a></h2>
+            <p>For a complete application reset, you must delete the 
application&#8217;s local state directory on any machines where the
+                application instance was run. You must do this before 
restarting an application instance on the same machine.  You can
+                use either of these methods:</p>
+            <ul class="simple">
+                <li>The API method <code class="docutils literal"><span 
class="pre">KafkaStreams#cleanUp()</span></code> in your application code.</li>
+                <li>Manually delete the corresponding local state directory 
(default location: <code class="docutils literal"><span 
class="pre">/var/lib/kafka-streams/&lt;application.id&gt;</span></code>). For 
more information, see <a class="reference internal" 
href="../javadocs.html#streams-javadocs"><span class="std 
std-ref">state.dir</span></a> in the StreamsConfig class.</li>
+            </ul>
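+            <p>The manual option can be sketched in plain Java as follows. This is a minimal illustration and not Kafka code: the class <code>LocalStateCleaner</code> and its method are hypothetical, and it assumes the application's state lives in a subdirectory of the state directory named after the <code>application.id</code>, as described above. Run it only while the application instance is stopped.</p>

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper (not part of Kafka): removes one application's local state directory.
final class LocalStateCleaner {

    /** Returns true if a directory was deleted, false if there was nothing to delete. */
    static boolean deleteLocalState(Path stateDir, String applicationId) throws IOException {
        Path root = stateDir.normalize();
        Path appDir = root.resolve(applicationId).normalize();
        // Refuse ids that would escape the state directory or point at it directly.
        if (!appDir.startsWith(root) || appDir.equals(root)) {
            throw new IllegalArgumentException("Suspicious application.id: " + applicationId);
        }
        if (!Files.exists(appDir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(appDir)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        return true;
    }
}
```

+            <p>With the default location mentioned above, a call might look like <code>LocalStateCleaner.deleteLocalState(Paths.get("/var/lib/kafka-streams"), "my-app")</code>, where <code>my-app</code> stands in for your own application ID.</p>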
+</div>
+</div>
+
+
+               </div>
+              </div>
+  <div class="pagination">
+    <a href="/{{version}}/documentation/streams/developer-guide/security" 
class="pagination__btn pagination__btn__prev">Previous</a>
+      <a href="/{{version}}/documentation/streams/developer-guide/" 
class="pagination__btn pagination__btn__next">Next</a>
+  </div>
+</script>
+
+<!--#include virtual="../../../includes/_header.htm" -->
+<!--#include virtual="../../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+  <!--#include virtual="../../../includes/_nav.htm" -->
+  <div class="right">
+    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+    <ul class="breadcrumbs">
+      <li><a href="/documentation">Documentation</a></li>
+      <li><a href="/documentation/streams">Kafka Streams</a></li>
+      <li><a href="/documentation/streams/developer-guide/">Developer 
Guide</a></li>
+    </ul>
+    <div class="p-content"></div>
+  </div>
+</div>
+<!--#include virtual="../../../includes/_footer.htm" -->
+<script>
+    $(function() {
+        // Show selected style on nav item
+        $('.b-nav__streams').addClass('selected');
+
+        //sticky secondary nav
+        var $navbar = $(".sub-nav-sticky"),
+            y_pos = $navbar.offset().top,
+            height = $navbar.height();
+
+        $(window).scroll(function() {
+            var scrollTop = $(window).scrollTop();
+
+            if (scrollTop > y_pos - height) {
+                $navbar.addClass("navbar-fixed")
+            } else if (scrollTop <= y_pos) {
+                $navbar.removeClass("navbar-fixed")
+            }
+        });
+
+        // Display docs subnav items
+        
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+    });
+</script>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e2fe17c/docs/streams/developer-guide/config-streams.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide/config-streams.html 
b/docs/streams/developer-guide/config-streams.html
new file mode 100644
index 0000000..dbac7fb
--- /dev/null
+++ b/docs/streams/developer-guide/config-streams.html
@@ -0,0 +1,717 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+  <!-- h1>Developer Guide for Kafka Streams</h1 -->
+  <div class="sub-nav-sticky">
+    <div class="sticky-top">
+      <!-- div style="height:35px">
+        <a href="/{{version}}/documentation/streams/">Introduction</a>
+        <a class="active-menu-item" 
href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+        <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+        <a href="/{{version}}/documentation/streams/quickstart">Run Demo 
App</a>
+        <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write 
App</a>
+      </div -->
+    </div>
+  </div>
+
+
+  <div class="section" id="configuring-a-streams-application">
+    <span id="streams-developer-guide-configuration"></span><h1>Configuring a 
Streams Application<a class="headerlink" 
href="#configuring-a-streams-application" title="Permalink to this 
headline"></a></h1>
+    <p>Kafka and Kafka Streams configuration options must be set before 
using Streams. You can configure Kafka Streams by specifying parameters in a 
<code class="docutils literal"><span class="pre">StreamsConfig</span></code> 
instance.</p>
+    <ol class="arabic">
+      <li><p class="first">Create a <code class="docutils literal"><span 
class="pre">java.util.Properties</span></code> instance.</p>
+      </li>
+      <li><p class="first">Set the <a class="reference internal" 
href="#streams-developer-guide-required-configs"><span class="std 
std-ref">parameters</span></a>.</p>
+      </li>
+      <li><p class="first">Construct a <code class="docutils literal"><span 
class="pre">StreamsConfig</span></code> instance from the <code class="docutils 
literal"><span class="pre">Properties</span></code> instance. For example:</p>
+        <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="kn">import</span> <span 
class="nn">java.util.Properties</span><span class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.StreamsConfig</span><span class="o">;</span>
+
+<span class="n">Properties</span> <span class="n">settings</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">Properties</span><span class="o">();</span>
+<span class="c1">// Set a few key parameters</span>
+<span class="n">settings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">APPLICATION_ID_CONFIG</span><span class="o">,</span> <span 
class="s">&quot;my-first-streams-application&quot;</span><span 
class="o">);</span>
+<span class="n">settings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">BOOTSTRAP_SERVERS_CONFIG</span><span class="o">,</span> <span 
class="s">&quot;kafka-broker1:9092&quot;</span><span class="o">);</span>
+<span class="c1">// Any further settings</span>
+<span class="n">settings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(...</span> <span class="o">,</span> <span 
class="o">...);</span>
+
+<span class="c1">// Create an instance of StreamsConfig from the Properties 
instance</span>
+<span class="n">StreamsConfig</span> <span class="n">config</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">StreamsConfig</span><span class="o">(</span><span 
class="n">settings</span><span class="o">);</span>
+</pre></div>
+        </div>
+      </li>
+    </ol>
+    <div class="section" id="configuration-parameter-reference">
+      <span 
id="streams-developer-guide-required-configs"></span><h2>Configuration 
parameter reference<a class="headerlink" 
href="#configuration-parameter-reference" title="Permalink to this 
headline"></a></h2>
+      <p>This section contains the most common Streams configuration 
parameters. For a full reference, see the <a class="reference external" 
href="/current/streams/javadocs/index.html">Streams</a> and <a class="reference 
external" href="/current/clients/javadocs/index.html">Client</a> Javadocs.</p>
+      <div class="contents local topic" id="contents">
+        <ul class="simple">
+          <li><a class="reference internal" 
href="#required-configuration-parameters" id="id3">Required configuration 
parameters</a><ul>
+            <li><a class="reference internal" href="#application-id" 
id="id4">application.id</a></li>
+            <li><a class="reference internal" href="#bootstrap-servers" 
id="id5">bootstrap.servers</a></li>
+          </ul>
+          </li>
+          <li><a class="reference internal" 
href="#optional-configuration-parameters" id="id6">Optional configuration 
parameters</a><ul>
+            <li><a class="reference internal" 
href="#default-deserialization-exception-handler" 
id="id7">default.deserialization.exception.handler</a></li>
+            <li><a class="reference internal" href="#default-key-serde" 
id="id8">default.key.serde</a></li>
+            <li><a class="reference internal" href="#default-value-serde" 
id="id9">default.value.serde</a></li>
+            <li><a class="reference internal" href="#num-standby-replicas" 
id="id10">num.standby.replicas</a></li>
+            <li><a class="reference internal" href="#num-stream-threads" 
id="id11">num.stream.threads</a></li>
+            <li><a class="reference internal" href="#partition-grouper" 
id="id12">partition.grouper</a></li>
+            <li><a class="reference internal" href="#replication-factor" 
id="id13">replication.factor</a></li>
+            <li><a class="reference internal" href="#state-dir" 
id="id14">state.dir</a></li>
+            <li><a class="reference internal" href="#timestamp-extractor" 
id="id15">timestamp.extractor</a></li>
+          </ul>
+          </li>
+          <li><a class="reference internal" 
href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka 
consumers and producer configuration parameters</a><ul>
+            <li><a class="reference internal" href="#naming" 
id="id17">Naming</a></li>
+            <li><a class="reference internal" href="#default-values" 
id="id18">Default Values</a></li>
+            <li><a class="reference internal" href="#enable-auto-commit" 
id="id19">enable.auto.commit</a></li>
+            <li><a class="reference internal" href="#rocksdb-config-setter" 
id="id20">rocksdb.config.setter</a></li>
+          </ul>
+          </li>
+          <li><a class="reference internal" 
href="#recommended-configuration-parameters-for-resiliency" 
id="id21">Recommended configuration parameters for resiliency</a><ul>
+            <li><a class="reference internal" href="#acks" 
id="id22">acks</a></li>
+            <li><a class="reference internal" href="#id2" 
id="id23">replication.factor</a></li>
+          </ul>
+          </li>
+        </ul>
+      </div>
+      <div class="section" id="required-configuration-parameters">
+        <h3><a class="toc-backref" href="#id3">Required configuration 
parameters</a><a class="headerlink" href="#required-configuration-parameters" 
title="Permalink to this headline"></a></h3>
+        <p>Here are the required Streams configuration parameters.</p>
+        <table border="1" class="non-scrolling-table docutils">
+          <colgroup>
+            <col width="20%" />
+            <col width="5%" />
+            <col width="7%" />
+            <col width="38%" />
+            <col width="31%" />
+          </colgroup>
+          <thead valign="bottom">
+          <tr class="row-odd"><th class="head">Parameter Name</th>
+            <th class="head">Importance</th>
+            <th class="head" colspan="2">Description</th>
+            <th class="head">Default Value</th>
+          </tr>
+          </thead>
+          <tbody valign="top">
+          <tr class="row-even"><td>application.id</td>
+            <td>Required</td>
+            <td colspan="2">An identifier for the stream processing 
application.  Must be unique within the Kafka cluster.</td>
+            <td>None</td>
+          </tr>
+          <tr class="row-odd"><td>bootstrap.servers</td>
+            <td>Required</td>
+            <td colspan="2">A list of host/port pairs to use for establishing 
the initial connection to the Kafka cluster.</td>
+            <td>None</td>
+          </tr>
+          </tbody>
+        </table>
+        <div class="section" id="application-id">
+          <h4><a class="toc-backref" href="#id4">application.id</a><a 
class="headerlink" href="#application-id" title="Permalink to this 
headline"></a></h4>
+          <blockquote>
+            <div><p>(Required) The application ID. Each stream processing 
application must have a unique ID. The same ID must be given to
+              all instances of the application.  It is recommended to use only 
alphanumeric characters, <code class="docutils literal"><span 
class="pre">.</span></code> (dot), <code class="docutils literal"><span 
class="pre">-</span></code> (hyphen), and <code class="docutils literal"><span 
class="pre">_</span></code> (underscore). Examples: <code class="docutils 
literal"><span class="pre">&quot;hello_world&quot;</span></code>, <code 
class="docutils literal"><span 
class="pre">&quot;hello_world-v1.0.0&quot;</span></code></p>
+              <p>This ID is used in the following places to isolate resources 
used by the application from others:</p>
+              <ul class="simple">
+                <li>As the default Kafka consumer and producer <code 
class="docutils literal"><span class="pre">client.id</span></code> prefix</li>
+                <li>As the Kafka consumer <code class="docutils literal"><span 
class="pre">group.id</span></code> for coordination</li>
+                <li>As the name of the subdirectory in the state directory 
(cf. <code class="docutils literal"><span 
class="pre">state.dir</span></code>)</li>
+                <li>As the prefix of internal Kafka topic names</li>
+              </ul>
+              <dl class="docutils">
+                <dt>Tip:</dt>
+                <dd>When an application is updated, the <code class="docutils 
literal"><span class="pre">application.id</span></code> should be changed 
unless you want to reuse the existing data in internal topics and state stores.
+                  For example, you could embed the version information within 
<code class="docutils literal"><span class="pre">application.id</span></code>, 
as <code class="docutils literal"><span class="pre">my-app-v1.0.0</span></code> 
and <code class="docutils literal"><span 
class="pre">my-app-v1.0.2</span></code>.</dd>
+              </dl>
+            </div></blockquote>
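+            <p>The character recommendation above can be checked with a small helper like the one below. This is only a sketch: the class <code>ApplicationIds</code> is hypothetical, and it assumes that "alphanumeric" means ASCII letters and digits, which the text above does not state explicitly.</p>

```java
import java.util.regex.Pattern;

// Hypothetical helper (not part of Kafka): checks an application.id against the
// recommended character set (alphanumerics, dot, hyphen, underscore).
final class ApplicationIds {

    // Assumption: "alphanumeric" is read as ASCII letters and digits.
    private static final Pattern RECOMMENDED = Pattern.compile("[A-Za-z0-9._-]+");

    static boolean followsRecommendation(String applicationId) {
        return applicationId != null && RECOMMENDED.matcher(applicationId).matches();
    }
}
```

+            <p>Both examples given above, <code>hello_world</code> and <code>hello_world-v1.0.0</code>, pass this check, while an ID containing a space does not.</p>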
+        </div>
+        <div class="section" id="bootstrap-servers">
+          <h4><a class="toc-backref" href="#id5">bootstrap.servers</a><a 
class="headerlink" href="#bootstrap-servers" title="Permalink to this 
headline"></a></h4>
+          <blockquote>
+            <div><p>(Required) The Kafka bootstrap servers. This is the same 
<a class="reference external" 
href="http://kafka.apache.org/documentation.html#producerconfigs">setting</a> 
that is used by the underlying producer and consumer clients to connect to the 
Kafka cluster.
+              Example: <code class="docutils literal"><span 
class="pre">&quot;kafka-broker1:9092,kafka-broker2:9092&quot;</span></code>.</p>
+              <dl class="docutils">
+                <dt>Tip:</dt>
+                <dd>Kafka Streams applications can only communicate with a 
single Kafka cluster specified by this config value.
+                  Future versions of Kafka Streams will support connecting to 
different Kafka clusters for reading input
+                  streams and writing output streams.</dd>
+              </dl>
+            </div></blockquote>
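+            <p>To catch malformed values early, the comma-separated <code>host:port</code> format can be validated before it is placed into the configuration. The sketch below is illustrative only: the class <code>BootstrapServers</code> is hypothetical, it does not resolve or contact any broker, and it does not handle bracketed IPv6 literals.</p>

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Kafka): parses a bootstrap.servers string
// into unresolved host/port pairs.
final class BootstrapServers {

    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            // Require a non-empty host and a non-empty port.
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            // NumberFormatException is an IllegalArgumentException, so bad ports fail the same way.
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved rejects ports outside the valid range and performs no DNS lookup.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }
}
```

+            <p>For the example value above, <code>parse</code> yields two entries, one for <code>kafka-broker1</code> and one for <code>kafka-broker2</code>, both on port 9092.</p>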
+        </div>
+      </div>
+      <div class="section" id="optional-configuration-parameters">
+        <span id="streams-developer-guide-optional-configs"></span><h3><a 
class="toc-backref" href="#id6">Optional configuration parameters</a><a 
class="headerlink" href="#optional-configuration-parameters" title="Permalink 
to this headline"></a></h3>
+        <p>Here are the optional <a class="reference internal" 
href="../javadocs.html#streams-javadocs"><span class="std std-ref">Streams 
configuration parameters</span></a>, sorted by level of importance:</p>
+        <blockquote>
+          <div><ul class="simple">
+            <li>High: These parameters can have a significant impact on 
performance. Take care when deciding the values of these parameters.</li>
+            <li>Medium: These parameters can have some impact on performance. 
Your specific environment will determine how much tuning effort should be 
focused on these parameters.</li>
+            <li>Low: These parameters have a less general or less significant 
impact on performance.</li>
+          </ul>
+          </div></blockquote>
+        <table border="1" class="non-scrolling-table docutils">
+          <colgroup>
+            <col width="20%" />
+            <col width="5%" />
+            <col width="7%" />
+            <col width="38%" />
+            <col width="31%" />
+          </colgroup>
+          <thead valign="bottom">
+          <tr class="row-odd"><th class="head">Parameter Name</th>
+            <th class="head">Importance</th>
+            <th class="head" colspan="2">Description</th>
+            <th class="head">Default Value</th>
+          </tr>
+          </thead>
+          <tbody valign="top">
+          <tr class="row-even"><td>application.server</td>
+            <td>Low</td>
+            <td colspan="2">A host:port pair pointing to an embedded 
user-defined endpoint that can be used for discovering the locations of
+              state stores within a single Kafka Streams application. The 
value of this must be different for each instance
+              of the application.</td>
+            <td>the empty string</td>
+          </tr>
+          <tr class="row-odd"><td>buffered.records.per.partition</td>
+            <td>Low</td>
+            <td colspan="2">The maximum number of records to buffer per 
partition.</td>
+            <td>1000</td>
+          </tr>
+          <tr class="row-even"><td>cache.max.bytes.buffering</td>
+            <td>Medium</td>
+            <td colspan="2">Maximum number of memory bytes to be used for 
record caches across all threads.</td>
+            <td>10485760 bytes</td>
+          </tr>
+          <tr class="row-odd"><td>client.id</td>
+            <td>Medium</td>
+            <td colspan="2">An ID string to pass to the server when making 
requests.
+              (This setting is passed to the consumer/producer clients used 
internally by Kafka Streams.)</td>
+            <td>the empty string</td>
+          </tr>
+          <tr class="row-even"><td>commit.interval.ms</td>
+            <td>Low</td>
+            <td colspan="2">The frequency with which to save the position 
(offsets in source topics) of tasks.</td>
+            <td>30000 milliseconds</td>
+          </tr>
+          <tr 
class="row-odd"><td>default.deserialization.exception.handler</td>
+            <td>Medium</td>
+            <td colspan="2">Exception handling class that implements the <code 
class="docutils literal"><span 
class="pre">DeserializationExceptionHandler</span></code> interface.</td>
+            <td><code class="docutils literal"><span class="pre">LogAndFailExceptionHandler</span></code></td>
+          </tr>
+          <tr class="row-even"><td>default.key.serde</td>
+            <td>Medium</td>
+            <td colspan="2">Default serializer/deserializer class for record 
keys, implements the <code class="docutils literal"><span 
class="pre">Serde</span></code> interface (see also default.value.serde).</td>
+            <td><code class="docutils literal"><span 
class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
+          </tr>
+          <tr class="row-odd"><td>metric.reporters</td>
+            <td>Low</td>
+            <td colspan="2">A list of classes to use as metrics reporters.</td>
+            <td>the empty list</td>
+          </tr>
+          <tr class="row-even"><td>metrics.num.samples</td>
+            <td>Low</td>
+            <td colspan="2">The number of samples maintained to compute 
metrics.</td>
+            <td>2</td>
+          </tr>
+          <tr class="row-odd"><td>metrics.recording.level</td>
+            <td>Low</td>
+            <td colspan="2">The highest recording level for metrics.</td>
+            <td><code class="docutils literal"><span 
class="pre">INFO</span></code></td>
+          </tr>
+          <tr class="row-even"><td>metrics.sample.window.ms</td>
+            <td>Low</td>
+            <td colspan="2">The window of time a metrics sample is computed 
over.</td>
+            <td>30000 milliseconds</td>
+          </tr>
+          <tr class="row-odd"><td>num.standby.replicas</td>
+            <td>Medium</td>
+            <td colspan="2">The number of standby replicas for each task.</td>
+            <td>0</td>
+          </tr>
+          <tr class="row-even"><td>num.stream.threads</td>
+            <td>Medium</td>
+            <td colspan="2">The number of threads to execute stream 
processing.</td>
+            <td>1</td>
+          </tr>
+          <tr class="row-odd"><td>partition.grouper</td>
+            <td>Low</td>
+            <td colspan="2">Partition grouper class that implements the <code 
class="docutils literal"><span class="pre">PartitionGrouper</span></code> 
interface.</td>
+            <td>See <a class="reference internal" 
href="#streams-developer-guide-partition-grouper"><span class="std 
std-ref">Partition Grouper</span></a></td>
+          </tr>
+          <tr class="row-even"><td>poll.ms</td>
+            <td>Low</td>
+            <td colspan="2">The amount of time in milliseconds to block 
waiting for input.</td>
+            <td>100 milliseconds</td>
+          </tr>
+          <tr class="row-odd"><td>replication.factor</td>
+            <td>High</td>
+            <td colspan="2">The replication factor for changelog topics and 
repartition topics created by the application.</td>
+            <td>1</td>
+          </tr>
+          <tr class="row-even"><td>state.cleanup.delay.ms</td>
+            <td>Low</td>
+            <td colspan="2">The amount of time in milliseconds to wait before 
deleting state when a partition has migrated.</td>
+            <td>600000 milliseconds</td>
+          </tr>
+          <tr class="row-odd"><td>state.dir</td>
+            <td>High</td>
+            <td colspan="2">Directory location for state stores.</td>
+            <td><code class="docutils literal"><span 
class="pre">/var/lib/kafka-streams</span></code></td>
+          </tr>
+          <tr class="row-even"><td>timestamp.extractor</td>
+            <td>Medium</td>
+            <td colspan="2">Timestamp extractor class that implements the 
<code class="docutils literal"><span 
class="pre">TimestampExtractor</span></code> interface.</td>
+            <td>See <a class="reference internal" 
href="#streams-developer-guide-timestamp-extractor"><span class="std 
std-ref">Timestamp Extractor</span></a></td>
+          </tr>
+          <tr class="row-odd"><td>value.serde</td>
+            <td>Medium</td>
+            <td colspan="2">Default serializer/deserializer class for record 
values, implements the <code class="docutils literal"><span 
class="pre">Serde</span></code> interface (see also key.serde).</td>
+            <td><code class="docutils literal"><span 
class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
+          </tr>
+          <tr 
class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
+            <td>Low</td>
+            <td colspan="2">Added to a window's maintainMs to ensure data is 
not deleted from the log prematurely. Allows for clock drift.</td>
+            <td>86400000 milliseconds = 1 day</td>
+          </tr>
+          </tbody>
+        </table>
+        <div class="section" id="default-deserialization-exception-handler">
+          <span id="streams-developer-guide-deh"></span><h4><a 
class="toc-backref" href="#id7">default.deserialization.exception.handler</a><a 
class="headerlink" href="#default-deserialization-exception-handler" 
title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div><p>The default deserialization exception handler allows you 
to handle records that fail to deserialize. This
+              can be caused by corrupt data, incorrect serialization logic, or 
unhandled record types. These exception handlers
+              are available:</p>
+              <ul class="simple">
+                <li><a class="reference external" 
href="/4.0.0/streams/javadocs/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.html">LogAndContinueExceptionHandler</a>:
+                  This handler logs the deserialization exception and then 
signals the processing pipeline to continue processing more records.
+                  This log-and-skip strategy allows Kafka Streams to make 
progress instead of failing if there are records that fail
+                  to deserialize.</li>
+                <li><a class="reference external" 
href="/4.0.0/streams/javadocs/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.html">LogAndFailExceptionHandler</a>:
+                  This handler logs the deserialization exception and then 
signals the processing pipeline to stop processing more records.</li>
+              </ul>
+            </div></blockquote>
+        </div>
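+        <p>As a minimal sketch of selecting one of the handlers listed above, the snippet below sets the
+          <code class="docutils literal"><span class="pre">default.deserialization.exception.handler</span></code> parameter by its
+          string key, so that it needs only the JDK. The class name <code class="docutils literal"><span class="pre">DeserializationHandlerConfigSketch</span></code>
+          is hypothetical, and a real application would also have to set its required parameters, which are omitted here.</p>

```java
import java.util.Properties;

// Hypothetical helper class, used only to illustrate the setting.
final class DeserializationHandlerConfigSketch {

    static Properties build() {
        final Properties settings = new Properties();
        // Log records that fail to deserialize and continue processing.
        settings.put("default.deserialization.exception.handler",
                     "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return settings;
    }

    public static void main(final String[] args) {
        System.out.println(build());
    }
}
```

+        <p>Replacing the class name with that of <code class="docutils literal"><span class="pre">LogAndFailExceptionHandler</span></code>
+          would make the pipeline stop on the first record that fails to deserialize.</p>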
+        <div class="section" id="default-key-serde">
+          <h4><a class="toc-backref" href="#id8">default.key.serde</a><a 
class="headerlink" href="#default-key-serde" title="Permalink to this 
headline"></a></h4>
+          <blockquote>
+            <div><p>The default Serializer/Deserializer class for record keys. 
Serialization and deserialization in Kafka Streams happen
+              whenever data needs to be materialized, for example:</p>
+              <blockquote>
+                <div><ul class="simple">
+                  <li>Whenever data is read from or written to a <em>Kafka 
topic</em> (e.g., via the <code class="docutils literal"><span 
class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils 
literal"><span class="pre">KStream#to()</span></code> methods).</li>
+                  <li>Whenever data is read from or written to a <em>state 
store</em>.</li>
+                </ul>
+                  <p>This is discussed in more detail in <a class="reference 
internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std 
std-ref">Data types and serialization</span></a>.</p>
+                </div></blockquote>
+            </div></blockquote>
+        </div>
+        <div class="section" id="default-value-serde">
+          <h4><a class="toc-backref" href="#id9">default.value.serde</a><a 
class="headerlink" href="#default-value-serde" title="Permalink to this 
headline"></a></h4>
+          <blockquote>
+            <div><p>The default Serializer/Deserializer class for record 
values. Serialization and deserialization in Kafka Streams
+              happen whenever data needs to be materialized, for example:</p>
+              <ul class="simple">
+                <li>Whenever data is read from or written to a <em>Kafka 
topic</em> (e.g., via the <code class="docutils literal"><span 
class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils 
literal"><span class="pre">KStream#to()</span></code> methods).</li>
+                <li>Whenever data is read from or written to a <em>state 
store</em>.</li>
+              </ul>
+              <p>This is discussed in more detail in <a class="reference 
internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std 
std-ref">Data types and serialization</span></a>.</p>
+            </div></blockquote>
+        </div>
+        <div class="section" id="num-standby-replicas">
+          <span id="streams-developer-guide-standby-replicas"></span><h4><a 
class="toc-backref" href="#id10">num.standby.replicas</a><a class="headerlink" 
href="#num-standby-replicas" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div>The number of standby replicas. Standby replicas are shadow 
copies of local state stores. Kafka Streams attempts to create the
+              specified number of replicas and keep them up to date as long as 
there are enough instances running.
+              Standby replicas are used to minimize the latency of task 
failover.  When an instance fails, Kafka Streams prefers to restart its tasks
+              on an instance that already hosts standby replicas, so 
that restoring the local state store from its
+              changelog takes as little time as possible.  Details about how Kafka Streams 
makes use of the standby replicas to minimize the cost of
+              resuming tasks on failover can be found in the <a 
class="reference internal" 
href="../architecture.html#streams-architecture-state"><span class="std 
std-ref">State</span></a> section.</div></blockquote>
+        </div>
+        <div class="section" id="num-stream-threads">
+          <h4><a class="toc-backref" href="#id11">num.stream.threads</a><a 
class="headerlink" href="#num-stream-threads" title="Permalink to this 
headline"></a></h4>
+          <blockquote>
+            <div>This specifies the number of stream threads in an instance of 
the Kafka Streams application. The stream processing code runs in these threads.
+              For more information about the Kafka Streams threading model, see <a 
class="reference internal" 
href="../architecture.html#streams-architecture-threads"><span class="std 
std-ref">Threading Model</span></a>.</div></blockquote>
+        </div>
+        <div class="section" id="partition-grouper">
+          <span id="streams-developer-guide-partition-grouper"></span><h4><a 
class="toc-backref" href="#id12">partition.grouper</a><a class="headerlink" 
href="#partition-grouper" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div>A partition grouper creates a list of stream tasks from the 
partitions of source topics, where each task is assigned a group 
of source topic partitions.
+              The default implementation provided by Kafka Streams is <a 
class="reference external" 
href="/4.0.0/streams/javadocs/org/apache/kafka/streams/processor/DefaultPartitionGrouper.html">DefaultPartitionGrouper</a>.
+              It assigns each task at most one partition from each of the source 
topics. The generated number of tasks equals the largest
+              number of partitions among the input topics. Usually an 
application does not need to customize the partition grouper.</div></blockquote>
+        </div>
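+        <p>The grouping rule described above can be sketched with plain JDK collections. This is an illustration under
+          simplifying assumptions, not the <code class="docutils literal"><span class="pre">DefaultPartitionGrouper</span></code> source:
+          the class <code class="docutils literal"><span class="pre">PartitionGrouperSketch</span></code>, its
+          <code class="docutils literal"><span class="pre">group</span></code> method, and the topic names are hypothetical,
+          and all topics are assumed to be consumed together by the same part of the topology.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the grouping rule; not Kafka Streams code.
final class PartitionGrouperSketch {

    // Task i receives partition i of every topic that has at least i + 1 partitions.
    static List<List<String>> group(final Map<String, Integer> partitionsPerTopic) {
        int maxPartitions = 0;
        for (final int count : partitionsPerTopic.values()) {
            maxPartitions = Math.max(maxPartitions, count);
        }
        final List<List<String>> tasks = new ArrayList<>();
        for (int partition = 0; partition < maxPartitions; partition++) {
            final List<String> assigned = new ArrayList<>();
            for (final Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
                if (partition < topic.getValue()) {
                    assigned.add(topic.getKey() + "-" + partition);
                }
            }
            tasks.add(assigned);
        }
        return tasks;
    }

    public static void main(final String[] args) {
        final Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("orders", 3);     // assumed topic with 3 partitions
        topics.put("customers", 2);  // assumed topic with 2 partitions
        // The number of tasks equals the largest partition count (3 here).
        System.out.println(group(topics));
    }
}
```

+        <p>With the two assumed topics, the sketch produces three tasks, and the third task holds only a partition
+          of the larger topic because the smaller topic has no third partition.</p>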
+        <div class="section" id="replication-factor">
+          <span id="replication-factor-parm"></span><h4><a class="toc-backref" 
href="#id13">replication.factor</a><a class="headerlink" 
href="#replication-factor" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div><p>This specifies the replication factor of internal topics 
that Kafka Streams creates when local states are used or a stream is
+              repartitioned for aggregation. Replication is important for 
fault tolerance. Without replication even a single broker failure
+              may prevent progress of the stream processing application. It is 
recommended to use a replication factor similar to that of the source topics.</p>
+              <dl class="docutils">
+                <dt>Recommendation:</dt>
+                <dd>Increase the replication factor to 3 to ensure that the 
internal Kafka Streams topic can tolerate up to 2 broker failures.
+                  Note that you will require more storage space as well (3 
times more with the replication factor of 3).</dd>
+              </dl>
+            </div></blockquote>
+        </div>
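+        <p>A minimal sketch of the fault-tolerance settings discussed above, written with string keys so that it depends
+          only on the JDK. The class name <code class="docutils literal"><span class="pre">FaultToleranceConfigSketch</span></code> is
+          hypothetical, and the value chosen for <code class="docutils literal"><span class="pre">num.standby.replicas</span></code>
+          is an assumption for illustration rather than a recommendation from this guide.</p>

```java
import java.util.Properties;

// Hypothetical helper class, used only to illustrate the settings.
final class FaultToleranceConfigSketch {

    static Properties build() {
        final Properties settings = new Properties();
        // Replicate internal changelog and repartition topics three times,
        // following the recommendation above.
        settings.put("replication.factor", "3");
        // Keep one shadow copy of each local state store (assumed value).
        settings.put("num.standby.replicas", "1");
        return settings;
    }

    public static void main(final String[] args) {
        System.out.println(build());
    }
}
```

+        <p>A replication factor of 3 requires a cluster with at least three brokers, and each standby replica
+          consumes additional local storage on the instance that hosts it.</p>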
+        <div class="section" id="state-dir">
+          <h4><a class="toc-backref" href="#id14">state.dir</a><a 
class="headerlink" href="#state-dir" title="Permalink to this 
headline"></a></h4>
+          <blockquote>
+            <div>The state directory. Kafka Streams persists local states 
under the state directory. Each application has a subdirectory on its hosting
+              machine that is located under the state directory. The name of 
the subdirectory is the application ID. The state stores associated
+              with the application are created under this 
subdirectory.</div></blockquote>
+        </div>
+        <div class="section" id="timestamp-extractor">
+          <span id="streams-developer-guide-timestamp-extractor"></span><h4><a 
class="toc-backref" href="#id15">timestamp.extractor</a><a class="headerlink" 
href="#timestamp-extractor" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div><p>A timestamp extractor pulls a timestamp from an instance 
of <a class="reference external" 
href="/4.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.html">ConsumerRecord</a>.
+              Timestamps are used to control the progress of streams.</p>
+              <p>The default extractor is
+                <a class="reference external" 
href="/4.0.0/streams/javadocs/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.html">FailOnInvalidTimestamp</a>.
+                This extractor retrieves built-in timestamps that are 
automatically embedded into Kafka messages by the Kafka producer
+                client since
+                <a class="reference external" 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka
 version 0.10</a>.
+                Depending on the setting of Kafka&#8217;s server-side <code 
class="docutils literal"><span 
class="pre">log.message.timestamp.type</span></code> broker and <code 
class="docutils literal"><span class="pre">message.timestamp.type</span></code> 
topic parameters,
+                this extractor provides you with:</p>
+              <ul class="simple">
+                <li><strong>event-time</strong> processing semantics if <code 
class="docutils literal"><span 
class="pre">log.message.timestamp.type</span></code> is set to <code 
class="docutils literal"><span class="pre">CreateTime</span></code> aka 
&#8220;producer time&#8221;
+                  (which is the default).  This represents the time when a 
Kafka producer sent the original message.  If you use Kafka&#8217;s
+                  official producer client, the timestamp represents 
milliseconds since the epoch.</li>
+                <li><strong>ingestion-time</strong> processing semantics if 
<code class="docutils literal"><span 
class="pre">log.message.timestamp.type</span></code> is set to <code 
class="docutils literal"><span class="pre">LogAppendTime</span></code> aka 
&#8220;broker
+                  time&#8221;.  This represents the time when the Kafka broker 
received the original message, in milliseconds since the epoch.</li>
+              </ul>
+              <p>The <code class="docutils literal"><span 
class="pre">FailOnInvalidTimestamp</span></code> extractor throws an exception 
if a record contains an invalid (i.e. negative) built-in
+                timestamp, because Kafka Streams would not process this record 
but silently drop it.  Invalid built-in timestamps can
+                occur for various reasons:  for example, if you consume a 
topic that is written to by pre-0.10 Kafka producer clients
+                or by third-party producer clients that don&#8217;t support 
the new Kafka 0.10 message format yet;  another situation where
+                this may happen is after upgrading your Kafka cluster from 
<code class="docutils literal"><span class="pre">0.9</span></code> to <code 
class="docutils literal"><span class="pre">0.10</span></code>, where all the 
data that was generated
+                with <code class="docutils literal"><span 
class="pre">0.9</span></code> does not include the <code class="docutils 
literal"><span class="pre">0.10</span></code> message timestamps.</p>
+              <p>If you have data with invalid timestamps and want to process 
it, then there are two alternative extractors available.
+                Both work on built-in timestamps, but handle invalid 
timestamps differently.</p>
+              <ul class="simple">
+                <li><a class="reference external" 
href="/4.0.0/streams/javadocs/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.html">LogAndSkipOnInvalidTimestamp</a>:
+                  This extractor logs a warning message and returns the invalid 
timestamp to Kafka Streams, which will not process but
+                  silently drop the record.
+                  This log-and-skip strategy allows Kafka Streams to make 
progress instead of failing if there are records with an
+                  invalid built-in timestamp in your input data.</li>
+                <li><a class="reference external" 
href="/4.0.0/streams/javadocs/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.html">UsePreviousTimeOnInvalidTimestamp</a>:
+                  This extractor returns the record&#8217;s built-in timestamp 
if it is valid (i.e. not negative).  If the record does not
+                  have a valid built-in timestamp, the extractor returns the 
previously extracted valid timestamp from a record of the
+                  same topic partition as the current record as a timestamp 
estimation.  If no timestamp can be estimated, it
+                  throws an exception.</li>
+              </ul>
+              <p>Another built-in extractor is
+                <a class="reference external" 
href="/4.0.0/streams/javadocs/org/apache/kafka/streams/processor/WallclockTimestampExtractor.html">WallclockTimestampExtractor</a>.
+                This extractor does not actually &#8220;extract&#8221; a 
timestamp from the consumed record but rather returns the current time in
+                milliseconds from the system clock (think: <code 
class="docutils literal"><span 
class="pre">System.currentTimeMillis()</span></code>), which effectively means 
Streams will operate
+                on the basis of the so-called <strong>processing-time</strong> 
of events.</p>
+              <p>You can also provide your own timestamp extractors, for 
instance to retrieve timestamps embedded in the payload of
+                messages.  If you cannot extract a valid timestamp, you can 
either throw an exception, return a negative timestamp, or
+                estimate a timestamp.  Returning a negative timestamp will 
result in data loss &#8211; the corresponding record will not be
+                processed but silently dropped.  If you want to estimate a new 
timestamp, you can use the value provided via
+                <code class="docutils literal"><span 
class="pre">previousTimestamp</span></code> (i.e., a Kafka Streams timestamp 
estimation).  Here is an example of a custom
+                <code class="docutils literal"><span 
class="pre">TimestampExtractor</span></code> implementation:</p>
+              <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="kn">import</span> <span 
class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span><span 
class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.processor.TimestampExtractor</span><span 
class="o">;</span>
+
+<span class="c1">// Extracts the embedded timestamp of a record (giving you 
&quot;event-time&quot; semantics).</span>
+<span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">MyEventTimeExtractor</span> <span class="kd">implements</span> <span 
class="n">TimestampExtractor</span> <span class="o">{</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">long</span> <span 
class="nf">extract</span><span class="o">(</span><span class="kd">final</span> 
<span class="n">ConsumerRecord</span><span class="o">&lt;</span><span 
class="n">Object</span><span class="o">,</span> <span 
class="n">Object</span><span class="o">&gt;</span> <span 
class="n">record</span><span class="o">,</span> <span class="kd">final</span> 
<span class="kt">long</span> <span class="n">previousTimestamp</span><span 
class="o">)</span> <span class="o">{</span>
+    <span class="c1">// `Foo` is your own custom class, which we assume has a 
method that returns</span>
+    <span class="c1">// the embedded timestamp (milliseconds since midnight, 
January 1, 1970 UTC).</span>
+    <span class="kt">long</span> <span class="n">timestamp</span> <span 
class="o">=</span> <span class="o">-</span><span class="mi">1</span><span 
class="o">;</span>
+    <span class="kd">final</span> <span class="n">Foo</span> <span 
class="n">myPojo</span> <span class="o">=</span> <span class="o">(</span><span 
class="n">Foo</span><span class="o">)</span> <span class="n">record</span><span 
class="o">.</span><span class="na">value</span><span class="o">();</span>
+    <span class="k">if</span> <span class="o">(</span><span 
class="n">myPojo</span> <span class="o">!=</span> <span 
class="kc">null</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">timestamp</span> <span class="o">=</span> <span 
class="n">myPojo</span><span class="o">.</span><span 
class="na">getTimestampInMillis</span><span class="o">();</span>
+    <span class="o">}</span>
+    <span class="k">if</span> <span class="o">(</span><span 
class="n">timestamp</span> <span class="o">&lt;</span> <span 
class="mi">0</span><span class="o">)</span> <span class="o">{</span>
+      <span class="c1">// Invalid timestamp!  Attempt to estimate a new 
timestamp,</span>
+      <span class="c1">// otherwise fall back to wall-clock time 
(processing-time).</span>
+      <span class="k">if</span> <span class="o">(</span><span 
class="n">previousTimestamp</span> <span class="o">&gt;=</span> <span 
class="mi">0</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span 
class="n">previousTimestamp</span><span class="o">;</span>
+      <span class="o">}</span> <span class="k">else</span> <span 
class="o">{</span>
+        <span class="k">return</span> <span class="n">System</span><span 
class="o">.</span><span class="na">currentTimeMillis</span><span 
class="o">();</span>
+      <span class="o">}</span>
+    <span class="o">}</span>
+    <span class="k">return</span> <span class="n">timestamp</span><span class="o">;</span>
+  <span class="o">}</span>
+
+<span class="o">}</span>
+</pre></div>
+              </div>
+              <p>You would then define the custom timestamp extractor in your 
Streams configuration as follows:</p>
+              <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="kn">import</span> <span 
class="nn">java.util.Properties</span><span class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.StreamsConfig</span><span class="o">;</span>
+
+<span class="n">Properties</span> <span class="n">streamsConfiguration</span> 
<span class="o">=</span> <span class="k">new</span> <span 
class="n">Properties</span><span class="o">();</span>
+<span class="n">streamsConfiguration</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">TIMESTAMP_EXTRACTOR_CLASS_CONFIG</span><span class="o">,</span> 
<span class="n">MyEventTimeExtractor</span><span class="o">.</span><span 
class="na">class</span><span class="o">);</span>
+</pre></div>
+              </div>
+            </div></blockquote>
+        </div>
+      </div>
+      <div class="section" 
id="kafka-consumers-and-producer-configuration-parameters">
+        <h3><a class="toc-backref" href="#id16">Kafka consumer and producer 
configuration parameters</a><a class="headerlink" 
href="#kafka-consumers-and-producer-configuration-parameters" title="Permalink 
to this headline"></a></h3>
+        <p>You can specify parameters for the Kafka <a class="reference 
external" 
href="/4.0.0/clients/javadocs/org/apache/kafka/clients/consumer/package-summary.html">consumers</a>
 and <a class="reference external" 
href="/4.0.0/clients/javadocs/org/apache/kafka/clients/producer/package-summary.html">producers</a>
 that are used internally.  The consumer and producer settings
+          are defined by specifying parameters in a <code class="docutils 
literal"><span class="pre">StreamsConfig</span></code> instance.</p>
+        <p>In this example, the Kafka <a class="reference external" 
href="/4.0.0/clients/javadocs/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG">consumer
 session timeout</a> is configured to be 60000 milliseconds in the Streams 
settings:</p>
+        <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+<span class="c1">// Example of a &quot;normal&quot; setting for Kafka 
Streams</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">BOOTSTRAP_SERVERS_CONFIG</span><span class="o">,</span> <span 
class="s">&quot;kafka-broker-01:9092&quot;</span><span class="o">);</span>
+<span class="c1">// Customize the Kafka consumer settings of your Streams 
application</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">ConsumerConfig</span><span class="o">.</span><span 
class="na">SESSION_TIMEOUT_MS_CONFIG</span><span class="o">,</span> <span 
class="mi">60000</span><span class="o">);</span>
+<span class="n">StreamsConfig</span> <span class="n">config</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">StreamsConfig</span><span class="o">(</span><span 
class="n">streamsSettings</span><span class="o">);</span>
+</pre></div>
+        </div>
+        <div class="section" id="naming">
+          <h4><a class="toc-backref" href="#id17">Naming</a><a 
class="headerlink" href="#naming" title="Permalink to this headline"></a></h4>
+          <p>Some consumer and producer configuration parameters use the same 
parameter name. For example, <code class="docutils literal"><span 
class="pre">send.buffer.bytes</span></code> and
+            <code class="docutils literal"><span 
class="pre">receive.buffer.bytes</span></code> are used to configure TCP 
buffers; <code class="docutils literal"><span 
class="pre">request.timeout.ms</span></code> and <code class="docutils 
literal"><span class="pre">retry.backoff.ms</span></code> control retries
+            for client requests. You can avoid duplicate names by prefixing 
parameter names with <code class="docutils literal"><span 
class="pre">consumer.</span></code> or <code class="docutils literal"><span 
class="pre">producer.</span></code> (e.g., <code class="docutils literal"><span 
class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils 
literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
+          <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+<span class="c1">// same value for consumer and producer</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">,</span> <span 
class="s">&quot;value&quot;</span><span class="o">);</span>
+<span class="c1">// different values for consumer and producer</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;consumer.PARAMETER_NAME&quot;</span><span class="o">,</span> 
<span class="s">&quot;consumer-value&quot;</span><span class="o">);</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;producer.PARAMETER_NAME&quot;</span><span class="o">,</span> 
<span class="s">&quot;producer-value&quot;</span><span class="o">);</span>
+<span class="c1">// alternatively, you can use</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">consumerPrefix</span><span class="o">(</span><span 
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span 
class="s">&quot;consumer-value&quot;</span><span class="o">);</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">producerPrefix</span><span class="o">(</span><span 
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span 
class="s">&quot;producer-value&quot;</span><span class="o">);</span>
+</pre></div>
+          </div>
+        </div>
+        <div class="section" id="default-values">
+          <h4><a class="toc-backref" href="#id18">Default Values</a><a 
class="headerlink" href="#default-values" title="Permalink to this 
headline"></a></h4>
+          <p>Kafka Streams uses different default values for some of the 
underlying client configs, which are summarized below. For detailed descriptions
+            of these configs, see <a class="reference external" 
href="http://kafka.apache.org/0100/documentation.html#producerconfigs">Producer 
Configs</a>
+            and <a class="reference external" 
href="http://kafka.apache.org/0100/documentation.html#newconsumerconfigs">Consumer
 Configs</a>.</p>
+          <table border="1" class="non-scrolling-table docutils">
+            <colgroup>
+              <col width="50%" />
+              <col width="19%" />
+              <col width="31%" />
+            </colgroup>
+            <thead valign="bottom">
+            <tr class="row-odd"><th class="head">Parameter Name</th>
+              <th class="head">Corresponding Client</th>
+              <th class="head">Streams Default</th>
+            </tr>
+            </thead>
+            <tbody valign="top">
+            <tr class="row-even"><td>auto.offset.reset</td>
+              <td>Consumer</td>
+              <td>earliest</td>
+            </tr>
+            <tr class="row-odd"><td>enable.auto.commit</td>
+              <td>Consumer</td>
+              <td>false</td>
+            </tr>
+            <tr class="row-even"><td>linger.ms</td>
+              <td>Producer</td>
+              <td>100</td>
+            </tr>
+            <tr class="row-odd"><td>max.poll.interval.ms</td>
+              <td>Consumer</td>
+              <td>Integer.MAX_VALUE</td>
+            </tr>
+            <tr class="row-even"><td>max.poll.records</td>
+              <td>Consumer</td>
+              <td>1000</td>
+            </tr>
+            <tr class="row-odd"><td>retries</td>
+              <td>Producer</td>
+              <td>10</td>
+            </tr>
+            <tr class="row-even"><td>rocksdb.config.setter</td>
+              <td>Consumer</td>
+              <td>&nbsp;</td>
+            </tr>
+            </tbody>
+          </table>
+        </div>
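+        <p>If one of the Streams defaults in the table above does not suit an application, it can presumably be overridden
+          like any other client parameter. The sketch below uses the <code class="docutils literal"><span class="pre">consumer.</span></code>
+          and <code class="docutils literal"><span class="pre">producer.</span></code> prefixes with string keys so that it depends only
+          on the JDK. The class name <code class="docutils literal"><span class="pre">ClientDefaultOverrideSketch</span></code> is hypothetical
+          and both values are assumptions chosen for illustration.</p>

```java
import java.util.Properties;

// Hypothetical helper class, used only to illustrate the overrides.
final class ClientDefaultOverrideSketch {

    static Properties build() {
        final Properties settings = new Properties();
        // Fetch fewer records per poll than the Streams default of 1000 (assumed value).
        settings.put("consumer.max.poll.records", "500");
        // Send batches sooner than the Streams default of 100 ms (assumed value).
        settings.put("producer.linger.ms", "50");
        return settings;
    }

    public static void main(final String[] args) {
        System.out.println(build());
    }
}
```
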
+        <div class="section" id="enable-auto-commit">
+          <span 
id="streams-developer-guide-consumer-auto-commit"></span><h4><a 
class="toc-backref" href="#id19">enable.auto.commit</a><a class="headerlink" 
href="#enable-auto-commit" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div>The consumer auto commit. To guarantee at-least-once 
processing semantics and turn off auto commits, Kafka Streams overrides this 
consumer config
+              value to <code class="docutils literal"><span 
class="pre">false</span></code>.  Consumers will only commit explicitly via 
<em>commitSync</em> calls when the Kafka Streams library or a user decides
+              to commit the current processing state.</div></blockquote>
+        </div>
+        <div class="section" id="rocksdb-config-setter">
+          <span id="streams-developer-guide-rocksdb-config"></span><h4><a 
class="toc-backref" href="#id20">rocksdb.config.setter</a><a class="headerlink" 
href="#rocksdb-config-setter" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div><p>The RocksDB configuration. Kafka Streams uses RocksDB as 
the default storage engine for persistent stores. To change the default
+              configuration for RocksDB, implement <code class="docutils 
literal"><span class="pre">RocksDBConfigSetter</span></code> and provide your 
custom class via <a class="reference external" 
href="/current/streams/javadocs/org/apache/kafka/streams/state/RocksDBConfigSetter.html">rocksdb.config.setter</a>.</p>
+              <p>Here is an example that adjusts the memory size consumed by 
RocksDB.</p>
+              <div class="highlight-java"><div 
class="highlight"><pre><span></span>    <span class="kd">public</span> <span 
class="kd">static</span> <span class="kd">class</span> <span 
class="nc">CustomRocksDBConfig</span> <span class="kd">implements</span> <span 
class="n">RocksDBConfigSetter</span> <span class="o">{</span>
+
+       <span class="nd">@Override</span>
+       <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">setConfig</span><span class="o">(</span><span 
class="kd">final</span> <span class="n">String</span> <span 
class="n">storeName</span><span class="o">,</span> <span 
class="kd">final</span> <span class="n">Options</span> <span 
class="n">options</span><span class="o">,</span> <span class="kd">final</span> 
<span class="n">Map</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Object</span><span class="o">&gt;</span> <span 
class="n">configs</span><span class="o">)</span> <span class="o">{</span>
+         <span class="c1">// See #1 below.</span>
+         <span class="n">BlockBasedTableConfig</span> <span 
class="n">tableConfig</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">org</span><span class="o">.</span><span 
class="na">rocksdb</span><span class="o">.</span><span 
class="na">BlockBasedTableConfig</span><span class="o">();</span>
+         <span class="n">tableConfig</span><span class="o">.</span><span 
class="na">setBlockCacheSize</span><span class="o">(</span><span 
class="mi">16</span> <span class="o">*</span> <span class="mi">1024</span> 
<span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span>
+         <span class="c1">// See #2 below.</span>
+         <span class="n">tableConfig</span><span class="o">.</span><span 
class="na">setBlockSize</span><span class="o">(</span><span 
class="mi">16</span> <span class="o">*</span> <span 
class="mi">1024L</span><span class="o">);</span>
+         <span class="c1">// See #3 below.</span>
+         <span class="n">tableConfig</span><span class="o">.</span><span 
class="na">setCacheIndexAndFilterBlocks</span><span class="o">(</span><span 
class="kc">true</span><span class="o">);</span>
+         <span class="n">options</span><span class="o">.</span><span 
class="na">setTableFormatConfig</span><span class="o">(</span><span 
class="n">tableConfig</span><span class="o">);</span>
+         <span class="c1">// See #4 below.</span>
+         <span class="n">options</span><span class="o">.</span><span 
class="na">setMaxWriteBufferNumber</span><span class="o">(</span><span 
class="mi">2</span><span class="o">);</span>
+       <span class="o">}</span>
+    <span class="o">}</span>
+
+<span class="n">Properties</span> <span class="n">streamsSettings</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">Properties</span><span class="o">();</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">ROCKSDB_CONFIG_SETTER_CLASS_CONFIG</span><span class="o">,</span> 
<span class="n">CustomRocksDBConfig</span><span class="o">.</span><span 
class="na">class</span><span class="o">);</span>
+</pre></div>
+              </div>
+              <dl class="docutils">
+                <dt>Notes for example:</dt>
+                <dd><ol class="first last arabic simple">
+                  <li><code class="docutils literal"><span 
class="pre">BlockBasedTableConfig</span> <span class="pre">tableConfig</span> 
<span class="pre">=</span> <span class="pre">new</span> <span 
class="pre">org.rocksdb.BlockBasedTableConfig();</span></code> Reduce the block 
cache size from the default, shown <a class="reference external" 
href="https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L81">here</a>,
  because the total number of RocksDB databases for the store is partitions (40) * segments 
(3) = 120.</li>
+                  <li><code class="docutils literal"><span 
class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span 
class="pre">1024L);</span></code> Modify the default <a class="reference 
external" 
href="https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L82">block
 size</a> per these instructions from the <a class="reference external" 
href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks">RocksDB
 GitHub</a>.</li>
+                  <li><code class="docutils literal"><span 
class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do 
not let the index and filter blocks grow unbounded. For more information, see 
the <a class="reference external" 
href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB
 GitHub</a>.</li>
+                  <li><code class="docutils literal"><span 
class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced 
options in the <a class="reference external" 
href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB
 GitHub</a>.</li>
+                </ol>
+                </dd>
+              </dl>
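+              <p>To see why note 1 matters, the arithmetic can be sketched as follows. This is a rough estimate only, assuming every RocksDB instance gets its own block cache of the configured size and fills it completely. The class and method names are hypothetical, and write buffers are not counted.</p>

```java
// Back-of-the-envelope estimate of block cache memory across all RocksDB
// instances of one store. Hypothetical helper, not part of Kafka Streams.
final class RocksDbBlockCacheEstimate {

    // Total block cache in bytes = number of RocksDB instances * per-instance cache size.
    static long totalBlockCacheBytes(int partitions, int segments, long blockCacheBytesPerInstance) {
        long instances = (long) partitions * segments;
        return instances * blockCacheBytesPerInstance;
    }

    public static void main(String[] args) {
        // Same value that the example passes to setBlockCacheSize.
        long perInstance = 16 * 1024 * 1024L;
        long total = totalBlockCacheBytes(40, 3, perInstance);
        System.out.println("RocksDB instances: " + (40 * 3));
        System.out.println("Total block cache (MB): " + total / (1024 * 1024));
    }
}
```

+              <p>With 40 partitions and 3 segments, a 16 MB cache per instance already adds up to 1920 MB of block cache in the worst case, so the per-instance value has to be chosen with the instance count in mind.</p>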
+            </div></blockquote>
+        </div>
+      </div>
+      <div class="section" 
id="recommended-configuration-parameters-for-resiliency">
+        <h3><a class="toc-backref" href="#id21">Recommended configuration 
parameters for resiliency</a><a class="headerlink" 
href="#recommended-configuration-parameters-for-resiliency" title="Permalink to 
this headline"></a></h3>
+        <p>There are several Kafka and Kafka Streams configuration options 
that need to be configured explicitly for resiliency in the face of broker 
failures:</p>
+        <table border="1" class="non-scrolling-table docutils">
+          <colgroup>
+            <col width="22%" />
+            <col width="19%" />
+            <col width="10%" />
+            <col width="49%" />
+          </colgroup>
+          <thead valign="bottom">
+          <tr class="row-odd"><th class="head">Parameter Name</th>
+            <th class="head">Corresponding Client</th>
+            <th class="head">Default value</th>
+            <th class="head">Consider setting to</th>
+          </tr>
+          </thead>
+          <tbody valign="top">
+          <tr class="row-even"><td>acks</td>
+            <td>Producer</td>
+            <td><code class="docutils literal"><span 
class="pre">acks=1</span></code></td>
+            <td><code class="docutils literal"><span 
class="pre">acks=all</span></code></td>
+          </tr>
+          <tr class="row-odd"><td>replication.factor</td>
+            <td>Streams</td>
+            <td><code class="docutils literal"><span 
class="pre">1</span></code></td>
+            <td><code class="docutils literal"><span 
class="pre">3</span></code></td>
+          </tr>
+          <tr class="row-even"><td>min.insync.replicas</td>
+            <td>Broker</td>
+            <td><code class="docutils literal"><span 
class="pre">1</span></code></td>
+            <td><code class="docutils literal"><span 
class="pre">2</span></code></td>
+          </tr>
+          </tbody>
+        </table>
+        <p>Increasing the replication factor to 3 ensures that the internal 
Kafka Streams topics can tolerate up to 2 broker failures. Changing the acks 
setting to &#8220;all&#8221;
+          guarantees that a record will not be lost as long as at least one in-sync replica is 
alive. The tradeoff of moving from the default values to the recommended ones is
+          that some performance and more storage space (3x with the 
replication factor of 3) are sacrificed for more resiliency.</p>
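+        <p>The interplay of these values can be sketched with a little arithmetic. The helper below is hypothetical and is not part of any Kafka API. It assumes one replica per broker, and that with <code class="docutils literal"><span class="pre">acks=all</span></code> the broker rejects writes once fewer than <code class="docutils literal"><span class="pre">min.insync.replicas</span></code> replicas are in sync.</p>

```java
// Rough failure-tolerance arithmetic for a replicated topic.
// Hypothetical helper for illustration; assumes one replica per broker.
final class ReplicationTolerance {

    // Number of brokers that can fail before the last copy of a partition is gone.
    static int failuresBeforeDataLoss(int replicationFactor) {
        return replicationFactor - 1;
    }

    // Number of brokers that can fail while acks=all writes are still accepted.
    static int failuresBeforeWritesRejected(int replicationFactor, int minInsyncReplicas) {
        return replicationFactor - minInsyncReplicas;
    }

    public static void main(String[] args) {
        // Recommended values from the table: replication.factor=3, min.insync.replicas=2.
        System.out.println("Failures before data loss: " + failuresBeforeDataLoss(3));
        System.out.println("Failures before writes are rejected: " + failuresBeforeWritesRejected(3, 2));
        // Default values: replication.factor=1, min.insync.replicas=1.
        System.out.println("Defaults, failures before data loss: " + failuresBeforeDataLoss(1));
    }
}
```

+        <p>Under these assumptions the recommended values survive the loss of 2 brokers without losing data and keep accepting writes after the loss of 1 broker, whereas the defaults tolerate no broker failure at all.</p>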
+        <div class="section" id="acks">
+          <h4><a class="toc-backref" href="#id22">acks</a><a 
class="headerlink" href="#acks" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div><p>The number of acknowledgments that the leader must have 
received before considering a request complete. This controls
+              the durability of records that are sent. The possible values 
are:</p>
+              <ul class="simple">
+                <li><code class="docutils literal"><span 
class="pre">acks=0</span></code> The producer does not wait for acknowledgment 
from the server and the record is immediately added to the socket buffer and 
considered sent. No guarantee can be made that the server has received the 
record in this case, and the <code class="docutils literal"><span 
class="pre">retries</span></code> configuration will not take effect (as the 
client won&#8217;t generally know of any failures). The offset returned for 
each record will always be set to <code class="docutils literal"><span 
class="pre">-1</span></code>.</li>
+                <li><code class="docutils literal"><span 
class="pre">acks=1</span></code> The leader writes the record to its local log 
and responds without waiting for full acknowledgment from all followers. If 
the leader immediately fails after acknowledging the record, but before the 
followers have replicated it, then the record will be lost.</li>
+                <li><code class="docutils literal"><span 
class="pre">acks=all</span></code> The leader waits for the full set of in-sync 
replicas to acknowledge the record. This guarantees that the record will not be 
lost if there is at least one in-sync replica alive. This is the strongest 
available guarantee.</li>
+              </ul>
+              <p>For more information, see the <a class="reference external" 
href="https://kafka.apache.org/documentation/#producerconfigs">Kafka Producer 
documentation</a>.</p>
+            </div></blockquote>
+        </div>
+        <div class="section" id="id2">
+          <h4><a class="toc-backref" href="#id23">replication.factor</a><a 
class="headerlink" href="#id2" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div>See the <a class="reference internal" 
href="#replication-factor-parm"><span class="std std-ref">description 
here</span></a>.</div></blockquote>
+          <p>You define these settings via <code class="docutils 
literal"><span class="pre">StreamsConfig</span></code>:</p>
+          <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">REPLICATION_FACTOR_CONFIG</span><span class="o">,</span> <span 
class="mi">3</span><span class="o">);</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">producerPrefix</span><span class="o">(</span><span 
class="n">ProducerConfig</span><span class="o">.</span><span 
class="na">ACKS_CONFIG</span><span class="o">),</span> <span 
class="s">&quot;all&quot;</span><span class="o">);</span>
+</pre></div>
+          </div>
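+          <p>For readers who keep settings in a properties file instead, here is a minimal sketch of the same two settings using literal keys. It assumes that <code class="docutils literal"><span class="pre">StreamsConfig.REPLICATION_FACTOR_CONFIG</span></code> resolves to <code class="docutils literal"><span class="pre">replication.factor</span></code> and that <code class="docutils literal"><span class="pre">producerPrefix</span></code> prepends <code class="docutils literal"><span class="pre">producer.</span></code> to the producer key. The class name is hypothetical.</p>

```java
import java.util.Properties;

// Builds the resiliency settings with literal keys, using only the JDK.
// Hypothetical helper; the key names are assumed to match the StreamsConfig constants.
final class ResilientStreamsSettings {

    static Properties build() {
        Properties settings = new Properties();
        // Replication factor for the internal topics created by Kafka Streams.
        settings.setProperty("replication.factor", "3");
        // Producer-level acks, scoped to the embedded producer via the "producer." prefix.
        settings.setProperty("producer.acks", "all");
        return settings;
    }

    public static void main(String[] args) {
        // Print each setting in key=value form, as it would appear in a properties file.
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```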
+          <div class="admonition note">
+            <p class="first admonition-title">Note</p>
+            <p class="last">A future version of Kafka Streams will allow 
developers to set their own app-specific configuration settings through
+              <code class="docutils literal"><span 
class="pre">StreamsConfig</span></code> as well, which can then be accessed 
through
+              <a class="reference external" 
href="/4.0.0/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html">ProcessorContext</a>.</p>
+</div>
+</div>
+</div>
+</div>
+</div>
+
+
+               </div>
+              </div>
+              <div class="pagination">
+                <a 
href="/{{version}}/documentation/streams/developer-guide/write-streams" 
class="pagination__btn pagination__btn__prev">Previous</a>
+                <a 
href="/{{version}}/documentation/streams/developer-guide/dsl-api" 
class="pagination__btn pagination__btn__next">Next</a>
+              </div>
+                </script>
+
+                <!--#include virtual="../../../includes/_header.htm" -->
+                <!--#include virtual="../../../includes/_top.htm" -->
+                    <div class="content documentation documentation--current">
+                    <!--#include virtual="../../../includes/_nav.htm" -->
+                    <div class="right">
+                    <!--#include virtual="../../../includes/_docs_banner.htm" 
-->
+                    <ul class="breadcrumbs">
+                    <li><a href="/documentation">Documentation</a></li>
+                    <li><a href="/documentation/streams">Kafka Streams</a></li>
+                    <li><a 
href="/documentation/streams/developer-guide/">Developer Guide</a></li>
+                </ul>
+                <div class="p-content"></div>
+                    </div>
+                    </div>
+                    <!--#include virtual="../../../includes/_footer.htm" -->
+                    <script>
+                    $(function() {
+                        // Show selected style on nav item
+                        $('.b-nav__streams').addClass('selected');
+
+                        //sticky secondary nav
+                        var $navbar = $(".sub-nav-sticky"),
+                            y_pos = $navbar.offset().top,
+                            height = $navbar.height();
+
+                        $(window).scroll(function() {
+                            var scrollTop = $(window).scrollTop();
+
+                            if (scrollTop > y_pos - height) {
+                                $navbar.addClass("navbar-fixed")
+                            } else if (scrollTop <= y_pos) {
+                                $navbar.removeClass("navbar-fixed")
+                            }
+                        });
+
+                        // Display docs subnav items
+                        
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+                    });
+              </script>
\ No newline at end of file
