This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 124b5ec  KAFKA-9072: Add Topology naming to the dev guide (#7629)
124b5ec is described below

commit 124b5ecb259dbb267dc6dd2a6d0bcec4bfa5b1d0
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Nov 13 10:54:47 2019 -0500

    KAFKA-9072: Add Topology naming to the dev guide (#7629)
    
    Reviewers: Jim Galasyn <[email protected]>, Matthias J. Sax 
<[email protected]>, Sophie Blee-Goldman <[email protected]>
---
 docs/streams/developer-guide/dsl-api.html          |   6 +-
 .../developer-guide/dsl-topology-naming.html       | 370 +++++++++++++++++++++
 2 files changed, 375 insertions(+), 1 deletion(-)

diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
index 5be029e..e54d449 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -67,6 +67,7 @@
                     <li><a class="reference internal" 
href="#applying-processors-and-transformers-processor-api-integration" 
id="id24">Applying processors and transformers (Processor API 
integration)</a></li>
                 </ul>
                 </li>
+                <li><a class="reference internal" href="#naming-a-streams-app" 
id="id33">Naming Operators in a Streams DSL application</a></li>
                 <li><a class="reference internal" 
href="#controlling-emit-rate" id="id32">Controlling KTable update rate</a></li>
                 <li><a class="reference internal" 
href="#writing-streams-back-to-kafka" id="id25">Writing streams back to 
Kafka</a></li>
                 <li><a class="reference internal" 
href="#testing-a-streams-app" id="id26">Testing a Streams application</a></li>
@@ -3515,7 +3516,10 @@ grouped
                 </div>
             </div>
         </div>
-
+        <div class="section" id="naming-a-streams-app">
+            <a class="headerlink" href="#naming-a-streams-app" 
title="Permalink to this headline"><h2><a class="toc-backref" 
href="#id33">Naming Operators in a Streams DSL application</a></h2></a>
+            Kafka Streams allows you to <a class="reference internal" 
href="dsl-topology-naming.html">name processors</a> created via the Streams DSL
+        </div>
        <div class="section" id="controlling-emit-rate">
             <span id="streams-developer-guide-dsl-suppression"></span><h2><a 
class="toc-backref" href="#id32">Controlling KTable emit rate</a><a 
class="headerlink" href="#controlling-emit-rate" title="Permalink to this 
headline"></a></h2>
            <p>A KTable is logically a continuously updated table.
diff --git a/docs/streams/developer-guide/dsl-topology-naming.html 
b/docs/streams/developer-guide/dsl-topology-naming.html
new file mode 100644
index 0000000..e0c1e1f
--- /dev/null
+++ b/docs/streams/developer-guide/dsl-topology-naming.html
@@ -0,0 +1,370 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+    <!-- h1>Developer Guide for Kafka Streams</h1 -->
+    <div class="sub-nav-sticky">
+        <div class="sticky-top">
+            <!-- div style="height:35px">
+              <a href="/{{version}}/documentation/streams/">Introduction</a>
+              <a class="active-menu-item" 
href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+              <a 
href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+              <a href="/{{version}}/documentation/streams/quickstart">Run Demo 
App</a>
+              <a href="/{{version}}/documentation/streams/tutorial">Tutorial: 
Write App</a>
+            </div -->
+        </div>
+    </div>
+
+       <div class="section" id="naming">
+        <span id="streams-developer-guide-dsl-topology-naming"></span>
+        <h1>Naming Operators in a Kafka Streams DSL Application<a 
class="headerlink" href="#naming" title="Permalink to this headline"></a></h1>
+
+        <p>
+                  You now can give names to processors when using the Kafka 
Streams DSL.
+                  In the PAPI there are <code>Processors</code> and 
<code>State Stores</code> and
+                  you are required to explicitly name each one.
+           </p>
+               <p>
+                  At the DLS layer, there are operators.  A single DSL 
operator may
+                  compile down to multiple <code>Processors</code> and 
<code>State Stores</code>, and
+                  if required <code>repartition topics</code>. But with the 
Kafka Streams
+                  DSL, all these names are generated for you. There is a 
relationship between
+                  the generated processor name state store names (hence 
changelog topic names) and repartition
+                  topic names. Note, that the names of state stores and 
changelog/repartition topics
+                  are "stateful" while processor names are "stateless".
+           </p>
+               <p>
+                       This distinction
+                  of stateful vs. stateless names has important implications 
when updating your topology.
+                  While the internal naming makes creating
+                  a topology with the DSL much more straightforward,
+                  there are a couple of trade-offs.  The first trade-off is 
what we could
+                  consider a readability issue. The other
+                  more severe trade-off is the shifting of names due to the 
relationship between the
+                  DSL operator and the generated <code>Processors</code>, 
<code>State Stores</code> changelog
+                  topics and repartition topics.
+          </p>
+
+
+               <h2>Readability Issues</h2>
+
+               <p>
+                       By saying there is a readability trade-off, we are 
referring to viewing a description of the topology.
+                       When you render the string description of your topology 
via the <code>Topology#desribe()</code>
+                       method, you can see what the processor is, but you 
don't have any context for its business purpose.
+                       For example, consider the following simple topology:
+
+                       <br/>
+               <pre>
+               KStream&lt;String,String&gt; stream = builder.stream("input");
+               stream.filter((k,v) -> !v.equals("invalid_txn"))
+                     .mapValues((v) -> v.substring(0,5))
+                     .to("output")
+           </pre>
+
+               </p>
+
+               <p>
+               Running <code>Topology#describe()</code> yields this string:
+
+               <pre>
+               Topologies:
+                  Sub-topology: 0
+                   Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+                     --> KSTREAM-FILTER-0000000001
+                   Processor: KSTREAM-FILTER-0000000001 (stores: [])
+                     --> KSTREAM-MAPVALUES-0000000002
+                     <-- KSTREAM-SOURCE-0000000000
+                   Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
+                     --> KSTREAM-SINK-0000000003
+                     <-- KSTREAM-FILTER-0000000001
+                   Sink: KSTREAM-SINK-0000000003 (topic: output)
+                     <-- KSTREAM-MAPVALUES-0000000002
+                </pre>
+
+                From this report, you can see what the different operators 
are, but what is the broader context here?
+                For example, consider <code>KSTREAM-FILTER-0000000001</code>, 
we can see that it's a
+                filter operation, which means that records are dropped that 
don't match the given predicate.  But what is
+                the meaning of the predicate?  Additionally, you can see the 
topic names of the source and sink nodes,
+                but what if the topics aren't named in a meaningful way?  Then 
you're left to guess the
+                business purpose behind these topics.
+               </p>
+               <p>
+                Also notice the numbering here: the source node is suffixed 
with <code>0000000000</code>
+                indicating it's the first processor in the topology.  
+                The filter is suffixed with <code>0000000001</code>, 
indicating it's the second processor in
+                the topology.   In Kafka Streams, there are now overloaded 
methods for
+                both <code>KStream</code> and <code>KTable</code> that accept
+                a new parameter <code>Named</code>. By using the 
<code>Named</code> class DSL users can
+                provide meaningful names to the processors in their topology.
+               </p>
+               <p>
+                Now let's take a look at your topology with all the processors 
named:
+               <pre>
+               KStream&lt;String,String&gt; stream =
+               builder.stream("input", 
Consumed.as("Customer_transactions_input_topic"));
+               stream.filter((k,v) -> !v.equals("invalid_txn"), 
Named.as("filter_out_invalid_txns"))
+                     .mapValues((v) -> v.substring(0,5), 
Named.as("Map_values_to_first_6_characters"))
+                     .to("output", 
Produced.as("Mapped_transactions_output_topic"));
+            </pre>
+
+                <pre>
+                Topologies:
+                  Sub-topology: 0
+                   Source: Customer_transactions_input_topic (topics: [input])
+                     --> filter_out_invalid_txns
+                   Processor: filter_out_invalid_txns (stores: [])
+                     --> Map_values_to_first_6_characters
+                     <-- Customer_transactions_input_topic
+                   Processor: Map_values_to_first_6_characters (stores: [])
+                     --> Mapped_transactions_output_topic
+                     <-- filter_out_invalid_txns
+                   Sink: Mapped_transactions_output_topic (topic: output)
+                     <-- Map_values_to_first_6_characters
+                </pre>
+
+               Now you can look at the topology description and easily 
understand what role each processor
+               plays in the topology. But there's another reason for naming 
your processor nodes when you
+               have stateful operators that remain between restarts of your 
Kafka Streams applications, 
+               state stores, changelog topics, and repartition topics.
+               </p>
+
+               <h2>Changing Names</h2>
+               <p>
+                Generated names are numbered where they are built in the 
topology.
+                The name generation strategy is
+                <code>KSTREAM|KTABLE-&gt;operator name&lt;-&gt;number 
suffix&lt;</code>. The number is a
+                globally incrementing number that represents the operator's 
order in the topology.
+                The generated number is prefixed with a varying number of "0"s 
to create a
+                string that is consistently 10 characters long.
+                This means that if you add/remove or shift the order of 
operations, the position of the
+                processor shifts, which shifts the name of the processor.  
Since <strong>most</strong> processors exist
+                in memory only, this name shifting presents no issue for many 
topologies.  But the name
+                shifting does have implications for topologies with stateful 
operators or repartition topics. 
+
+                Here's a different topology with some state:
+                <pre>
+                KStream&lt;String,String&gt; stream = builder.stream("input");
+                stream.groupByKey()
+                      .count()
+                      .toStream()
+                      .to("output");
+                </pre>
+               This topology description yields the following:
+                <pre>
+                        Topologies:
+                          Sub-topology: 0
+                           Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+                            --> KSTREAM-AGGREGATE-0000000002
+                           Processor: KSTREAM-AGGREGATE-0000000002 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000001])
+                            --> KTABLE-TOSTREAM-0000000003
+                            <-- KSTREAM-SOURCE-0000000000
+                           Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
+                            --> KSTREAM-SINK-0000000004
+                            <-- KSTREAM-AGGREGATE-0000000002
+                           Sink: KSTREAM-SINK-0000000004 (topic: output)
+                            <-- KTABLE-TOSTREAM-0000000003
+                </pre>
+                </p>
+                <p>
+                 You can see from the topology description above that the 
state store is named
+                 <code>KSTREAM-AGGREGATE-STATE-STORE-0000000002</code>.  
Here's what happens when you
+                 add a filter to keep some of the records out of the 
aggregation:
+                 <pre>
+                  KStream&lt;String,String&gt; stream = 
builder.stream("input");
+                  stream.filter((k,v)-> v !=null && v.length() >= 6 )
+                        .groupByKey()
+                        .count()
+                        .toStream()
+                        .to("output");
+                 </pre>
+
+                 And the corresponding topology:
+                 <pre>
+                         Topologies:
+                           Sub-topology: 0
+                            Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+                             --> KSTREAM-FILTER-0000000001
+                            Processor: KSTREAM-FILTER-0000000001 (stores: [])
+                              --> KSTREAM-AGGREGATE-0000000003
+                              <-- KSTREAM-SOURCE-0000000000
+                            Processor: KSTREAM-AGGREGATE-0000000003 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000002])
+                              --> KTABLE-TOSTREAM-0000000004
+                              <-- KSTREAM-FILTER-0000000001
+                            Processor: KTABLE-TOSTREAM-0000000004 (stores: [])
+                              --> KSTREAM-SINK-0000000005
+                              <-- KSTREAM-AGGREGATE-0000000003
+                             Sink: KSTREAM-SINK-0000000005 (topic: output)
+                              <-- KTABLE-TOSTREAM-0000000004
+                 </pre>
+                </p>
+               <p>
+               Notice that since you've added an operation <em>before</em> the 
<code>count</code> operation, the state
+                 store (and the changelog topic) names have changed.  This 
name change means you can't
+                 do a rolling re-deployment of your updated topology.  Also, 
you must use the
+                 <a 
href="/{{version}}/documentation/streams/developer-guide/app-reset-tool">Streams
 Reset Tool</a> 
