This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 06f306a  KAFKA-8410: Update the docs to reference the new PAPI (#10994)
06f306a is described below

commit 06f306a8a8910f2ce0b2656f666d9537593a0898
Author: John Roesler <[email protected]>
AuthorDate: Tue Jul 13 10:23:50 2021 -0500

    KAFKA-8410: Update the docs to reference the new PAPI (#10994)
    
    Reviewers: Jim Galasyn <[email protected]>, Luke Chen 
<[email protected]>, Matthias J. Sax <[email protected]>
---
 docs/streams/developer-guide/dsl-api.html          |  36 ++----
 docs/streams/developer-guide/processor-api.html    | 135 ++++++++++++++-------
 .../examples/wordcount/WordCountProcessorDemo.java |  86 +++++++------
 .../examples/wordcount/WordCountProcessorTest.java |   2 +-
 4 files changed, 141 insertions(+), 118 deletions(-)

diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
index d2bce04..babeefe 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3446,33 +3446,35 @@ grouped
                 <p>First, we need to implement a custom stream processor, 
<code class="docutils literal"><span 
class="pre">PopularPageEmailAlert</span></code>, that implements the <code 
class="docutils literal"><span class="pre">Processor</span></code>
                     interface:</p>
                 <pre class="line-numbers"><code class="language-java">// A 
processor that sends an alert message about a popular page to a configurable 
email address
-public class PopularPageEmailAlert implements Processor&lt;PageId, Long&gt; {
+public class PopularPageEmailAlert implements Processor&lt;PageId, Long, Void, 
Void&gt; {
 
   private final String emailAddress;
-  private ProcessorContext context;
+  private ProcessorContext&lt;Void, Void&gt; context;
 
   public PopularPageEmailAlert(String emailAddress) {
     this.emailAddress = emailAddress;
   }
 
   @Override
-  public void init(ProcessorContext context) {
+  public void init(ProcessorContext&lt;Void, Void&gt; context) {
     this.context = context;
 
     // Here you would perform any additional initializations such as setting 
up an email client.
   }
 
   @Override
-  void process(PageId pageId, Long count) {
+  void process(Record&lt;PageId, Long&gt; record) {
     // Here you would format and send the alert email.
     //
-    // In this specific example, you would be able to include information 
about the page&#39;s ID and its view count
-    // (because the class implements `Processor&lt;PageId, Long&gt;`).
+    // In this specific example, you would be able to include
+    // information about the page&#39;s ID and its view count
   }
 
   @Override
   void close() {
-    // Any code for clean up would go here.  This processor instance will not 
be used again after this call.
+    // Any code for clean up would go here, for example tearing down the email 
client and anything
+    // else you created in the init() method
+    // This processor instance will not be used again after this call.
   }
 
 }</code></pre>
@@ -3492,7 +3494,6 @@ public class PopularPageEmailAlert implements 
Processor&lt;PageId, Long&gt; {
                     </ul>
                 </div>
                 <p>Then we can leverage the <code class="docutils 
literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the 
DSL via <code class="docutils literal"><span 
class="pre">KStream#process</span></code>.</p>
-                <p>In Java 8+, using lambda expressions:</p>
                 <pre class="line-numbers"><code 
class="language-java">KStream&lt;String, GenericRecord&gt; pageViews = ...;
 
 // Send an email notification when the view count of a page reaches one 
thousand.
@@ -3502,25 +3503,6 @@ pageViews.groupByKey()
          // PopularPageEmailAlert is your custom processor that implements the
          // `Processor` interface, see further down below.
          .process(() -&gt; new 
PopularPageEmailAlert(&quot;[email protected]&quot;));</code></pre>
-                <p>In Java 7:</p>
-                <pre class="line-numbers"><code class="language-java">// Send 
an email notification when the view count of a page reaches one thousand.
-pageViews.groupByKey().
-         .count()
-         .filter(
-            new Predicate&lt;PageId, Long&gt;() {
-              public boolean test(PageId pageId, Long viewCount) {
-                return viewCount == 1000;
-              }
-            })
-         .process(
-           new ProcessorSupplier&lt;PageId, Long&gt;() {
-             public Processor&lt;PageId, Long&gt; get() {
-               // PopularPageEmailAlert is your custom processor that 
implements
-               // the `Processor` interface, see further down below.
-               return new 
PopularPageEmailAlert(&quot;[email protected]&quot;);
-             }
-           });</code></pre>
-            </div>
         </div>
         <div class="section" id="naming-a-streams-app">
             <a class="headerlink" href="#naming-a-streams-app" 
title="Permalink to this headline"><h2><a class="toc-backref" 
href="#id33">Naming Operators in a Streams DSL application</a></h2></a>
diff --git a/docs/streams/developer-guide/processor-api.html 
b/docs/streams/developer-guide/processor-api.html
index 589a3ff..90706e5 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -86,12 +86,48 @@
               <code class="docutils literal"><span 
class="pre">close()</span></code> method. Note that Kafka Streams may re-use a 
single
               <code class="docutils literal"><span 
class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span 
class="pre">init()</span></code> on it again after <code class="docutils 
literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also 
get a timestamp assigned. There are two different default behaviors:
-              (1) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">#process()</span></code> the output record inherits 
the input record timestamp.
-              (2) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">punctuate()</span></code></p> the output record 
inherits the current punctuation timestamp (either current 'stream time' or 
system wall-clock time).
-              Note, that <code class="docutils literal"><span 
class="pre">#forward()</span></code> also allows to change the default behavior 
by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code> accepts a user <code 
class="docutils literal"><span class="pre">Punctuator</span></code> callback 
interface, which triggers its <code class="docutils literal"><span 
class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils 
literal"><span class="pre">PunctuationType</span></code>. The <code 
class="docutils literal"><span class="pre">PunctuationType</span></code> 
determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span 
class="pre">Processor</span></code> interface takes two sets of generic 
parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, 
VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code 
class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> 
define the key and value types that will be passed
+            to <code class="docutils literal"><span 
class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span 
class="pre">KOut</span></code> and <code class="docutils literal"><span 
class="pre">VOut</span></code>
+            define the forwarded key and value types that <code 
class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all 
(or if it only forwards
+            <code class="docutils literal"><span 
class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span 
class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common 
superclass, you will
+            have to set the output generic type argument to <code 
class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            methods handle records in the form of the <code class="docutils 
literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the main components of 
a Kafka record:
+            the key, value, timestamp and headers. When forwarding records, 
you can use the
+            constructor to create a new <code class="docutils literal"><span 
class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to 
replace one of the
+            <code class="docutils literal"><span 
class="pre">Record</span></code>'s properties
+            and copy over the rest. For example,
+            <code class="docutils literal"><span 
class="pre">inputRecord.withValue(newValue)</span></code>
+            would copy the key, timestamp, and headers from
+            <code class="docutils literal"><span 
class="pre">inputRecord</span></code> while
+            setting the output record's value to <code class="docutils 
literal"><span class="pre">newValue</span></code>.
+            Note that this does not mutate <code class="docutils 
literal"><span class="pre">inputRecord</span></code>,
+            but instead creates a new record. Beware that this is only a 
shallow copy, so if you
+            plan to mutate the key, value, or headers elsewhere in the 
program, you will want to
+            create a deep copy of those fields yourself.
+          </p>
+            <p>
+              In addition to handling incoming records via
+              <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>,
+              you have the option to schedule periodic invocation (called 
"punctuation")
+              in your processor's <code class="docutils literal"><span 
class="pre">init()</span></code>
+              method by calling <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code>
+              and passing it a <code class="docutils literal"><span 
class="pre">Punctuator</span></code>.
+                The <code class="docutils literal"><span 
class="pre">PunctuationType</span></code> determines what notion of time is used
                 for the punctuation scheduling: either <a class="reference 
internal" href="../core-concepts.html#streams_time"><span class="std 
std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
                 is configured to represent event-time via <code 
class="docutils literal"><span class="pre">TimestampExtractor</span></code>). 
When stream-time is used, <code class="docutils literal"><span 
class="pre">punctuate()</span></code> is triggered purely
                 by data because stream-time is determined (and advanced 
forward) by the timestamps derived from the input data. When there
@@ -108,8 +144,10 @@
                 times inside <code class="docutils literal"><span 
class="pre">init()</span></code> method.</p>
             <div class="admonition attention">
                 <p class="first admonition-title"><b>Attention</b></p>
-                <p class="last">Stream-time is only advanced if all input 
partitions over all input topics have new data (with newer timestamps) 
available.
-                    If at least one partition does not have any new data 
available, stream-time will not be advanced and thus <code class="docutils 
literal"><span class="pre">punctuate()</span></code> will not be triggered if 
<code class="docutils literal"><span 
class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
+                <p class="last">Stream-time is only advanced when Streams 
processes records.
+                  If there are no records to process, or if Streams is waiting 
for new records
+                  due to the <a class="reference internal" 
href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
+                  configuration, then the stream time will not advance and 
<code class="docutils literal"><span class="pre">punctuate()</span></code> will 
not be triggered if <code class="docutils literal"><span 
class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
                     This behavior is independent of the configured timestamp 
extractor, i.e., using <code class="docutils literal"><span 
class="pre">WallclockTimestampExtractor</span></code> does not enable 
wall-clock triggering of <code class="docutils literal"><span 
class="pre">punctuate()</span></code>.</p>
             </div>
             <p><b>Example</b></p>
@@ -119,45 +157,42 @@
                 <li>In the <code class="docutils literal"><span 
class="pre">process()</span></code> method, upon each received record, split 
the value string into words, and update their counts into the state store (we 
will talk about this later in this section).</li>
                 <li>In the <code class="docutils literal"><span 
class="pre">punctuate()</span></code> method, iterate the local state store and 
send the aggregated counts to the downstream processor (we will talk about 
downstream processors later in this section), and commit the current stream 
state.</li>
             </ul>
-            <pre class="line-numbers"><code class="language-java">public class 
WordCountProcessor implements Processor&lt;String, String&gt; {
-
-  private ProcessorContext context;
-  private KeyValueStore&lt;String, Long&gt; kvStore;
-
-  @Override
-  @SuppressWarnings(&quot;unchecked&quot;)
-  public void init(ProcessorContext context) {
-      // keep the processor context locally because we need it in punctuate() 
and commit()
-      this.context = context;
+            <pre class="line-numbers"><code class="language-java">public class 
WordCountProcessor implements Processor&lt;String, String, String, String&gt; {
+    private KeyValueStore&lt;String, Integer&gt; kvStore;
 
-      // retrieve the key-value store named &quot;Counts&quot;
-      kvStore = (KeyValueStore) context.getStateStore(&quot;Counts&quot;);
+    @Override
+    public void init(final ProcessorContext&lt;String, String&gt; context) {
+        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, 
timestamp -&gt; {
+            try (final KeyValueIterator&lt;String, Integer&gt; iter = 
kvStore.all()) {
+                while (iter.hasNext()) {
+                    final KeyValue&lt;String, Integer&gt; entry = iter.next();
+                    context.forward(new Record&lt;&gt;(entry.key, 
entry.value.toString(), timestamp));
+                }
+            }
+        });
+        kvStore = context.getStateStore("Counts");
+    }
 
-      // schedule a punctuate() method every second based on stream-time
-      this.context.schedule(Duration.ofSeconds(1000), 
PunctuationType.STREAM_TIME, (timestamp) -&gt; {
-          KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
-          while (iter.hasNext()) {
-              KeyValue&lt;String, Long&gt; entry = iter.next();
-              context.forward(entry.key, entry.value.toString());
-          }
-          iter.close();
+    @Override
+    public void process(final Record&lt;String, String&gt; record) {
+        final String[] words = 
record.value().toLowerCase(Locale.getDefault()).split("\\W+");
 
-          // commit the current processing progress
-          context.commit();
-      });
-  }
+        for (final String word : words) {
+            final Integer oldValue = kvStore.get(word);
 
-  @Override
-  public void punctuate(long timestamp) {
-      // this method is deprecated and should not be used anymore
-  }
-
-  @Override
-  public void close() {
-      // close any resources managed by this processor
-      // Note: Do not close any StateStores as these are managed by the library
-  }
+            if (oldValue == null) {
+                kvStore.put(word, 1);
+            } else {
+                kvStore.put(word, oldValue + 1);
+            }
+        }
+    }
 
+    @Override
+    public void close() {
+        // close any resources managed by this processor
+        // Note: Do not close any StateStores as these are managed by the 
library
+    }
 }</code></pre>
             <div class="admonition note">
                 <p><b>Note</b></p>
@@ -428,12 +463,20 @@ builder.addSource("Source", "source-topic")
 builder.addSource("Source", "source-topic")
     // add the WordCountProcessor node which takes the source processor as its 
upstream processor.
     // the ProcessorSupplier provides the count store associated with the 
WordCountProcessor
-    .addProcessor("Process", new ProcessorSupplier&ltString, String&gt() {
-        public Processor&ltString, String&gt get() {
+    .addProcessor("Process", new ProcessorSupplier&lt;String, String, String, 
String&gt;() {
+        public Processor&lt;String, String, String, String&gt; get() {
             return new WordCountProcessor();
         }
-        public Set&ltStoreBuilder&lt?&gt&gt stores() {
-            return countStoreBuilder;
+
+        public Set&lt;StoreBuilder&lt;?&gt;&gt; stores() {
+            final StoreBuilder&lt;KeyValueStore&lt;String, Integer&gt;&gt; 
countsStoreBuilder =
+                Stores
+                    .keyValueStoreBuilder(
+                        Stores.persistentKeyValueStore("Counts"),
+                        Serdes.String(),
+                        Serdes.Integer()
+                    );
+            return Collections.singleton(countsStoreBuilder);
         }
     }, "Source")
     // add the sink processor node that takes Kafka topic "sink-topic" as 
output
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 2c3ad1f..014923f 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -16,28 +16,26 @@
  */
 package org.apache.kafka.streams.examples.wordcount;
 
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Locale;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-
 /**
  * Demonstrates, using the low-level Processor APIs, how to implement the 
WordCount program
  * that computes a simple word occurrence histogram from an input text.
@@ -54,47 +52,46 @@ import java.util.concurrent.CountDownLatch;
  * {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data 
arriving in the output topic.
  */
 public final class WordCountProcessorDemo {
-
-    static class MyProcessorSupplier implements ProcessorSupplier<String, 
String, String, String> {
+    static class WordCountProcessor implements Processor<String, String, 
String, String> {
+        private KeyValueStore<String, Integer> kvStore;
 
         @Override
-        public Processor<String, String, String, String> get() {
-            return new Processor<String, String, String, String>() {
-                private KeyValueStore<String, Integer> kvStore;
-
-                @Override
-                public void init(final ProcessorContext<String, String> 
context) {
-                    context.schedule(Duration.ofSeconds(1), 
PunctuationType.STREAM_TIME, timestamp -> {
-                        try (final KeyValueIterator<String, Integer> iter = 
kvStore.all()) {
-                            System.out.println("----------- " + timestamp + " 
----------- ");
-
-                            while (iter.hasNext()) {
-                                final KeyValue<String, Integer> entry = 
iter.next();
-
-                                System.out.println("[" + entry.key + ", " + 
entry.value + "]");
-
-                                context.forward(new Record<>(entry.key, 
entry.value.toString(), timestamp));
-                            }
-                        }
-                    });
-                    kvStore = context.getStateStore("Counts");
-                }
+        public void init(final ProcessorContext<String, String> context) {
+            context.schedule(Duration.ofSeconds(1), 
PunctuationType.STREAM_TIME, timestamp -> {
+                try (final KeyValueIterator<String, Integer> iter = 
kvStore.all()) {
+                    System.out.println("----------- " + timestamp + " 
----------- ");
 
-                @Override
-                public void process(final Record<String, String> record) {
-                    final String[] words = 
record.value().toLowerCase(Locale.getDefault()).split("\\W+");
+                    while (iter.hasNext()) {
+                        final KeyValue<String, Integer> entry = iter.next();
 
-                    for (final String word : words) {
-                        final Integer oldValue = kvStore.get(word);
+                        System.out.println("[" + entry.key + ", " + 
entry.value + "]");
 
-                        if (oldValue == null) {
-                            kvStore.put(word, 1);
-                        } else {
-                            kvStore.put(word, oldValue + 1);
-                        }
+                        context.forward(new Record<>(entry.key, 
entry.value.toString(), timestamp));
                     }
                 }
-            };
+            });
+            kvStore = context.getStateStore("Counts");
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final String[] words = 
record.value().toLowerCase(Locale.getDefault()).split("\\W+");
+
+            for (final String word : words) {
+                final Integer oldValue = kvStore.get(word);
+
+                if (oldValue == null) {
+                    kvStore.put(word, 1);
+                } else {
+                    kvStore.put(word, oldValue + 1);
+                }
+            }
+        }
+
+        @Override
+        public void close() {
+            // close any resources managed by this processor
+            // Note: Do not close any StateStores as these are managed by the 
library
         }
     }
 
@@ -122,7 +119,8 @@ public final class WordCountProcessorDemo {
 
         builder.addSource("Source", "streams-plaintext-input");
 
-        builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
+        builder.addProcessor("Process", WordCountProcessor::new, "Source");
+
         builder.addStateStore(Stores.keyValueStoreBuilder(
                 Stores.inMemoryKeyValueStore("Counts"),
                 Serdes.String(),
diff --git 
a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
 
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
index eb62630..1343a29 100644
--- 
a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
+++ 
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -48,7 +48,7 @@ public class WordCountProcessorTest {
         store.init(context.getStateStoreContext(), store);
 
         // Create and initialize the processor under test
-        final Processor<String, String, String, String> processor = new 
WordCountProcessorDemo.MyProcessorSupplier().get();
+        final Processor<String, String, String, String> processor = new 
WordCountProcessorDemo.WordCountProcessor();
         processor.init(context);
 
         // send a record to the processor

Reply via email to