[ 
https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357609#comment-16357609
 ] 

ASF GitHub Bot commented on KAFKA-3625:
---------------------------------------

mjsax closed pull request #4493: KAFKA-3625: add docs for Kafka Streams 
test-utils (follow up)
URL: https://github.com/apache/kafka/pull/4493
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/documentation/streams/developer-guide/testing.html 
b/docs/documentation/streams/developer-guide/testing.html
new file mode 100644
index 00000000000..4753e66ad1c
--- /dev/null
+++ b/docs/documentation/streams/developer-guide/testing.html
@@ -0,0 +1,19 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- should always link the latest release's documentation -->
+<!--#include virtual="../../../streams/developer-guide/testing.html" -->
diff --git a/docs/streams/developer-guide/datatypes.html 
b/docs/streams/developer-guide/datatypes.html
index a86ea3e5790..51bd5857eec 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -178,7 +178,7 @@ <h3>JSON<a class="headerlink" href="#json" title="Permalink 
to this headline"></
               </div>
   <div class="pagination">
     <a href="/{{version}}/documentation/streams/developer-guide/processor-api" 
class="pagination__btn pagination__btn__prev">Previous</a>
-    <a 
href="/{{version}}/documentation/streams/developer-guide/interactive-queries" 
class="pagination__btn pagination__btn__next">Next</a>
+    <a href="/{{version}}/documentation/streams/developer-guide/testing" 
class="pagination__btn pagination__btn__next">Next</a>
   </div>
 </script>
 
diff --git a/docs/streams/developer-guide/index.html 
b/docs/streams/developer-guide/index.html
index 443ad7d90bd..095ec829aa1 100644
--- a/docs/streams/developer-guide/index.html
+++ b/docs/streams/developer-guide/index.html
@@ -44,6 +44,7 @@ <h1>Developer Guide for Kafka Streams</h1>
 <li class="toctree-l1"><a class="reference internal" 
href="dsl-api.html">Streams DSL</a></li>
 <li class="toctree-l1"><a class="reference internal" 
href="processor-api.html">Processor API</a></li>
 <li class="toctree-l1"><a class="reference internal" 
href="datatypes.html">Data Types and Serialization</a></li>
+<li class="toctree-l1"><a class="reference internal" 
href="testing.html">Testing a Streams Application</a></li>
 <li class="toctree-l1"><a class="reference internal" 
href="interactive-queries.html">Interactive Queries</a></li>
 <li class="toctree-l1"><a class="reference internal" 
href="memory-mgmt.html">Memory Management</a></li>
 <li class="toctree-l1"><a class="reference internal" 
href="running-app.html">Running Streams Applications</a></li>
diff --git a/docs/streams/developer-guide/interactive-queries.html 
b/docs/streams/developer-guide/interactive-queries.html
index 0a37d5668cb..e4ba4db8b8e 100644
--- a/docs/streams/developer-guide/interactive-queries.html
+++ b/docs/streams/developer-guide/interactive-queries.html
@@ -484,7 +484,7 @@
                </div>
               </div>
               <div class="pagination">
-                <a 
href="/{{version}}/documentation/streams/developer-guide/datatypes" 
class="pagination__btn pagination__btn__prev">Previous</a>
+                <a 
href="/{{version}}/documentation/streams/developer-guide/testing" 
class="pagination__btn pagination__btn__prev">Previous</a>
                 <a 
href="/{{version}}/documentation/streams/developer-guide/memory-mgmt" 
class="pagination__btn pagination__btn__next">Next</a>
               </div>
                 </script>