+                 to re-calculate the aggregations, because the changelog topic 
has changed on start-up and the
+                 new changelog topic contains no data.
+
+                 Fortunately, there's an easy solution to remedy this 
situation.  Give the
+                 state store a user-defined name instead of relying on the 
generated one,
+                 so you don't have to worry about topology changes shifting 
the name of the state store.
+
+                 You've had the ability to name repartition topics with the 
<code>Joined</code>,
+                 <code>StreamJoined</code>, and<code>Grouped</code> classes, 
and
+                 name state store and changelog topics with 
<code>Materialized</code>.
+                 But it's worth reiterating the importance of naming these DSL 
topology operations again.
+
+                 Here's how your DSL code looks now giving a specific name to 
your state store:
+                 <pre>
+                 KStream&lt;String,String&gt; stream = builder.stream("input");
+                 stream.filter((k, v) -> v != null && v.length() >= 6)
+                       .groupByKey()
+                       .count(Materialized.as("Purchase_count_store"))
+                       .toStream()
+                       .to("output");
+                 </pre>
+
+                 And here's the topology 
+
+                 <pre>
+                 Topologies:
+                  Sub-topology: 0
+                   Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+                     --> KSTREAM-FILTER-0000000001
+                   Processor: KSTREAM-FILTER-0000000001 (stores: [])
+                     --> KSTREAM-AGGREGATE-0000000002
+                     <-- KSTREAM-SOURCE-0000000000
+                   Processor: KSTREAM-AGGREGATE-0000000002 (stores: 
[Purchase_count_store])
+                     --> KTABLE-TOSTREAM-0000000003
+                     <-- KSTREAM-FILTER-0000000001
+                   Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
+                     --> KSTREAM-SINK-0000000004
+                     <-- KSTREAM-AGGREGATE-0000000002
+                   Sink: KSTREAM-SINK-0000000004 (topic: output)
+                     <-- KTABLE-TOSTREAM-0000000003
+                 </pre>
+               </p>
+               <p>
+                 Now, even though you've added processors before your state 
store, the store name and its changelog
+                 topic names don't change.  This makes your topology more 
robust and resilient to changes made by
+                 adding or removing processors.
+               </p>
+
+               <h2>Conclusion</h2>
+
+                It's a good practice to name your processing nodes when using 
the DSL, and it's even
+                more important to do this when you have "stateful" processors
+               your application such as repartition
+                topics and state stores (and the accompanying changelog 
topics).
+        <p>
+                Here are a couple of points to remember when naming your DSL 
topology:
+               <ol>
+                       <li>
+                               If you have an <em>existing topology</em> and 
you <em>haven't</em> named your
+                               state stores (and changelog topics) and 
repartition topics, we recommended that you
+                               do so.  But this will be a topology breaking 
change, so you'll need to shut down all
+                               application instances, make the changes, and 
run the
+                               <a 
href="/{{version}}/documentation/streams/developer-guide/app-reset-tool">Streams
 Reset Tool</a>.
