This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 06f306a KAFKA-8410: Update the docs to reference the new PAPI (#10994)
06f306a is described below
commit 06f306a8a8910f2ce0b2656f666d9537593a0898
Author: John Roesler <[email protected]>
AuthorDate: Tue Jul 13 10:23:50 2021 -0500
KAFKA-8410: Update the docs to reference the new PAPI (#10994)
Reviewers: Jim Galasyn <[email protected]>, Luke Chen <[email protected]>, Matthias J. Sax <[email protected]>
---
docs/streams/developer-guide/dsl-api.html | 36 ++----
docs/streams/developer-guide/processor-api.html | 135 ++++++++++++++-------
.../examples/wordcount/WordCountProcessorDemo.java | 86 +++++++------
.../examples/wordcount/WordCountProcessorTest.java | 2 +-
4 files changed, 141 insertions(+), 118 deletions(-)
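
Note on the Record copy semantics documented in the processor-api.html change: the new text says that inputRecord.withValue(newValue) returns a new record and leaves inputRecord untouched, but that the copy is shallow. The sketch below illustrates only that caveat. It uses a hypothetical stand-in class, SimpleRecord, written for this note; it is not the Kafka Streams Record class, and its fields and method names are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ShallowCopySketch {

    // Hypothetical stand-in for a record type with a withValue() copy method.
    // This is NOT org.apache.kafka.streams.processor.api.Record.
    static final class SimpleRecord<K, V> {
        final K key;
        final V value;
        final long timestamp;
        final List<String> headers;

        SimpleRecord(final K key, final V value, final long timestamp, final List<String> headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = headers;
        }

        // Returns a new record and leaves the receiver unchanged.
        // The key and headers are copied by reference only (shallow copy).
        <NewV> SimpleRecord<K, NewV> withValue(final NewV newValue) {
            return new SimpleRecord<>(key, newValue, timestamp, headers);
        }
    }

    // Returns true if a mutation made through the copy is visible through the input record.
    static boolean mutationLeaksToInput() {
        final List<String> headers = new ArrayList<>();
        headers.add("trace-id");
        final SimpleRecord<String, String> input = new SimpleRecord<>("k", "v", 42L, headers);
        final SimpleRecord<String, Integer> output = input.withValue(1);
        output.headers.add("added-later"); // both records share one list
        return input.headers.contains("added-later");
    }

    // Same scenario, but the headers are deep-copied before being mutated.
    static boolean mutationLeaksAfterDeepCopy() {
        final List<String> headers = new ArrayList<>();
        headers.add("trace-id");
        final SimpleRecord<String, String> input = new SimpleRecord<>("k", "v", 42L, headers);
        final SimpleRecord<String, Integer> output =
            new SimpleRecord<>(input.key, 1, input.timestamp, new ArrayList<>(input.headers));
        output.headers.add("added-later"); // only the copy's list changes
        return input.headers.contains("added-later");
    }

    public static void main(final String[] args) {
        System.out.println(mutationLeaksToInput());       // prints "true"
        System.out.println(mutationLeaksAfterDeepCopy()); // prints "false"
    }
}
```

The practical consequence is the one the documentation states: if a processor intends to mutate the key, value, or headers after forwarding, it should copy those fields itself first.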
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index d2bce04..babeefe 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3446,33 +3446,35 @@ grouped
<p>First, we need to implement a custom stream processor, <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code>, that implements the <code class="docutils literal"><span class="pre">Processor</span></code> interface:</p>
<pre class="line-numbers"><code class="language-java">// A processor that sends an alert message about a popular page to a configurable email address
-public class PopularPageEmailAlert implements Processor<PageId, Long> {
+public class PopularPageEmailAlert implements Processor<PageId, Long, Void, Void> {
private final String emailAddress;
- private ProcessorContext context;
+ private ProcessorContext<Void, Void> context;
public PopularPageEmailAlert(String emailAddress) {
this.emailAddress = emailAddress;
}
@Override
- public void init(ProcessorContext context) {
+ public void init(ProcessorContext<Void, Void> context) {
this.context = context;
// Here you would perform any additional initializations such as setting up an email client.
}
@Override
- void process(PageId pageId, Long count) {
+ void process(Record<PageId, Long> record) {
// Here you would format and send the alert email.
//
- // In this specific example, you would be able to include information about the page's ID and its view count
- // (because the class implements `Processor<PageId, Long>`).
+ // In this specific example, you would be able to include
+ // information about the page's ID and its view count
}
@Override
void close() {
- // Any code for clean up would go here. This processor instance will not be used again after this call.
+ // Any code for clean up would go here, for example tearing down the email client and anything
+ // else you created in the init() method
+ // This processor instance will not be used again after this call.
}
}</code></pre>
@@ -3492,7 +3494,6 @@ public class PopularPageEmailAlert implements Processor<PageId, Long> {
</ul>
</div>
<p>Then we can leverage the <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the DSL via <code class="docutils literal"><span class="pre">KStream#process</span></code>.</p>
- <p>In Java 8+, using lambda expressions:</p>
<pre class="line-numbers"><code class="language-java">KStream<String, GenericRecord> pageViews = ...;
// Send an email notification when the view count of a page reaches one thousand.
@@ -3502,25 +3503,6 @@ pageViews.groupByKey()
// PopularPageEmailAlert is your custom processor that implements the
// `Processor` interface, see further down below.
.process(() -> new PopularPageEmailAlert("[email protected]"));</code></pre>
- <p>In Java 7:</p>
- <pre class="line-numbers"><code class="language-java">// Send an email notification when the view count of a page reaches one thousand.
-pageViews.groupByKey().
- .count()
- .filter(
- new Predicate<PageId, Long>() {
- public boolean test(PageId pageId, Long viewCount) {
- return viewCount == 1000;
- }
- })
- .process(
- new ProcessorSupplier<PageId, Long>() {
- public Processor<PageId, Long> get() {
- // PopularPageEmailAlert is your custom processor that implements
- // the `Processor` interface, see further down below.
- return new PopularPageEmailAlert("[email protected]");
- }
- });</code></pre>
- </div>
</div>
<div class="section" id="naming-a-streams-app">
<a class="headerlink" href="#naming-a-streams-app" title="Permalink to this headline"><h2><a class="toc-backref" href="#id33">Naming Operators in a Streams DSL application</a></h2></a>
diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index 589a3ff..90706e5 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -86,12 +86,48 @@
<code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
<code class="docutils literal"><span class="pre">Processor</span></code> object by calling
<code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
- <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
- (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
- (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
- Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
- <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
- API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+ <p>
+ The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+ <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+ that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+ <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+ to <code class="docutils literal"><span class="pre">process()</span></code>.
+ Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+ define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+ will accept. If your processor does not forward any records at all (or if it only forwards
+ <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+ a best practice is to set the output generic type argument to
+ <code class="docutils literal"><span class="pre">Void</span></code>.
+ If it needs to forward multiple types that don't share a common superclass, you will
+ have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+ </p>
+ <p>
+ Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+ and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+ methods handle records in the form of the <code class="docutils literal"><span class="pre">Record<K, V></span></code>
+ data class. This class gives you access to the main components of a Kafka record:
+ the key, value, timestamp and headers. When forwarding records, you can use the
+ constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
+ from scratch, or you can use the convenience builder methods to replace one of the
+ <code class="docutils literal"><span class="pre">Record</span></code>'s properties
+ and copy over the rest. For example,
+ <code class="docutils literal"><span class="pre">inputRecord.withValue(newValue)</span></code>
+ would copy the key, timestamp, and headers from
+ <code class="docutils literal"><span class="pre">inputRecord</span></code> while
+ setting the output record's value to <code class="docutils literal"><span class="pre">newValue</span></code>.
+ Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
+ but instead creates a shallow copy. Beware that this is only a shallow copy, so if you
+ plan to mutate the key, value, or headers elsewhere in the program, you will want to
+ create a deep copy of those fields yourself.
+ </p>
+ <p>
+ In addition to handling incoming records via
+ <code class="docutils literal"><span class="pre">Processor#process()</span></code>,
+ you have the option to schedule periodic invocation (called "punctuation")
+ in your processor's <code class="docutils literal"><span class="pre">init()</span></code>
+ method by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>
+ and passing it a <code class="docutils literal"><span class="pre">Punctuator</span></code>.
+ The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
for the punctuation scheduling: either <a class="reference internal" href="../core-concepts.html#streams_time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
is configured to represent event-time via <code class="docutils literal"><span class="pre">TimestampExtractor</span></code>). When stream-time is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely
by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there
@@ -108,8 +144,10 @@
times inside <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
<div class="admonition attention">
<p class="first admonition-title"><b>Attention</b></p>
- <p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
- If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
+ <p class="last">Stream-time is only advanced when Streams processes records.
+ If there are no records to process, or if Streams is waiting for new records
+ due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
+ configuration, then the stream time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
This behavior is independent of the configured timestamp extractor, i.e., using <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <code class="docutils literal"><span class="pre">punctuate()</span></code>.</p>
</div>
<p><b>Example</b></p>
@@ -119,45 +157,42 @@
<li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li>
<li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
</ul>
- <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor<String, String> {
-
- private ProcessorContext context;
- private KeyValueStore<String, Long> kvStore;
-
- @Override
- @SuppressWarnings("unchecked")
- public void init(ProcessorContext context) {
- // keep the processor context locally because we need it in punctuate() and commit()
- this.context = context;
+ <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor<String, String, String, String> {
+ private KeyValueStore<String, Integer> kvStore;
- // retrieve the key-value store named "Counts"
- kvStore = (KeyValueStore) context.getStateStore("Counts");
+ @Override
+ public void init(final ProcessorContext<String, String> context) {
+ context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
+ try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
+ while (iter.hasNext()) {
+ final KeyValue<String, Integer> entry = iter.next();
+ context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
+ }
+ }
+ });
+ kvStore = context.getStateStore("Counts");
+ }
- // schedule a punctuate() method every second based on stream-time
- this.context.schedule(Duration.ofSeconds(1000), PunctuationType.STREAM_TIME, (timestamp) -> {
- KeyValueIterator<String, Long> iter = this.kvStore.all();
- while (iter.hasNext()) {
- KeyValue<String, Long> entry = iter.next();
- context.forward(entry.key, entry.value.toString());
- }
- iter.close();
+ @Override
+ public void process(final Record<String, String> record) {
+ final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");
- // commit the current processing progress
- context.commit();
- });
- }
+ for (final String word : words) {
+ final Integer oldValue = kvStore.get(word);
- @Override
- public void punctuate(long timestamp) {
- // this method is deprecated and should not be used anymore
- }
-
- @Override
- public void close() {
- // close any resources managed by this processor
- // Note: Do not close any StateStores as these are managed by the library
- }
+ if (oldValue == null) {
+ kvStore.put(word, 1);
+ } else {
+ kvStore.put(word, oldValue + 1);
+ }
+ }
+ }
+ @Override
+ public void close() {
+ // close any resources managed by this processor
+ // Note: Do not close any StateStores as these are managed by the library
+ }
}</code></pre>
<div class="admonition note">
<p><b>Note</b></p>
@@ -428,12 +463,20 @@ builder.addSource("Source", "source-topic")
builder.addSource("Source", "source-topic")
// add the WordCountProcessor node which takes the source processor as its upstream processor.
// the ProcessorSupplier provides the count store associated with the WordCountProcessor
- .addProcessor("Process", new ProcessorSupplier<String, String>() {
- public Processor<String, String> get() {
+ .addProcessor("Process", new ProcessorSupplier<String, String, String, String>() {
+ public Processor<String, String, String, String> get() {
return new WordCountProcessor();
}
- public Set<StoreBuilder<?>> stores() {
- return countStoreBuilder;
+
+ public Set<StoreBuilder<?>> stores() {
+ final StoreBuilder<KeyValueStore<String, Long>> countsStoreBuilder =
+ Stores
+ .keyValueStoreBuilder(
+ Stores.persistentKeyValueStore("Counts"),
+ Serdes.String(),
+ Serdes.Long()
+ );
+ return Collections.singleton(countsStoreBuilder);
}
}, "Source")
// add the sink processor node that takes Kafka topic "sink-topic" as output
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 2c3ad1f..014923f 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -16,28 +16,26 @@
*/
package org.apache.kafka.streams.examples.wordcount;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Locale;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-
/**
* Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
* that computes a simple word occurrence histogram from an input text.
@@ -54,47 +52,46 @@ import java.util.concurrent.CountDownLatch;
* {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data arriving in the output topic.
*/
public final class WordCountProcessorDemo {
-
- static class MyProcessorSupplier implements ProcessorSupplier<String, String, String, String> {
+ static class WordCountProcessor implements Processor<String, String, String, String> {
+ private KeyValueStore<String, Integer> kvStore;
@Override
- public Processor<String, String, String, String> get() {
- return new Processor<String, String, String, String>() {
- private KeyValueStore<String, Integer> kvStore;
-
- @Override
- public void init(final ProcessorContext<String, String> context) {
- context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
- try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
- System.out.println("----------- " + timestamp + " ----------- ");
-
- while (iter.hasNext()) {
- final KeyValue<String, Integer> entry = iter.next();
-
- System.out.println("[" + entry.key + ", " + entry.value + "]");
-
- context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
- }
- }
- });
- kvStore = context.getStateStore("Counts");
- }
+ public void init(final ProcessorContext<String, String> context) {
+ context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
+ try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
+ System.out.println("----------- " + timestamp + " ----------- ");
- @Override
- public void process(final Record<String, String> record) {
- final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");
+ while (iter.hasNext()) {
+ final KeyValue<String, Integer> entry = iter.next();
- for (final String word : words) {
- final Integer oldValue = kvStore.get(word);
+ System.out.println("[" + entry.key + ", " + entry.value + "]");
- if (oldValue == null) {
- kvStore.put(word, 1);
- } else {
- kvStore.put(word, oldValue + 1);
- }
+ context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
}
}
- };
+ });
+ kvStore = context.getStateStore("Counts");
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");
+
+ for (final String word : words) {
+ final Integer oldValue = kvStore.get(word);
+
+ if (oldValue == null) {
+ kvStore.put(word, 1);
+ } else {
+ kvStore.put(word, oldValue + 1);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ // close any resources managed by this processor
+ // Note: Do not close any StateStores as these are managed by the library
}
}
@@ -122,7 +119,8 @@ public final class WordCountProcessorDemo {
builder.addSource("Source", "streams-plaintext-input");
- builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
+ builder.addProcessor("Process", WordCountProcessor::new, "Source");
+
builder.addStateStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("Counts"),
Serdes.String(),
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
index eb62630..1343a29 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -48,7 +48,7 @@ public class WordCountProcessorTest {
store.init(context.getStateStoreContext(), store);
// Create and initialize the processor under test
- final Processor<String, String, String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get();
+ final Processor<String, String, String, String> processor = new WordCountProcessorDemo.WordCountProcessor();
processor.init(context);
// send a record to the processor