This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 124b5ec KAFKA-9072: Add Topology naming to the dev guide (#7629)
124b5ec is described below
commit 124b5ecb259dbb267dc6dd2a6d0bcec4bfa5b1d0
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Nov 13 10:54:47 2019 -0500
KAFKA-9072: Add Topology naming to the dev guide (#7629)
Reviewers: Jim Galasyn <[email protected]>, Matthias J. Sax
<[email protected]>, Sophie Blee-Goldman <[email protected]>
---
docs/streams/developer-guide/dsl-api.html | 6 +-
.../developer-guide/dsl-topology-naming.html | 370 +++++++++++++++++++++
2 files changed, 375 insertions(+), 1 deletion(-)
diff --git a/docs/streams/developer-guide/dsl-api.html
b/docs/streams/developer-guide/dsl-api.html
index 5be029e..e54d449 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -67,6 +67,7 @@
<li><a class="reference internal"
href="#applying-processors-and-transformers-processor-api-integration"
id="id24">Applying processors and transformers (Processor API
integration)</a></li>
</ul>
</li>
+ <li><a class="reference internal" href="#naming-a-streams-app"
id="id33">Naming Operators in a Streams DSL application</a></li>
<li><a class="reference internal"
href="#controlling-emit-rate" id="id32">Controlling KTable update rate</a></li>
<li><a class="reference internal"
href="#writing-streams-back-to-kafka" id="id25">Writing streams back to
Kafka</a></li>
<li><a class="reference internal"
href="#testing-a-streams-app" id="id26">Testing a Streams application</a></li>
@@ -3515,7 +3516,10 @@ grouped
</div>
</div>
</div>
-
+ <div class="section" id="naming-a-streams-app">
+ <a class="headerlink" href="#naming-a-streams-app"
title="Permalink to this headline"><h2><a class="toc-backref"
href="#id33">Naming Operators in a Streams DSL application</a></h2></a>
+ Kafka Streams allows you to <a class="reference internal"
href="dsl-topology-naming.html">name processors</a> created via the Streams DSL.
+ </div>
<div class="section" id="controlling-emit-rate">
<span id="streams-developer-guide-dsl-suppression"></span><h2><a
class="toc-backref" href="#id32">Controlling KTable emit rate</a><a
class="headerlink" href="#controlling-emit-rate" title="Permalink to this
headline"></a></h2>
<p>A KTable is logically a continuously updated table.
diff --git a/docs/streams/developer-guide/dsl-topology-naming.html
b/docs/streams/developer-guide/dsl-topology-naming.html
new file mode 100644
index 0000000..e0c1e1f
--- /dev/null
+++ b/docs/streams/developer-guide/dsl-topology-naming.html
@@ -0,0 +1,370 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+ <!-- h1>Developer Guide for Kafka Streams</h1 -->
+ <div class="sub-nav-sticky">
+ <div class="sticky-top">
+ <!-- div style="height:35px">
+ <a href="/{{version}}/documentation/streams/">Introduction</a>
+ <a class="active-menu-item"
href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+ <a
href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+ <a href="/{{version}}/documentation/streams/quickstart">Run Demo
App</a>
+ <a href="/{{version}}/documentation/streams/tutorial">Tutorial:
Write App</a>
+ </div -->
+ </div>
+ </div>
+
+ <div class="section" id="naming">
+ <span id="streams-developer-guide-dsl-topology-naming"></span>
+ <h1>Naming Operators in a Kafka Streams DSL Application<a
class="headerlink" href="#naming" title="Permalink to this headline"></a></h1>
+
+ <p>
+ You can now give names to processors when using the Kafka
Streams DSL.
+ In the Processor API (PAPI) there are <code>Processors</code> and
<code>State Stores</code>, and
+ you are required to explicitly name each one.
+ </p>
+ <p>
+ At the DSL layer, there are operators. A single DSL
operator may
+ compile down to multiple <code>Processors</code> and
<code>State Stores</code>, and
+ if required <code>repartition topics</code>. But with the
Kafka Streams
+ DSL, all these names are generated for you. There is a
relationship between
+ the generated processor names, state store names (hence
changelog topic names), and repartition
+ topic names. Note that the names of state stores and
changelog/repartition topics
+ are "stateful" while processor names are "stateless".
+ </p>
+ <p>
+ This distinction
+ of stateful vs. stateless names has important implications
when updating your topology.
+ While the internal naming makes creating
+ a topology with the DSL much more straightforward,
+ there are a couple of trade-offs. The first trade-off is
what we could
+ consider a readability issue. The other,
+ more severe trade-off is the shifting of names due to the
relationship between the
+ DSL operator and the generated <code>Processors</code>,
<code>State Stores</code>, changelog
+ topics, and repartition topics.
+ </p>
+
+
+ <h2>Readability Issues</h2>
+
+ <p>
+ By saying there is a readability trade-off, we are
referring to viewing a description of the topology.
+ When you render the string description of your topology
via the <code>Topology#describe()</code>
+ method, you can see what the processor is, but you
don't have any context for its business purpose.
+ For example, consider the following simple topology:
+
+ <br/>
+ <pre>
+ KStream<String,String> stream = builder.stream("input");
+ stream.filter((k,v) -> !v.equals("invalid_txn"))
+ .mapValues((v) -> v.substring(0,5))
+ .to("output");
+ </pre>
+
+ </p>
+
+ <p>
+ Running <code>Topology#describe()</code> yields this string:
+
+ <pre>
+ Topologies:
+ Sub-topology: 0
+ Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+ --> KSTREAM-FILTER-0000000001
+ Processor: KSTREAM-FILTER-0000000001 (stores: [])
+ --> KSTREAM-MAPVALUES-0000000002
+ <-- KSTREAM-SOURCE-0000000000
+ Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
+ --> KSTREAM-SINK-0000000003
+ <-- KSTREAM-FILTER-0000000001
+ Sink: KSTREAM-SINK-0000000003 (topic: output)
+ <-- KSTREAM-MAPVALUES-0000000002
+ </pre>
+
+ From this report, you can see what the different operators
are, but what is the broader context here?
+ For example, consider <code>KSTREAM-FILTER-0000000001</code>:
you can see that it's a
+ filter operation, which means that records that
don't match the given predicate are dropped. But what is
+ the meaning of the predicate? Additionally, you can see the
topic names of the source and sink nodes,
+ but what if the topics aren't named in a meaningful way? Then
you're left to guess the
+ business purpose behind these topics.
+ </p>
+ <p>
+ Also notice the numbering here: the source node is suffixed
with <code>0000000000</code>
+ indicating it's the first processor in the topology.
+ The filter is suffixed with <code>0000000001</code>,
indicating it's the second processor in
+ the topology. In Kafka Streams, there are now overloaded
methods for
+ both <code>KStream</code> and <code>KTable</code> that accept
+ a new parameter <code>Named</code>. By using the
<code>Named</code> class, DSL users can
+ provide meaningful names to the processors in their topology.
+ </p>
+ <p>
+ Now let's take a look at your topology with all the processors
named:
+ <pre>
+ KStream<String,String> stream =
+ builder.stream("input",
Consumed.as("Customer_transactions_input_topic"));
+ stream.filter((k,v) -> !v.equals("invalid_txn"),
Named.as("filter_out_invalid_txns"))
+ .mapValues((v) -> v.substring(0,5),
Named.as("Map_values_to_first_5_characters"))
+ .to("output",
Produced.as("Mapped_transactions_output_topic"));
+ </pre>
+
+ <pre>
+ Topologies:
+ Sub-topology: 0
+ Source: Customer_transactions_input_topic (topics: [input])
+ --> filter_out_invalid_txns
+ Processor: filter_out_invalid_txns (stores: [])
+ --> Map_values_to_first_5_characters
+ <-- Customer_transactions_input_topic
+ Processor: Map_values_to_first_5_characters (stores: [])
+ --> Mapped_transactions_output_topic
+ <-- filter_out_invalid_txns
+ Sink: Mapped_transactions_output_topic (topic: output)
+ <-- Map_values_to_first_5_characters
+ </pre>
+
+ Now you can look at the topology description and easily
understand what role each processor
+ plays in the topology. But there's another reason for naming
your processor nodes when you
+ have stateful operators: their state stores, changelog topics,
and repartition topics
+ remain between restarts of your Kafka Streams application.
+ </p>
+
+ <h2>Changing Names</h2>
+ <p>
+ Generated names are numbered according to where they are built in the
topology.
+ The name generation strategy is
+ <code>KSTREAM|KTABLE-&lt;operator name&gt;-&lt;number
suffix&gt;</code>. The number is a
+ globally incrementing number that represents the operator's
order in the topology.
+ The generated number is prefixed with a varying number of "0"s
to create a
+ string that is consistently 10 characters long.
+ This means that if you add, remove, or reorder
operations, the position of the
+ processor shifts, which shifts the name of the processor.
Since <strong>most</strong> processors exist
+ in memory only, this name shifting presents no issue for many
topologies. But the name
+ shifting does have implications for topologies with stateful
operators or repartition topics.
+
+ Here's a different topology with some state:
+ <pre>
+ KStream<String,String> stream = builder.stream("input");
+ stream.groupByKey()
+ .count()
+ .toStream()
+ .to("output");
+ </pre>
+ This topology description yields the following:
+ <pre>
+ Topologies:
+ Sub-topology: 0
+ Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+ --> KSTREAM-AGGREGATE-0000000002
+ Processor: KSTREAM-AGGREGATE-0000000002 (stores:
[KSTREAM-AGGREGATE-STATE-STORE-0000000001])
+ --> KTABLE-TOSTREAM-0000000003
+ <-- KSTREAM-SOURCE-0000000000
+ Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
+ --> KSTREAM-SINK-0000000004
+ <-- KSTREAM-AGGREGATE-0000000002
+ Sink: KSTREAM-SINK-0000000004 (topic: output)
+ <-- KTABLE-TOSTREAM-0000000003
+ </pre>
+ </p>
+ <p>
+ You can see from the topology description above that the
state store is named
+ <code>KSTREAM-AGGREGATE-STATE-STORE-0000000001</code>.
Here's what happens when you
+ add a filter to keep some of the records out of the
aggregation:
+ <pre>
+ KStream<String,String> stream =
builder.stream("input");
+ stream.filter((k, v) -> v != null && v.length() >= 6)
+ .groupByKey()
+ .count()
+ .toStream()
+ .to("output");
+ </pre>
+
+ And the corresponding topology:
+ <pre>
+ Topologies:
+ Sub-topology: 0
+ Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+ --> KSTREAM-FILTER-0000000001
+ Processor: KSTREAM-FILTER-0000000001 (stores: [])
+ --> KSTREAM-AGGREGATE-0000000003
+ <-- KSTREAM-SOURCE-0000000000
+ Processor: KSTREAM-AGGREGATE-0000000003 (stores:
[KSTREAM-AGGREGATE-STATE-STORE-0000000002])
+ --> KTABLE-TOSTREAM-0000000004
+ <-- KSTREAM-FILTER-0000000001
+ Processor: KTABLE-TOSTREAM-0000000004 (stores: [])
+ --> KSTREAM-SINK-0000000005
+ <-- KSTREAM-AGGREGATE-0000000003
+ Sink: KSTREAM-SINK-0000000005 (topic: output)
+ <-- KTABLE-TOSTREAM-0000000004
+ </pre>
+ </p>
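+ <p>
+ The shift in the state store name can be reproduced with a small model.
+ The following is a minimal sketch in plain Java, not the Kafka Streams
+ implementation: it assumes a single counter shared by all generated names,
+ as the two topology descriptions above suggest. The class
+ <code>GeneratedNameSketch</code> and its methods are hypothetical names
+ used only for illustration.
+ </p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Simplified model of index-based name generation (an assumption, not Kafka Streams code). */
public class GeneratedNameSketch {
    // One counter shared by every generated name in the topology.
    private int index = 0;

    /** Returns the prefix followed by the next index, zero-padded to 10 digits. */
    String next(String prefix) {
        return String.format(Locale.ROOT, "%s%010d", prefix, index++);
    }

    /** Generates names in the order shown by the count() topologies in this guide. */
    public static List<String> build(boolean withFilter) {
        GeneratedNameSketch names = new GeneratedNameSketch();
        List<String> out = new ArrayList<>();
        out.add(names.next("KSTREAM-SOURCE-"));
        if (withFilter) {
            // The extra operator consumes an index, shifting every later name.
            out.add(names.next("KSTREAM-FILTER-"));
        }
        out.add(names.next("KSTREAM-AGGREGATE-STATE-STORE-"));
        out.add(names.next("KSTREAM-AGGREGATE-"));
        out.add(names.next("KTABLE-TOSTREAM-"));
        out.add(names.next("KSTREAM-SINK-"));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(build(false)); // topology without the filter
        System.out.println(build(true));  // topology with the filter added
    }
}
```

+ <p>
+ In this model the store name moves from
+ <code>KSTREAM-AGGREGATE-STATE-STORE-0000000001</code> to
+ <code>KSTREAM-AGGREGATE-STATE-STORE-0000000002</code> once the filter is added,
+ matching the two descriptions above.
+ </p>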
+ <p>
+ Notice that since you've added an operation <em>before</em> the
<code>count</code> operation, the state
+ store (and the changelog topic) names have changed. This
name change means you can't
+ do a rolling re-deployment of your updated topology. Also,
you must use the
+ <a
href="/{{version}}/documentation/streams/developer-guide/app-reset-tool">Streams
Reset Tool</a>
+ to re-calculate the aggregations, because the changelog topic
has changed on start-up and the
+ new changelog topic contains no data.
+
+ Fortunately, there's an easy solution to remedy this
situation. Give the
+ state store a user-defined name instead of relying on the
generated one,
+ so you don't have to worry about topology changes shifting
the name of the state store.
+
+ You've had the ability to name repartition topics with the
<code>Joined</code>,
+ <code>StreamJoined</code>, and <code>Grouped</code> classes,
and
+ name state stores and changelog topics with
<code>Materialized</code>.
+ But it's worth reiterating the importance of naming these DSL
topology operations.
+
+ Here's how your DSL code looks now that you've given a specific name to
your state store:
+ <pre>
+ KStream<String,String> stream = builder.stream("input");
+ stream.filter((k, v) -> v != null && v.length() >= 6)
+ .groupByKey()
+ .count(Materialized.as("Purchase_count_store"))
+ .toStream()
+ .to("output");
+ </pre>
+
+ And here's the topology:
+
+ <pre>
+ Topologies:
+ Sub-topology: 0
+ Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+ --> KSTREAM-FILTER-0000000001
+ Processor: KSTREAM-FILTER-0000000001 (stores: [])
+ --> KSTREAM-AGGREGATE-0000000002
+ <-- KSTREAM-SOURCE-0000000000
+ Processor: KSTREAM-AGGREGATE-0000000002 (stores:
[Purchase_count_store])
+ --> KTABLE-TOSTREAM-0000000003
+ <-- KSTREAM-FILTER-0000000001
+ Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
+ --> KSTREAM-SINK-0000000004
+ <-- KSTREAM-AGGREGATE-0000000002
+ Sink: KSTREAM-SINK-0000000004 (topic: output)
+ <-- KTABLE-TOSTREAM-0000000003
+ </pre>
+ </p>
+ <p>
+ Now, even though you've added processors before your state
store, the store name and its changelog
+ topic name don't change. This makes your topology more
robust and resilient to changes made by
+ adding or removing processors.
+ </p>
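+ <p>
+ To see why a stable store name keeps the changelog topic stable, recall that
+ Kafka Streams names a changelog topic
+ <code>&lt;application.id&gt;-&lt;store name&gt;-changelog</code>.
+ The sketch below is plain Java that only builds strings under that convention;
+ the class <code>InternalTopicNameSketch</code> and the application ID
+ <code>purchase-app</code> are hypothetical and used only for illustration.
+ </p>

```java
/** String-only illustration of changelog topic naming; it does not call Kafka Streams. */
public class InternalTopicNameSketch {

    /** Builds a changelog topic name as <application.id>-<store name>-changelog. */
    public static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String appId = "purchase-app"; // hypothetical application.id

        // Generated store names: the topic name differs before and after adding the filter.
        System.out.println(changelogTopic(appId, "KSTREAM-AGGREGATE-STATE-STORE-0000000001"));
        System.out.println(changelogTopic(appId, "KSTREAM-AGGREGATE-STATE-STORE-0000000002"));

        // User-defined store name: the topic name is the same for both topologies.
        System.out.println(changelogTopic(appId, "Purchase_count_store"));
    }
}
```

+ <p>
+ With the generated names, the two topologies point at two different changelog
+ topics; with <code>Purchase_count_store</code>, both point at the same one.
+ </p>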
+
+ <h2>Conclusion</h2>
+
+ It's a good practice to name your processing nodes when using
the DSL, and it's even
+ more important to do this when you have "stateful" processors in
+ your application such as repartition
+ topics and state stores (and the accompanying changelog
topics).
+ <p>
+ Here are a couple of points to remember when naming your DSL
topology:
+ <ol>
+ <li>
+ If you have an <em>existing topology</em> and
you <em>haven't</em> named your
+ state stores (and changelog topics) and
repartition topics, we recommend that you
+ do so. But this will be a topology-breaking
change, so you'll need to shut down all
+ application instances, make the changes, and
run the
+ <a
href="/{{version}}/documentation/streams/developer-guide/app-reset-tool">Streams
Reset Tool</a>.
+ Although this may be inconvenient at first,
it's worth the effort to protect your application from
+ unexpected errors due to topology changes.
+ </li>
+ <li>
+ If you have a <em>new topology</em>, make sure
you name the persistent parts of your topology:
+ state stores (changelog topics) and repartition
topics. This way, when you deploy your
+ application, you're protected from topology
changes that otherwise would break your Kafka Streams application.
+ If you don't want to add names to stateless
processors at first, that's fine as you can
+ always go back and add the names later.
+ </li>
+ </ol>
+
+ Here's a quick reference on naming the critical parts of
+ your Kafka Streams application to prevent topology name
changes from breaking your application:
+
+ <table>
+ <tr>
+ <th>Operation</th><th>Naming Class</th>
+ </tr>
+ <tr>
+ <td>Aggregation repartition
topics</td><td>Grouped</td>
+ </tr>
+ <tr>
+ <td>KStream-KStream Join repartition
topics</td><td>StreamJoined</td>
+ </tr>
+ <tr>
+ <td>KStream-KTable Join repartition
topic</td><td>Joined</td>
+ </tr>
+ <tr>
+ <td>KStream-KStream Join state
stores</td><td>StreamJoined</td>
+ </tr>
+ <tr>
+ <td>State Stores (for aggregations and KTable-KTable
joins)</td><td>Materialized</td>
+ </tr>
+ <tr>
+ <td>Stream/Table non-stateful
operations</td><td>Named</td>
+ </tr>
+ </table>
+ </p>
+</div>
+
+</script>
+
+<!--#include virtual="../../../includes/_header.htm" -->
+<!--#include virtual="../../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+ <!--#include virtual="../../../includes/_nav.htm" -->
+ <div class="right">
+ <!--#include virtual="../../../includes/_docs_banner.htm" -->
+ <ul class="breadcrumbs">
+ <li><a href="/documentation">Documentation</a></li>
+ <li><a href="/documentation/streams">Kafka
Streams</a></li>
+ <li><a
href="/documentation/streams/developer-guide/">Developer Guide</a></li>
+ </ul>
+ <div class="p-content"></div>
+ </div>
+</div>
+<!--#include virtual="../../../includes/_footer.htm" -->
+<script>
+ $(function () {
+ // Show selected style on nav item
+ $('.b-nav__streams').addClass('selected');
+
+ //sticky secondary nav
+ var $navbar = $(".sub-nav-sticky"),
+ y_pos = $navbar.offset().top,
+ height = $navbar.height();
+
+ $(window).scroll(function () {
+ var scrollTop = $(window).scrollTop();
+
+ if (scrollTop > y_pos - height) {
+ $navbar.addClass("navbar-fixed")
+ } else if (scrollTop <= y_pos) {
+ $navbar.removeClass("navbar-fixed")
+ }
+ });
+
+ // Display docs subnav items
+
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+ });
+</script>
+
+
+
+