+                               Although this may be inconvenient at first, 
it's worth the effort to protect your application from
+                               unexpected errors due to topology changes.
+                       </li>
+                       <li>
+                               If you have a <em>new topology</em>, make sure 
you name the persistent parts of your topology:
+                               state stores (changelog topics) and repartition 
topics. This way, when you deploy your
+                               application, you're protected from topology 
changes that otherwise would break your Kafka Streams application.
+                               If you don't want to add names to stateless 
processors at first, that's fine as you can
+                               always go back and add the names later.
+                       </li>
+               </ol>
+
+                 Here's a quick reference on naming the critical parts of
+                 your Kafka Streams application to prevent topology name 
changes from breaking your application:
+
+                 <table>
+                     <tr>
+                         <th>Operation</th><th>Naming Class</th>
+                     </tr>
+                     <tr>
+                         <td>Aggregation repartition 
topics</td><td>Grouped</td>
+                     </tr>
+                     <tr>
+                         <td>KStream-KStream Join repartition 
topics</td><td>StreamJoined</td>
+                     </tr>
+                         <tr>
+                                 <td>KStream-KTable Join repartition 
topic</td><td>Joined</td>
+                         </tr>
+                     <tr>
+                         <td>KStream-KStream Join state 
stores</td><td>StreamJoined</td>
+                     </tr>
+                     <tr>
+                         <td>State Stores (for aggregations and KTable-KTable 
joins)</td><td>Materialized</td>
+                     </tr>
+                     <tr>
+                         <td>Stream/Table non-stateful 
operations</td><td>Named</td>
+                     </tr>
+                 </table>
+       </p>
+</div>
+
+</script>
+
+<!--#include virtual="../../../includes/_header.htm" -->
+<!--#include virtual="../../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+       <!--#include virtual="../../../includes/_nav.htm" -->
+       <div class="right">
+               <!--#include virtual="../../../includes/_docs_banner.htm" -->
+               <ul class="breadcrumbs">
+                       <li><a href="/documentation">Documentation</a></li>
+                       <li><a href="/documentation/streams">Kafka 
Streams</a></li>
+                       <li><a 
href="/documentation/streams/developer-guide/">Developer Guide</a></li>
+               </ul>
+               <div class="p-content"></div>
+       </div>
+</div>
+<!--#include virtual="../../../includes/_footer.htm" -->
+<script>
+       $(function () {
+               // Show selected style on nav item
+               $('.b-nav__streams').addClass('selected');
+
+               //sticky secondary nav
+               var $navbar = $(".sub-nav-sticky"),
+                               y_pos = $navbar.offset().top,
+                               height = $navbar.height();
+
+               $(window).scroll(function () {
+                       var scrollTop = $(window).scrollTop();
+
+                       if (scrollTop > y_pos - height) {
+                               $navbar.addClass("navbar-fixed")
+                       } else if (scrollTop <= y_pos) {
+                               $navbar.removeClass("navbar-fixed")
+                       }
+               });
+
+               // Display docs subnav items
+               
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+       });
+</script>
+
+
+
+

Reply via email to