diff --git a/docs/streams/developer-guide/testing.html 
b/docs/streams/developer-guide/testing.html
new file mode 100644
index 00000000000..e6886a1689f
--- /dev/null
+++ b/docs/streams/developer-guide/testing.html
@@ -0,0 +1,315 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+  <!-- h1>Developer Guide for Kafka Streams</h1 -->
+  <div class="sub-nav-sticky">
+    <div class="sticky-top">
+      <!-- div style="height:35px">
+        <a href="/{{version}}/documentation/streams/">Introduction</a>
+        <a class="active-menu-item" 
href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+        <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+        <a href="/{{version}}/documentation/streams/quickstart">Run Demo 
App</a>
+        <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write 
App</a>
+      </div -->
+    </div>
+  </div>
+
+  <div class="section" id="testing">
+    <span id="streams-developer-guide-testing"></span><h1>Testing a Streams 
Application<a class="headerlink" href="#testing" title="Permalink to this 
headline"></a></h1>
+    <p>
+      To test a Kafka Streams application, Kafka provides a test-utils 
artifact that can be added as regular dependency to your test code base.
+      Example <code>pom.xml</code> snippet when using Maven:
+    </p>
+    <pre>
+&lt;dependency&gt;
+    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
+    &lt;artifactId&gt;kafka-streams-test-utils&lt;/artifactId&gt;
+    &lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
+    &lt;scope&gt;test&lt;/scope&gt;
+&lt;/dependency&gt;
+    </pre>
+    <p>
+    The test-utils package provides a <code>TopologyTestDriver</code> that can 
be used pipe data through a <code>Topology</code> that is either assembled 
manually
+    using Processor API or via the DSL using <code>StreamsBuilder</code>.
+    The test driver simulates the library runtime that continuously fetches 
records from input topics and processes them by traversing the topology.
+    You can use the test driver to verify that your specified processor 
topology computes the correct result with the manually piped in data records.
+    The test driver captures the results records and allows to query its 
embedded state stores.
+    <pre>
+// Processor API
+Topology topology = new Topology();
+topology.addSource("sourceProcessor", "input-topic");
+topology.addProcessor("processor", ..., "sourceProcessor");
+topology.addSink("sinkProcessor", "output-topic", "processor");
+// or
+// using DSL
+StreamsBuilder builder = new StreamsBuilder();
+builder.stream("input-topic").filter(...).to("output-topic");
+Topology topology = builder.build();
+
+// setup test driver
+Properties config = new Properties();
+config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
+config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
+    </pre>
+    <p>
+    The test driver accepts <code>ConsumerRecord</code>s with key and value 
type <code>byte[]</code>.
+    Because <code>byte[]</code> types can be problematic, you can use the 
<code>ConsumerRecordFactory</code> to generate those records
+    by providing regular Java types for key and values and the corresponding 
serializers.
+    </p>
+    <pre>
+ConsumerRecordFactory&lt;String, Integer&gt; factory = new 
ConsumerRecordFactory&lt;&gt;("input-topic", new StringSerializer(), new 
IntegerSerializer());
+testDriver.pipe(factory.create("key", 42L));
+    </pre>
+    <p>
+    To verify the output, the test driver produces 
<code>ProducerRecord</code>s with key and value type <code>byte[]</code>.
+    For result verification, you can specify corresponding deserializers when 
reading the output record from the driver.
+    <pre>
+ProducerRecord&lt;String, Integer&gt; outputRecord = 
testDriver.readOutput("output-topic", new StringDeserializer(), new 
LongDeserializer());
+    </pre>
+    <p>
+    For result verification, you can use <code>OutputVerifier</code>.
+    It offers helper methods to compare only certain parts of the result 
record:
+    for example, you might only care about the key and value, but not the 
timestamp of the result record.
+    </p>
+    <pre>
+OutputVerifier.compareKeyValue(outputRecord, "key", 42L); // throws 
AssertionError if key or value does not match
+    </pre>
+    <p>
+    <code>TopologyTestDriver</code> supports punctuations, too.
+    Event-time punctuations are triggered automatically based on the processed 
records' timestamps.
+    Wall-clock-time punctuations can also be triggered by advancing the test 
driver's wall-clock-time (the driver mocks wall-clock-time internally to give 
users control over it).
+    </p>
+    <pre>
+testDriver.advanceWallClockTime(20L);
+    </pre>
+    </div>
+    <p>
+    Additionally, you can access state stores via the test driver before or 
after a test.
+    Accessing stores before a test is useful to pre-populate a store with some 
initial values.
+    After data was processed, expected updates to the store can be verified.
+    </p>
+    <pre>
+KeyValueStore store = testDriver.getKeyValueStore("store-name");
+    </pre>
+    <p>
+    Note, that you should always close the test driver at the end to make sure 
all resources are release properly.
+    </p>
+    <pre>
+testDriver.close();
+    </pre>
+
+    <h2>Example</h2>
+    <p>
+    The following example demonstrates how to use the test driver and helper 
classes.
+    The example creates a topology that computes the maximum value per key 
using a key-value-store.
+    While processing, no output is generated, but only the store is updated.
+    Output is only sent downstream based on event-time and wall-clock 
punctuations.
+    </p>
+    <pre>
+private TopologyTestDriver testDriver;
+private KeyValueStore&lt;String, Long&gt; store;
+
+private StringDeserializer stringDeserializer = new StringDeserializer();
+private LongDeserializer longDeserializer = new LongDeserializer();
+private ConsumerRecordFactory&lt;String, Long&gt; recordFactory = new 
ConsumerRecordFactory&lt;&gt;(new StringSerializer(), new LongSerializer());
+
+@Before
+public void setup() {
+    Topology topology = new Topology();
+    topology.addSource("sourceProcessor", "input-topic");
+    topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), 
"sourceProcessor");
+    topology.addStateStore(
+        Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("aggStore"),
+            Serdes.String(),
+            Serdes.Long()).withLoggingDisabled(), // need to disable logging 
to allow store pre-populating
+        "aggregator");
+    topology.addSink("sinkProcessor", "result-topic", "aggregator");
+
+    // setup test driver
+    Properties config = new Properties();
+    config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
+    config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+    config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+    config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass().getName());
+    testDriver = new TopologyTestDriver(topology, config);
+
+    // pre-populate store
+    store = testDriver.getKeyValueStore("aggStore");
+    store.put("a", 21L);
+}
+
+@After
+public void tearDown() {
+    testDriver.close();
+}
+
+@Test
+public void shouldFlushStoreForFirstInput() {
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer), "a", 21L);
+    Assert.assertNull(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer));
+}
+
+@Test
+public void shouldNotUpdateStoreForSmallerValue() {
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+    Assert.assertThat(store.get("a"), equalTo(21L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer), "a", 21L);
+    Assert.assertNull(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer));
+}
+
+@Test
+public void shouldNotUpdateStoreForLargerValue() {
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L));
+    Assert.assertThat(store.get("a"), equalTo(42L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer), "a", 42L);
+    Assert.assertNull(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer));
+}
+
+@Test
+public void shouldUpdateStoreForNewKey() {
+    testDriver.pipeInput(recordFactory.create("input-topic", "b", 21L, 9999L));
+    Assert.assertThat(store.get("b"), equalTo(21L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer), "a", 21L);
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer), "b", 21L);
+    Assert.assertNull(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer));
+}
+
+@Test
+public void shouldPunctuateIfEvenTimeAdvances() {
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer), "a", 21L);
+
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+    Assert.assertNull(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer));
+
+    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 10000L));
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer), "a", 21L);
+    Assert.assertNull(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer));
+}
+
+@Test
+public void shouldPunctuateIfWallClockTimeAdvances() {
+    testDriver.advanceWallClockTime(60000);
+    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer), "a", 21L);
+    Assert.assertNull(testDriver.readOutput("result-topic", 
stringDeserializer, longDeserializer));
+}
+
+public class CustomMaxAggregatorSupplier implements 
ProcessorSupplier&lt;String, Long&gt; {
+    @Override
+    public Processor&lt;String, Long&gt; get() {
+        return new CustomMaxAggregator();
+    }
+}
+
+public class CustomMaxAggregator implements Processor&lt;String, Long&gt; {
+    ProcessorContext context;
+    private KeyValueStore&lt;String, Long&gt; store;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init(ProcessorContext context) {
+        this.context = context;
+        context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new 
Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                flushStore();
+            }
+        });
+        context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                flushStore();
+            }
+        });
+        store = (KeyValueStore&lt;String, Long&gt;) 
context.getStateStore("aggStore");
+    }
+
+    @Override
+    public void process(String key, Long value) {
+        Long oldValue = store.get(key);
+        if (oldValue == null || value &gt; oldValue) {
+            store.put(key, value);
+        }
+    }
+
+    private void flushStore() {
+        KeyValueIterator&lt;String, Long&gt; it = store.all();
+        while (it.hasNext()) {
+            KeyValue&lt;String, Long&gt; next = it.next();
+            context.forward(next.key, next.value);
+        }
+    }
+
+    @Override
+    public void punctuate(long timestamp) {} // deprecated; not used
+
+    @Override
+    public void close() {}
+}
+    </pre>
+  <div class="pagination">
+  <div class="pagination">
+    <a href="/{{version}}/documentation/streams/developer-guide/datatypes" 
class="pagination__btn pagination__btn__prev">Previous</a>
+    <a 
href="/{{version}}/documentation/streams/developer-guide/interactive-queries" 
class="pagination__btn pagination__btn__next">Next</a>
+  </div>
+</script>
+
+<!--#include virtual="../../../includes/_header.htm" -->
+<!--#include virtual="../../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+  <!--#include virtual="../../../includes/_nav.htm" -->
+  <div class="right">
+    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+    <ul class="breadcrumbs">
+      <li><a href="/documentation">Documentation</a></li>
+      <li><a href="/documentation/streams">Kafka Streams</a></li>
+      <li><a href="/documentation/streams/developer-guide/">Developer 
Guide</a></li>
+    </ul>
+    <div class="p-content"></div>
+  </div>
+</div>
+<!--#include virtual="../../../includes/_footer.htm" -->
+<script>
+    $(function() {
+        // Show selected style on nav item
+        $('.b-nav__streams').addClass('selected');
+
+        //sticky secondary nav
+        var $navbar = $(".sub-nav-sticky"),
+            y_pos = $navbar.offset().top,
+            height = $navbar.height();
+
+        $(window).scroll(function() {
+            var scrollTop = $(window).scrollTop();
+
+            if (scrollTop > y_pos - height) {
+                $navbar.addClass("navbar-fixed")
+            } else if (scrollTop <= y_pos) {
+                $navbar.removeClass("navbar-fixed")
+            }
+        });
+
+        // Display docs subnav items
+        
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+    });
+</script>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move kafka-streams test fixtures into a published package
> ---------------------------------------------------------
>
>                 Key: KAFKA-3625
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3625
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Jeff Klukas
>            Assignee: Matthias J. Sax
>            Priority: Minor
>              Labels: kip, user-experience
>             Fix For: 1.1.0
>
>
> The KStreamTestDriver and related fixtures defined in 
> streams/src/test/java/org/apache/kafka/test would be useful to developers 
> building applications on top of Kafka Streams, but they are not currently 
> exposed in a package.
> I propose moving this directory to live under streams/fixtures/src/main and 
> creating a new 'streams:fixtures' project in the gradle configuration to 
> publish these as a separate package.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to