This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 4eb35a435df KAFKA-16339: Add Kafka Streams migrating guide from 
transform to process (#18314)
4eb35a435df is described below

commit 4eb35a435df776c00ce194bf73a2dcba4d15c690
Author: Joao Pedro Fonseca Dantas <[email protected]>
AuthorDate: Wed Jan 29 16:07:16 2025 -0300

    KAFKA-16339: Add Kafka Streams migrating guide from transform to process 
(#18314)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 docs/streams/developer-guide/dsl-api.html | 1021 +++++++++++++++++++++++++----
 1 file changed, 884 insertions(+), 137 deletions(-)

diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
index 97dc644e839..b59ac764f32 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -64,7 +64,8 @@
                         </li>
                     </ul>
                     </li>
-                    <li><a class="reference internal" 
href="#applying-processors-and-transformers-processor-api-integration" 
id="id24">Applying processors and transformers (Processor API 
integration)</a></li>
+                    <li><a class="reference internal" 
href="#applying-processors-processor-api-integration" id="id24">Applying 
processors (Processor API integration)</a></li>
+                    <li><a class="reference internal" 
href="#transformers-removal-and-migration-to-processors" id="id37">Transformers 
removal and migration to processors</a></li>
                 </ul>
                 </li>
                 <li><a class="reference internal" href="#naming-a-streams-app" 
id="id33">Naming Operators in a Streams DSL application</a></li>
@@ -3095,152 +3096,898 @@ grouped
                    </div>
                 </div>
             </div>
-            <div class="section" 
id="applying-processors-and-transformers-processor-api-integration">
-                <span id="streams-developer-guide-dsl-process"></span><h3><a 
class="toc-backref" href="#id24">Applying processors and transformers 
(Processor API integration)</a><a class="headerlink" 
href="#applying-processors-and-transformers-processor-api-integration" 
title="Permalink to this headline"></a></h3>
-                <p>Beyond the aforementioned <a class="reference internal" 
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std 
std-ref">stateless</span></a> and
-                    <a class="reference internal" 
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std 
std-ref">stateful</span></a> transformations, you may also
-                    leverage the <a class="reference internal" 
href="processor-api.html#streams-developer-guide-processor-api"><span 
class="std std-ref">Processor API</span></a> from the DSL.
-                    There are a number of scenarios where this may be 
helpful:</p>
-                <ul class="simple">
-                    <li><strong>Customization:</strong> You need to implement 
special, customized logic that is not or not yet available in the DSL.</li>
-                    <li><strong>Combining ease-of-use with full flexibility 
where it&#8217;s needed:</strong> Even though you generally prefer to use
-                        the expressiveness of the DSL, there are certain steps 
in your processing that require more flexibility and
-                        tinkering than the DSL provides.  For example, only 
the Processor API provides access to a
-                        record&#8217;s metadata such as its topic, partition, 
and offset information.
-                        However, you don&#8217;t want to switch completely to 
the Processor API just because of that.</li>
-                    <li><strong>Migrating from other tools:</strong> You are 
migrating from other stream processing technologies that provide an
-                        imperative API, and migrating some of your legacy code 
to the Processor API was faster and/or easier than to
-                        migrate completely to the DSL right away.</li>
+            <div class="section" 
id="applying-processors-processor-api-integration">
+                <span id="streams-developer-guide-dsl-process"></span>
+                <h3>
+                    <a class="toc-backref" href="#id24">Applying processors 
(Processor API integration)</a>
+                    <a class="headerlink" 
href="#applying-processors-processor-api-integration" title="Permalink to this 
headline"></a>
+                </h3>
+                <p>Beyond the aforementioned <a class="reference internal"
+                        
href="#streams-developer-guide-dsl-transformations-stateless">
+                        <span class="std std-ref">stateless</span></a> and
+                    <a class="reference internal" 
href="#streams-developer-guide-dsl-transformations-stateful"> <span
+                            class="std std-ref">stateful</span></a> 
transformations, you may also leverage the <a class="reference internal" 
href="processor-api.html#streams-developer-guide-processor-api"><span 
class="std std-ref">Processor API</span></a> from the
+                    DSL. There are a number of scenarios where this may be 
helpful:
+                </p>
+                <ul>
+                    <li><strong>Customization:</strong> You need to implement 
special, customized logic that is not or not yet
+                        available
+                        in the DSL.</li>
+                    <li><strong>Combining ease-of-use with full flexibility 
where it's needed:</strong> Even though you generally
+                        prefer
+                        to use the expressiveness of the DSL, there are 
certain steps in your processing that require more
+                        flexibility and tinkering than the DSL provides. For 
example, only the Processor API provides access to a
+                        record's metadata such as its topic, partition, and 
offset information. However, you don't want to switch
+                        completely to the Processor API just because of that; 
and</li>
+                    <li><strong>Migrating from other tools:</strong> You are 
migrating from other stream processing technologies
+                        that
+                        provide an imperative API, and migrating some of your 
legacy code to the Processor API was faster and/or
+                        easier than to migrate completely to the DSL right 
away.</li>
                 </ul>
-                <table border="1" class="non-scrolling-table width-100-percent 
docutils">
-                    <colgroup>
-                        <col width="19%" />
-                        <col width="81%" />
-                    </colgroup>
-                    <thead valign="bottom">
-                    <tr class="row-odd"><th class="head">Transformation</th>
-                        <th class="head">Description</th>
-                    </tr>
+                <h4>Operations and concepts</h4>
+                <ul>
+                    <li><code>KStream#process</code>: Process all records in a 
stream, one record at a time, by applying a
+                        <code>Processor</code> (provided by a given 
<code>ProcessorSupplier</code>);
+                    </li>
+                    <li><code>KStream#processValues</code>: Process all 
records in a stream, one record at a time, by applying a
+                        <code>FixedKeyProcessor</code> (provided by a given 
<code>FixedKeyProcessorSupplier</code>);
+                    </li>
+                    <li><code>Processor</code>: A processor of key-value pair 
records;</li>
+                    <li><code>ContextualProcessor</code>: An abstract 
implementation of <code>Processor</code> that manages the
+                        <code>ProcessorContext</code> instance;
+                    </li>
+                    <li><code>FixedKeyProcessor</code>: A processor of 
key-value pair records where keys are immutable;</li>
+                    <li><code>ContextualFixedKeyProcessor</code>: An abstract 
implementation of <code>FixedKeyProcessor</code> that
+                        manages the <code>FixedKeyProcessorContext</code> 
instance;
+                    </li>
+                    <li><code>ProcessorSupplier</code>: A processor supplier 
that can create one or more <code>Processor</code>
+                        instances; and</li>
+                    <li><code>FixedKeyProcessorSupplier</code>: A processor 
supplier that can create one or more
+                        <code>FixedKeyProcessor</code> instances.
+                    </li>
+                </ul>
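Since a supplier "can create one or more" processor instances, note that it must hand out a fresh processor on every call: the runtime creates one processor per task, and sharing a mutable instance across tasks is unsafe. A minimal plain-Java sketch of this contract (the `MiniProcessor`/`CountingProcessor` names are illustrative stand-ins, not the real Kafka Streams API):

```java
import java.util.function.Supplier;

public class SupplierPattern {
    // Illustrative stand-in for a processor interface
    interface MiniProcessor {
        void process(String record);
        int seen();
    }

    // A stateful processor: each instance keeps its own record count
    static class CountingProcessor implements MiniProcessor {
        private int count = 0;

        @Override
        public void process(final String record) {
            count++;
        }

        @Override
        public int seen() {
            return count;
        }
    }

    public static void main(final String[] args) {
        // A method reference yields a new instance per get(), like a supplier should
        final Supplier<MiniProcessor> supplier = CountingProcessor::new;
        final MiniProcessor taskA = supplier.get();
        final MiniProcessor taskB = supplier.get();
        taskA.process("r1");
        // Each task has its own state: taskA has seen 1 record, taskB none
        System.out.println(taskA.seen() + " " + taskB.seen()); // 1 0
    }
}
```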
+                <h4>Examples</h4>
+                <p>Follow the examples below to learn how to apply 
<code>process</code> and <code>processValues</code> to your
+                    <code>KStream</code>.
+                </p>
+                <table>
+                    <thead>
+                        <tr>
+                            <th>Example</th>
+                            <th>Operation</th>
+                            <th>State Type</th>
+                        </tr>
                     </thead>
-                    <tbody valign="top">
-                    <tr class="row-even"><td><p 
class="first"><strong>Process</strong></p>
-                        <ul class="last simple">
-                            <li>KStream -&gt; void</li>
-                        </ul>
-                    </td>
-                        <td><p class="first"><strong>Terminal 
operation.</strong>  Applies a <code class="docutils literal"><span 
class="pre">Processor</span></code> to each record.
-                            <code class="docutils literal"><span 
class="pre">process()</span></code> allows you to leverage the <a 
class="reference internal" 
href="processor-api.html#streams-developer-guide-processor-api"><span 
class="std std-ref">Processor API</span></a> from the DSL.
-                            (<a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">details</a>)</p>
-                            <p>This is essentially equivalent to adding the 
<code class="docutils literal"><span class="pre">Processor</span></code> via 
<code class="docutils literal"><span 
class="pre">Topology#addProcessor()</span></code> to your
-                                <a class="reference internal" 
href="../core-concepts.html#streams_topology"><span class="std 
std-ref">processor topology</span></a>.</p>
-                            <p class="last">An example is available in the
-                                <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">javadocs</a>.</p>
-                        </td>
-                    </tr>
-                    <tr class="row-odd"><td><p 
class="first"><strong>Transform</strong></p>
-                        <ul class="last simple">
-                            <li>KStream -&gt; KStream</li>
-                        </ul>
-                    </td>
-                        <td><p class="first">Applies a <code class="docutils 
literal"><span class="pre">Transformer</span></code> to each record.
-                            <code class="docutils literal"><span 
class="pre">transform()</span></code> allows you to leverage the <a 
class="reference internal" 
href="processor-api.html#streams-developer-guide-processor-api"><span 
class="std std-ref">Processor API</span></a> from the DSL.
-                            (<a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">details</a>)</p>
-                            <p>Each input record is transformed into zero, 
one, or more output records (similar to the stateless <code class="docutils 
literal"><span class="pre">flatMap</span></code>).
-                                The <code class="docutils literal"><span 
class="pre">Transformer</span></code> must return <code class="docutils 
literal"><span class="pre">null</span></code> for zero output.
-                                You can modify the record&#8217;s key and 
value, including their types.</p>
-                            <p><strong>Marks the stream for data 
re-partitioning:</strong>
-                                Applying a grouping or a join after <code 
class="docutils literal"><span class="pre">transform</span></code> will result 
in re-partitioning of the records.
-                                If possible use <code class="docutils 
literal"><span class="pre">transformValues</span></code> instead, which will 
not cause data re-partitioning.</p>
-                            <p><code class="docutils literal"><span 
class="pre">transform</span></code> is essentially equivalent to adding the 
<code class="docutils literal"><span class="pre">Transformer</span></code> via 
<code class="docutils literal"><span 
class="pre">Topology#addProcessor()</span></code> to your
-                                <a class="reference internal" 
href="../core-concepts.html#streams_topology"><span class="std 
std-ref">processor topology</span></a>.</p>
-                            <p class="last">An example is available in the
-                                <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">javadocs</a>.
-                               </p>
-                        </td>
-                    </tr>
-                    <tr class="row-even"><td><p 
class="first"><strong>Transform (values only)</strong></p>
-                        <ul class="last simple">
-                            <li>KStream -&gt; KStream</li>
-                            <li>KTable -&gt; KTable</li>
-                        </ul>
-                    </td>
-                        <td><p class="first">Applies a <code class="docutils 
literal"><span class="pre">ValueTransformer</span></code> to each record, while 
retaining the key of the original record.
-                            <code class="docutils literal"><span 
class="pre">transformValues()</span></code> allows you to leverage the <a 
class="reference internal" 
href="processor-api.html#streams-developer-guide-processor-api"><span 
class="std std-ref">Processor API</span></a> from the DSL.
-                            (<a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">details</a>)</p>
-                            <p>Each input record is transformed into exactly 
one output record (zero output records or multiple output records are not 
possible).
-                                The <code class="docutils literal"><span 
class="pre">ValueTransformer</span></code> may return <code class="docutils 
literal"><span class="pre">null</span></code> as the new value for a record.</p>
-                            <p><code class="docutils literal"><span 
class="pre">transformValues</span></code> is preferable to <code 
class="docutils literal"><span class="pre">transform</span></code> because it 
will not cause data re-partitioning.</p>
-                            <p><code class="docutils literal"><span 
class="pre">transformValues</span></code> is essentially equivalent to adding 
the <code class="docutils literal"><span 
class="pre">ValueTransformer</span></code> via <code class="docutils 
literal"><span class="pre">Topology#addProcessor()</span></code> to your
-                                <a class="reference internal" 
href="../core-concepts.html#streams_topology"><span class="std 
std-ref">processor topology</span></a>.</p>
-                            <p class="last">An example is available in the
-                                <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">javadocs</a>.</p>
-                        </td>
-                    </tr>
+                    <tbody>
+                        <tr>
+                            <td><a 
href="#categorizing-logs-by-severity">Categorizing Logs by Severity</a></td>
+                            <td><code>process</code></td>
+                            <td>Stateless</td>
+                        </tr>
+                        <tr>
+                            <td><a 
href="#replacing-slang-in-text-messages">Replacing Slang in Text 
Messages</a></td>
+                            <td><code>processValues</code></td>
+                            <td>Stateless</td>
+                        </tr>
+                        <tr>
+                            <td><a 
href="#cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a 
Loyalty Program</a>
+                            </td>
+                            <td><code>process</code></td>
+                            <td>Stateful</td>
+                        </tr>
+                        <tr>
+                            <td><a 
href="#traffic-radar-monitoring-car-count">Traffic Radar Monitoring Car 
Count</a></td>
+                            <td><code>processValues</code></td>
+                            <td>Stateful</td>
+                        </tr>
                     </tbody>
                 </table>
-                <p>The following example shows how to leverage, via the <code 
class="docutils literal"><span class="pre">KStream#process()</span></code> 
method, a custom <code class="docutils literal"><span 
class="pre">Processor</span></code> that sends an
-                    email notification whenever a page view count reaches a 
predefined threshold.</p>
-                <p>First, we need to implement a custom stream processor, 
<code class="docutils literal"><span 
class="pre">PopularPageEmailAlert</span></code>, that implements the <code 
class="docutils literal"><span class="pre">Processor</span></code>
-                    interface:</p>
-                <pre class="line-numbers"><code class="language-java">// A 
processor that sends an alert message about a popular page to a configurable 
email address
-public class PopularPageEmailAlert implements Processor&lt;PageId, Long, Void, 
Void&gt; {
-
-  private final String emailAddress;
-  private ProcessorContext&lt;Void, Void&gt; context;
-
-  public PopularPageEmailAlert(String emailAddress) {
-    this.emailAddress = emailAddress;
-  }
+                <h5 id="categorizing-logs-by-severity">Categorizing Logs by 
Severity</h5>
+                <ul>
+                    <li><strong>Idea:</strong> You have a stream of log 
messages. Each message contains a severity level (e.g.,
+                        INFO,
+                        WARN, ERROR) in the value. The processor routes ERROR 
and WARN messages to dedicated topics, discards INFO
+                        messages, and forwards messages with unrecognized 
severities to an &quot;unknown&quot; topic.</li>
+                    <li><strong>Real-World Context:</strong> In a production 
monitoring system, categorizing logs by severity
+                        ensures
+                        ERROR logs are sent to a critical incident management 
system, WARN logs are analyzed for potential risks,
+                        and
+                        INFO logs are stored for basic reporting purposes.</li>
+                </ul>
+                <pre class="line-numbers"><code class="language-java">public 
class CategorizingLogsBySeverityExample {
+    private static final String ERROR_LOGS_TOPIC = 
&quot;error-logs-topic&quot;;
+    private static final String INPUT_LOGS_TOPIC = 
&quot;input-logs-topic&quot;;
+    private static final String UNKNOWN_LOGS_TOPIC = 
&quot;unknown-logs-topic&quot;;
+    private static final String WARN_LOGS_TOPIC = &quot;warn-logs-topic&quot;;
+
+    public static void categorizeWithProcess(final StreamsBuilder builder) {
+        final KStream&lt;String, String&gt; logStream = 
builder.stream(INPUT_LOGS_TOPIC);
+        logStream.process(LogSeverityProcessor::new)
+                .to((key, value, recordContext) -&gt; {
+                    // Determine the target topic dynamically; the processor
+                    // has already set the record key to the target topic name
+                    if (ERROR_LOGS_TOPIC.equals(key)) return ERROR_LOGS_TOPIC;
+                    if (WARN_LOGS_TOPIC.equals(key)) return WARN_LOGS_TOPIC;
+                    return UNKNOWN_LOGS_TOPIC;
+                });
+    }
 
-  @Override
-  public void init(ProcessorContext&lt;Void, Void&gt; context) {
-    this.context = context;
+    private static class LogSeverityProcessor extends 
ContextualProcessor&lt;String, String, String, String&gt; {
+        @Override
+        public void process(final Record&lt;String, String&gt; record) {
+            if (record.value() == null) {
+                return; // Skip null values
+            }
 
-    // Here you would perform any additional initializations such as setting 
up an email client.
-  }
+            // Assume the severity is the first word in the log message
+            // For example: &quot;ERROR: Disk not found&quot; -&gt; 
&quot;ERROR&quot;
+            final int colonIndex = record.value().indexOf(&#39;:&#39;);
+            final String severity = colonIndex &gt; 0 ? 
record.value().substring(0, colonIndex).trim() : &quot;UNKNOWN&quot;;
+
+            // Route logs based on severity
+            switch (severity) {
+                case &quot;ERROR&quot;:
+                    context().forward(record.withKey(ERROR_LOGS_TOPIC));
+                    break;
+                case &quot;WARN&quot;:
+                    context().forward(record.withKey(WARN_LOGS_TOPIC));
+                    break;
+                case &quot;INFO&quot;:
+                    // INFO logs are ignored
+                    break;
+                default:
+                    // Forward to an &quot;unknown&quot; topic for logs with 
unrecognized severities
+                    context().forward(record.withKey(UNKNOWN_LOGS_TOPIC));
+            }
+        }
+    }
+}</code></pre>
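The routing above hinges on the severity-parsing step inside `LogSeverityProcessor`. That step can be exercised on its own, without a Kafka cluster; a small sketch with a hypothetical `extractSeverity` helper mirroring the code above:

```java
public class SeverityParsing {
    // Hypothetical helper mirroring LogSeverityProcessor: the severity is
    // assumed to be the text before the first ':' in the log message.
    static String extractSeverity(final String logMessage) {
        final int colonIndex = logMessage.indexOf(':');
        return colonIndex > 0 ? logMessage.substring(0, colonIndex).trim() : "UNKNOWN";
    }

    public static void main(final String[] args) {
        System.out.println(extractSeverity("ERROR: Disk not found")); // ERROR
        System.out.println(extractSeverity("no severity prefix"));    // UNKNOWN
    }
}
```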
+                <h5 id="replacing-slang-in-text-messages">Replacing Slang in 
Text Messages</h5>
+                <ul>
+                    <li><strong>Idea:</strong> A messaging stream contains 
user-generated content, and you want to replace slang
+                        words
+                        with their formal equivalents (e.g., &quot;u&quot; 
becomes &quot;you&quot;, &quot;brb&quot; becomes &quot;be
+                        right back&quot;). The operation only modifies the 
message value and keeps the key intact.</li>
+                    <li><strong>Real-World Context:</strong> In customer 
support chat systems, normalizing text by replacing slang
+                        with
+                        formal equivalents ensures that automated sentiment 
analysis tools work accurately and provide reliable
+                        insights.</li>
+                </ul>
+                <pre class="line-numbers"><code class="language-java">public 
class ReplacingSlangTextInMessagesExample {
+    private static final Map&lt;String, String&gt; SLANG_DICTIONARY = Map.of(
+            &quot;u&quot;, &quot;you&quot;,
+            &quot;brb&quot;, &quot;be right back&quot;,
+            &quot;omg&quot;, &quot;oh my god&quot;,
+            &quot;btw&quot;, &quot;by the way&quot;
+    );
+    private static final String INPUT_MESSAGES_TOPIC = 
&quot;input-messages-topic&quot;;
+    private static final String OUTPUT_MESSAGES_TOPIC = 
&quot;output-messages-topic&quot;;
+
+    public static void replaceWithProcessValues(final StreamsBuilder builder) {
+        KStream&lt;String, String&gt; messageStream = 
builder.stream(INPUT_MESSAGES_TOPIC);
+        
messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
+    }
 
-  @Override
-  void process(Record&lt;PageId, Long&gt; record) {
-    // Here you would format and send the alert email.
-    //
-    // In this specific example, you would be able to include
-    // information about the page&#39;s ID and its view count
-  }
+    private static class SlangReplacementProcessor extends 
ContextualFixedKeyProcessor&lt;String, String, String&gt; {
+        @Override
+        public void process(final FixedKeyRecord&lt;String, String&gt; record) 
{
+            if (record.value() == null) {
+                return; // Skip null values
+            }
 
-  @Override
-  void close() {
-    // Any code for clean up would go here, for example tearing down the email 
client and anything
-    // else you created in the init() method
-    // This processor instance will not be used again after this call.
-  }
+            // Replace slang words in the message and forward a single
+            // record with the normalized text, keeping the key intact
+            final String[] words = record.value().split(&quot;\\s+&quot;);
+            final StringBuilder replaced = new StringBuilder();
+            for (final String word : words) {
+                replaced.append(SLANG_DICTIONARY.getOrDefault(word, 
word)).append(&#39; &#39;);
+            }
+            context().forward(record.withValue(replaced.toString().trim()));
+        }
+    }
+}</code></pre>
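The word-by-word dictionary lookup can likewise be tried outside of Kafka Streams. A minimal sketch assuming the same dictionary contents (the class and method names here are illustrative):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class SlangNormalizer {
    private static final Map<String, String> SLANG_DICTIONARY = Map.of(
            "u", "you",
            "brb", "be right back",
            "omg", "oh my god",
            "btw", "by the way"
    );

    // Replace each whitespace-separated token found in the dictionary,
    // then join the words back into a single normalized message
    static String normalize(final String message) {
        return Arrays.stream(message.split("\\s+"))
                .map(word -> SLANG_DICTIONARY.getOrDefault(word, word))
                .collect(Collectors.joining(" "));
    }

    public static void main(final String[] args) {
        System.out.println(normalize("brb u omg")); // be right back you oh my god
    }
}
```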
+                <h5 id="cumulative-discounts-for-a-loyalty-program">Cumulative 
Discounts for a Loyalty Program</h5>
+                <ul>
+                    <li><strong>Idea:</strong> A stream of purchase events 
contains user IDs and transaction amounts. Use a state
+                        store
+                        to accumulate the total spending of each user. When 
their total crosses a threshold, apply a discount on
+                        their next transaction and update their accumulated 
total.</li>
+                    <li><strong>Real-World Context:</strong> In a retail 
loyalty program, tracking cumulative customer spending
+                        enables
+                        dynamic rewards, such as issuing a discount when a 
customer's total purchases exceed a predefined limit.
+                    </li>
+                </ul>
+                <pre class="line-numbers"><code class="language-java">public 
class CumulativeDiscountsForALoyaltyProgramExample {
+    private static final double DISCOUNT_THRESHOLD = 100.0;
+    private static final String CUSTOMER_SPENDING_STORE = 
&quot;customer-spending-store&quot;;
+    private static final String DISCOUNT_NOTIFICATION_MESSAGE =
+            &quot;Discount applied! You have received a reward for your 
purchases.&quot;;
+    private static final String DISCOUNT_NOTIFICATIONS_TOPIC = 
&quot;discount-notifications-topic&quot;;
+    private static final String PURCHASE_EVENTS_TOPIC = 
&quot;purchase-events-topic&quot;;
+
+    public static void applyDiscountWithProcess(final StreamsBuilder builder) {
+        // Define the state store for tracking cumulative spending
+        builder.addStateStore(
+                Stores.keyValueStoreBuilder(
+                        Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE),
+                        Serdes.String(),
+                        Serdes.Double()
+                )
+        );
+        final KStream&lt;String, Double&gt; purchaseStream = 
builder.stream(PURCHASE_EVENTS_TOPIC);
+        // Apply the Processor with the state store
+        final KStream&lt;String, String&gt; notificationStream =
+                purchaseStream.process(CumulativeDiscountProcessor::new, 
CUSTOMER_SPENDING_STORE);
+        // Send the notifications to the output topic
+        notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC);
+    }
+
+    private static class CumulativeDiscountProcessor implements 
Processor&lt;String, Double, String, String&gt; {
+        private KeyValueStore&lt;String, Double&gt; spendingStore;
+        private ProcessorContext&lt;String, String&gt; context;
+
+        @Override
+        public void init(final ProcessorContext&lt;String, String&gt; context) 
{
+            this.context = context;
+            // Retrieve the state store for cumulative spending
+            spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE);
+        }
+
+        @Override
+        public void process(final Record&lt;String, Double&gt; record) {
+            if (record.value() == null) {
+                return; // Skip null purchase amounts
+            }
 
+            // Get the current spending total for the customer
+            Double currentSpending = spendingStore.get(record.key());
+            if (currentSpending == null) {
+                currentSpending = 0.0;
+            }
+            // Update the cumulative spending
+            currentSpending += record.value();
+            spendingStore.put(record.key(), currentSpending);
+
+            // Check if the customer qualifies for a discount
+            if (currentSpending &gt;= DISCOUNT_THRESHOLD) {
+                // Reset the spending after applying the discount
+                spendingStore.put(record.key(), currentSpending - 
DISCOUNT_THRESHOLD);
+                // Send a discount notification
+                
context.forward(record.withValue(DISCOUNT_NOTIFICATION_MESSAGE));
+            }
+        }
+    }
 }</code></pre>
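The threshold-and-reset bookkeeping in `CumulativeDiscountProcessor` can be sketched with a plain `HashMap` standing in for the state store (the names below are illustrative, not part of the Kafka Streams API):

```java
import java.util.HashMap;
import java.util.Map;

public class CumulativeDiscountLogic {
    private static final double DISCOUNT_THRESHOLD = 100.0;
    // A plain map stands in for the Kafka Streams KeyValueStore
    private final Map<String, Double> spendingStore = new HashMap<>();

    // Returns true when this purchase pushes the customer over the threshold.
    // Mirrors CumulativeDiscountProcessor: once the discount fires, the
    // threshold amount is deducted and any remainder carries over.
    boolean recordPurchase(final String customerId, final double amount) {
        final double total = spendingStore.getOrDefault(customerId, 0.0) + amount;
        if (total >= DISCOUNT_THRESHOLD) {
            spendingStore.put(customerId, total - DISCOUNT_THRESHOLD);
            return true;
        }
        spendingStore.put(customerId, total);
        return false;
    }

    public static void main(final String[] args) {
        final CumulativeDiscountLogic logic = new CumulativeDiscountLogic();
        System.out.println(logic.recordPurchase("alice", 60.0)); // false (total 60)
        System.out.println(logic.recordPurchase("alice", 50.0)); // true (110 >= 100, carry-over 10)
        System.out.println(logic.recordPurchase("alice", 30.0)); // false (total 40)
    }
}
```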
-                <div class="admonition tip">
-                    <p><b>Tip</b></p>
-                    <p class="last">Even though we do not demonstrate it in 
this example, a stream processor can access any available state stores by
-                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.
-                        State stores are only available if they have been 
connected to the processor, or if they are global stores.  While global stores 
do not need to be connected explicitly, they only allow for read-only access.
-                        There are two ways to connect state stores to a 
processor:
-                    <ul class="simple">
-                        <li>By passing the name of a store that has already 
been added via <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code> to the corresponding <code 
class="docutils literal"><span class="pre">KStream#process()</span></code> 
method call.</li>
-                        <li>Implementing <code class="docutils literal"><span 
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code 
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
-                            passed to <code class="docutils literal"><span 
class="pre">KStream#process()</span></code>.  In this case there is no need to 
call <code class="docutils literal"><span 
class="pre">StreamsBuilder#addStateStore()</span></code>
-                            beforehand, the store will be automatically added 
for you. You can also implement <code class="docutils literal"><span 
class="pre">ConnectedStoreProvider#stores()</span></code> on the
-                            <code class="docutils literal"><span 
class="pre">Value*</span></code> or <code class="docutils literal"><span 
class="pre">*WithKey</span></code> supplier variants, or <code class="docutils 
literal"><span class="pre">TransformerSupplier</span></code> or any of its 
variants.
-                        </li>
-                    </ul>
-                </div>
-                <p>Then we can leverage the <code class="docutils 
literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the 
DSL via <code class="docutils literal"><span 
class="pre">KStream#process</span></code>.</p>
-                <pre class="line-numbers"><code 
class="language-java">KStream&lt;String, GenericRecord&gt; pageViews = ...;
-
-// Send an email notification when the view count of a page reaches one 
thousand.
-pageViews.groupByKey()
-         .count()
-         .filter((PageId pageId, Long viewCount) -&gt; viewCount == 1000)
-         // PopularPageEmailAlert is your custom processor that implements the
-         // `Processor` interface, see further down below.
-         .process(() -&gt; new 
PopularPageEmailAlert(&quot;[email protected]&quot;));</code></pre>
-        </div>
+                <h5 id="traffic-radar-monitoring-car-count">Traffic Radar 
Monitoring Car Count</h5>
+                <ul>
+                    <li><strong>Idea:</strong> A radar monitors cars passing 
along a road stretch. A system counts the cars for each
+                        day, maintaining a cumulative total for the current 
day in a state store. At the end of the day, the count
+                        is emitted and the state is cleared for the next day.</li>
+                    <li><strong>Real-World Context:</strong> A car counting 
system can be useful for determining measures for
+                        widening or controlling traffic depending on the number of cars passing through the monitored stretch.</li>
+                </ul>
+                <pre class="line-numbers"><code class="language-java">public 
class TrafficRadarMonitoringCarCountExample {
+    private static final String DAILY_COUNT_STORE = &quot;daily-count-store&quot;;
+    private static final String DAILY_COUNT_TOPIC = &quot;daily-count-topic&quot;;
+    private static final String RADAR_COUNT_TOPIC = &quot;car-radar-topic&quot;;
+
+    public static void countWithProcessValues(final StreamsBuilder builder) {
+        // Define a state store for tracking daily car counts
+        builder.addStateStore(
+                Stores.keyValueStoreBuilder(
+                        Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE),
+                        Serdes.String(),
+                        Serdes.Long()
+                )
+        );
+        final KStream&lt;Void, String&gt; radarStream = 
builder.stream(RADAR_COUNT_TOPIC);
+        // Apply the FixedKeyProcessor with the state store
+        radarStream.processValues(DailyCarCountProcessor::new, 
DAILY_COUNT_STORE)
+                .to(DAILY_COUNT_TOPIC);
+    }
+
+    private static class DailyCarCountProcessor implements 
FixedKeyProcessor&lt;Void, String, String&gt; {
+        private FixedKeyProcessorContext&lt;Void, String&gt; context;
+        private KeyValueStore&lt;String, Long&gt; stateStore;
+        private static final DateTimeFormatter DATE_FORMATTER =
+                
DateTimeFormatter.ofPattern(&quot;yyyy-MM-dd&quot;).withZone(ZoneId.systemDefault());
+
+        @Override
+        public void init(final FixedKeyProcessorContext&lt;Void, String&gt; 
context) {
+            this.context = context;
+            stateStore = context.getStateStore(DAILY_COUNT_STORE);
+        }
+
+        @Override
+        public void process(final FixedKeyRecord&lt;Void, String&gt; record) {
+            if (record.value() == null) {
+                return; // Skip null events
+            }
+
+            // Derive the current day from the current system time (for simplicity)
+            final long timestamp = System.currentTimeMillis();
+            final String currentDay = 
DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp));
+            // Retrieve the current count for the day
+            Long dailyCount = stateStore.get(currentDay);
+            if (dailyCount == null) {
+                dailyCount = 0L;
+            }
+            // Increment the count
+            dailyCount++;
+            stateStore.put(currentDay, dailyCount);
+
+            // Emit the current day&#39;s count
+            context.forward(record.withValue(String.format(&quot;Day: %s, Car 
Count: %s&quot;, currentDay, dailyCount)));
+        }
+    }
+}</code></pre>
+                
+                <h4>Key Takeaways</h4>
+                <ul>
+                    <li><strong>Type Safety and Flexibility:</strong> The 
process and processValues APIs utilize
+                        <code>ProcessorContext</code> and <code>Record</code> 
or <code>FixedKeyRecord</code> objects for better type
+                        safety and flexibility of custom processing logic.
+                    </li>
+                    <li><strong>Clear State and Logic Management:</strong> 
Implementations for <code>Processor</code> or
+                        <code>FixedKeyProcessor</code> should manage state and 
logic clearly. Use <code>context().forward()</code>
+                        for emitting records downstream.
+                    </li>
+                    <li><strong>Unified API:</strong> Consolidates multiple 
methods into a single, versatile API.</li>
+                    <li><strong>Future-Proof:</strong> Ensures compatibility 
with the latest Kafka Streams releases.</li>
+                </ul>
+            </div>
+            <div class="section" 
id="transformers-removal-and-migration-to-processors">
+                <a class="headerlink" 
href="#transformers-removal-and-migration-to-processors" title="Permalink to 
this headline">
+                    <h3>
+                        <a class="toc-backref" href="#id37">Transformers 
removal and migration to processors</a>
+                    </h3>
+                </a>
+                <p>As of Kafka 4.0, several deprecated methods in the Kafka 
Streams API, such as <code>transform</code>,
+                    <code>flatTransform</code>, <code>transformValues</code>, 
<code>flatTransformValues</code>, and
+                    the old <code>process</code>, have been removed. These methods have 
been replaced with the more versatile Processor API.
+                    This guide provides detailed steps for migrating existing 
code to use the new Processor API and explains the
+                    benefits of the changes.</p>
+                <p>The following deprecated methods are no longer available in 
Kafka Streams:</p>
+                <ul>
+                    <li><code>KStream#transform</code></li>
+                    <li><code>KStream#flatTransform</code></li>
+                    <li><code>KStream#transformValues</code></li>
+                    <li><code>KStream#flatTransformValues</code></li>
+                    <li><code>KStream#process</code> (the overload based on the old <code>org.apache.kafka.streams.processor.Processor</code> interface)</li>
+                </ul>
+                <p>The Processor API now serves as a unified replacement for 
all these methods. It simplifies the API surface
+                    while maintaining support for both stateless and stateful 
operations.</p>
+                <h4>Migration Examples</h4>
+                <p>To migrate from the deprecated <code>transform</code>, 
<code>transformValues</code>, <code>flatTransform</code>, and
+                    <code>flatTransformValues</code> methods to the Processor 
API (PAPI) in Kafka Streams, let&#39;s revisit the
+                    previous examples. The new <code>process</code> and 
<code>processValues</code> methods enable a more flexible
+                    and reusable approach by requiring implementations of the 
<code>Processor</code>
+                    or <code>FixedKeyProcessor</code> interfaces.</p>
+                <table>
+                    <thead>
+                        <tr>
+                            <th>Example</th>
+                            <th>Migrating from</th>
+                            <th>Migrating to</th>
+                            <th>State Type</th>
+                        </tr>
+                    </thead>
+                    <tbody>
+                        <tr>
+                            <td><a 
href="#categorizing-logs-by-severity-removal">Categorizing Logs by 
Severity</a></td>
+                            <td><code>flatTransform</code></td>
+                            <td><code>process</code></td>
+                            <td>Stateless</td>
+                        </tr>
+                        <tr>
+                            <td><a 
href="#replacing-slang-in-text-messages-removal">Replacing Slang in Text 
Messages</a></td>
+                            <td><code>flatTransformValues</code></td>
+                            <td><code>processValues</code></td>
+                            <td>Stateless</td>
+                        </tr>
+                        <tr>
+                            <td><a 
href="#cumulative-discounts-for-a-loyalty-program-removal">Cumulative Discounts 
for a Loyalty Program</a>
+                            </td>
+                            <td><code>transform</code></td>
+                            <td><code>process</code></td>
+                            <td>Stateful</td>
+                        </tr>
+                        <tr>
+                            <td><a 
href="#traffic-radar-monitoring-car-count-removal">Traffic Radar Monitoring Car 
Count</a></td>
+                            <td><code>transformValues</code></td>
+                            <td><code>processValues</code></td>
+                            <td>Stateful</td>
+                        </tr>
+                    </tbody>
+                </table>
+                <h5 id="categorizing-logs-by-severity-removal">Categorizing 
Logs by Severity</h5>
+                <p>Below, methods <code>categorizeWithFlatTransform</code> and 
<code>categorizeWithProcess</code> show how you can
+                    migrate from <code>flatTransform</code> to 
<code>process</code>.</p>
+                <pre class="line-numbers"><code class="language-java">public 
class CategorizingLogsBySeverityExample {
+    private static final String ERROR_LOGS_TOPIC = 
&quot;error-logs-topic&quot;;
+    private static final String INPUT_LOGS_TOPIC = 
&quot;input-logs-topic&quot;;
+    private static final String UNKNOWN_LOGS_TOPIC = 
&quot;unknown-logs-topic&quot;;
+    private static final String WARN_LOGS_TOPIC = &quot;warn-logs-topic&quot;;
+
+    public static void categorizeWithFlatTransform(final StreamsBuilder 
builder) {
+        final KStream&lt;String, String&gt; logStream = 
builder.stream(INPUT_LOGS_TOPIC);
+        logStream.flatTransform(LogSeverityTransformer::new)
+                .to((key, value, recordContext) -&gt; {
+                    // Determine the target topic dynamically
+                    if (&quot;ERROR&quot;.equals(key)) return ERROR_LOGS_TOPIC;
+                    if (&quot;WARN&quot;.equals(key)) return WARN_LOGS_TOPIC;
+                    return UNKNOWN_LOGS_TOPIC;
+                });
+    }
+
+    public static void categorizeWithProcess(final StreamsBuilder builder) {
+        final KStream&lt;String, String&gt; logStream = 
builder.stream(INPUT_LOGS_TOPIC);
+        logStream.process(LogSeverityProcessor::new)
+                .to((key, value, recordContext) -&gt; {
+                    // Determine the target topic dynamically
+                    if (&quot;ERROR&quot;.equals(key)) return ERROR_LOGS_TOPIC;
+                    if (&quot;WARN&quot;.equals(key)) return WARN_LOGS_TOPIC;
+                    return UNKNOWN_LOGS_TOPIC;
+                });
+    }
+
+    private static class LogSeverityTransformer implements 
Transformer&lt;String, String, Iterable&lt;KeyValue&lt;String, 
String&gt;&gt;&gt; {
+        @Override
+        public void init(org.apache.kafka.streams.processor.ProcessorContext 
context) {
+        }
+
+        @Override
+        public Iterable&lt;KeyValue&lt;String, String&gt;&gt; transform(String 
key, String value) {
+            if (value == null) {
+                return Collections.emptyList(); // Skip null values
+            }
+
+            // Assume the severity is the first word in the log message
+            // For example: &quot;ERROR: Disk not found&quot; -&gt; 
&quot;ERROR&quot;
+            int colonIndex = value.indexOf(&#39;:&#39;);
+            String severity = colonIndex &gt; 0 ? value.substring(0, 
colonIndex).trim() : &quot;UNKNOWN&quot;;
+
+            // Create appropriate KeyValue pair based on severity
+            return switch (severity) {
+                case &quot;ERROR&quot; -&gt; List.of(new 
KeyValue&lt;&gt;(&quot;ERROR&quot;, value));
+                case &quot;WARN&quot; -&gt; List.of(new 
KeyValue&lt;&gt;(&quot;WARN&quot;, value));
+                case &quot;INFO&quot; -&gt; Collections.emptyList(); // INFO 
logs are ignored
+                default -&gt; List.of(new 
KeyValue&lt;&gt;(&quot;UNKNOWN&quot;, value));
+            };
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
+    private static class LogSeverityProcessor extends 
ContextualProcessor&lt;String, String, String, String&gt; {
+        @Override
+        public void process(final Record&lt;String, String&gt; record) {
+            if (record.value() == null) {
+                return; // Skip null values
+            }
+
+            // Assume the severity is the first word in the log message
+            // For example: &quot;ERROR: Disk not found&quot; -&gt; 
&quot;ERROR&quot;
+            final int colonIndex = record.value().indexOf(&#39;:&#39;);
+            final String severity = colonIndex &gt; 0 ? 
record.value().substring(0, colonIndex).trim() : &quot;UNKNOWN&quot;;
+
+            // Route logs based on severity, using the severity as the key so the
+            // dynamic topic chooser passed to to() can pick the target topic
+            switch (severity) {
+                case &quot;ERROR&quot;:
+                    context().forward(record.withKey(&quot;ERROR&quot;));
+                    break;
+                case &quot;WARN&quot;:
+                    context().forward(record.withKey(&quot;WARN&quot;));
+                    break;
+                case &quot;INFO&quot;:
+                    // INFO logs are ignored
+                    break;
+                default:
+                    // Unrecognized severities are routed to the unknown topic
+                    context().forward(record.withKey(&quot;UNKNOWN&quot;));
+            }
+        }
+    }
+}</code></pre>
+                <h5 id="replacing-slang-in-text-messages-removal">Replacing 
Slang in Text Messages</h5>
+                <p>Below, methods <code>replaceWithFlatTransformValues</code> 
and <code>replaceWithProcessValues</code> show how you can
+                    migrate from <code>flatTransformValues</code> to 
<code>processValues</code>.</p>
+                <pre class="line-numbers"><code class="language-java">public 
class ReplacingSlangTextInMessagesExample {
+    private static final Map&lt;String, String&gt; SLANG_DICTIONARY = Map.of(
+            &quot;u&quot;, &quot;you&quot;,
+            &quot;brb&quot;, &quot;be right back&quot;,
+            &quot;omg&quot;, &quot;oh my god&quot;,
+            &quot;btw&quot;, &quot;by the way&quot;
+    );
+    private static final String INPUT_MESSAGES_TOPIC = 
&quot;input-messages-topic&quot;;
+    private static final String OUTPUT_MESSAGES_TOPIC = 
&quot;output-messages-topic&quot;;
+
+    public static void replaceWithFlatTransformValues(final StreamsBuilder 
builder) {
+        KStream&lt;String, String&gt; messageStream = 
builder.stream(INPUT_MESSAGES_TOPIC);
+        
messageStream.flatTransformValues(SlangReplacementTransformer::new).to(OUTPUT_MESSAGES_TOPIC);
+    }
+
+    public static void replaceWithProcessValues(final StreamsBuilder builder) {
+        KStream&lt;String, String&gt; messageStream = 
builder.stream(INPUT_MESSAGES_TOPIC);
+        
messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
+    }
+
+    private static class SlangReplacementTransformer implements 
ValueTransformer&lt;String, Iterable&lt;String&gt;&gt; {
+
+        @Override
+        public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
+        }
+
+        @Override
+        public Iterable&lt;String&gt; transform(final String value) {
+            if (value == null) {
+                return Collections.emptyList(); // Skip null values
+            }
+
+            // Replace slang words in the message
+            final String[] words = value.split(&quot;\\s+&quot;);
+            return Arrays.asList(
+                    Arrays.stream(words)
+                            .map(word -&gt; 
SLANG_DICTIONARY.getOrDefault(word, word))
+                            .toArray(String[]::new)
+            );
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
+    private static class SlangReplacementProcessor extends 
ContextualFixedKeyProcessor&lt;String, String, String&gt; {
+        @Override
+        public void process(final FixedKeyRecord&lt;String, String&gt; record) 
{
+            if (record.value() == null) {
+                return; // Skip null values
+            }
+
+            // Replace slang words in the message
+            final String[] words = record.value().split(&quot;\\s+&quot;);
+            for (final String word : words) {
+                String replacedWord = SLANG_DICTIONARY.getOrDefault(word, 
word);
+                context().forward(record.withValue(replacedWord));
+            }
+        }
+    }
+}</code></pre>
+                <h5 id="cumulative-discounts-for-a-loyalty-program-removal">Cumulative Discounts for a Loyalty Program</h5>
+                <p>Below, methods <code>applyDiscountWithTransform</code> and <code>applyDiscountWithProcess</code> show how you can
+                    migrate from <code>transform</code> to <code>process</code>.</p>
+                <pre class="line-numbers"><code class="language-java">public 
class CumulativeDiscountsForALoyaltyProgramExample {
+    private static final double DISCOUNT_THRESHOLD = 100.0;
+    private static final String CUSTOMER_SPENDING_STORE = 
&quot;customer-spending-store&quot;;
+    private static final String DISCOUNT_NOTIFICATION_MESSAGE =
+            &quot;Discount applied! You have received a reward for your 
purchases.&quot;;
+    private static final String DISCOUNT_NOTIFICATIONS_TOPIC = 
&quot;discount-notifications-topic&quot;;
+    private static final String PURCHASE_EVENTS_TOPIC = 
&quot;purchase-events-topic&quot;;
+
+    public static void applyDiscountWithTransform(final StreamsBuilder 
builder) {
+        // Define the state store for tracking cumulative spending
+        builder.addStateStore(
+                Stores.keyValueStoreBuilder(
+                        Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE),
+                        Serdes.String(),
+                        Serdes.Double()
+                )
+        );
+        final KStream&lt;String, Double&gt; purchaseStream = 
builder.stream(PURCHASE_EVENTS_TOPIC);
+        // Apply the Transformer with the state store
+        final KStream&lt;String, String&gt; notificationStream =
+                purchaseStream.transform(CumulativeDiscountTransformer::new, 
CUSTOMER_SPENDING_STORE);
+        // Send the notifications to the output topic
+        notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC);
+    }
+
+    public static void applyDiscountWithProcess(final StreamsBuilder builder) {
+        // Define the state store for tracking cumulative spending
+        builder.addStateStore(
+                Stores.keyValueStoreBuilder(
+                        Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE),
+                        Serdes.String(),
+                        Serdes.Double()
+                )
+        );
+        final KStream&lt;String, Double&gt; purchaseStream = 
builder.stream(PURCHASE_EVENTS_TOPIC);
+        // Apply the Processor with the state store
+        final KStream&lt;String, String&gt; notificationStream =
+                purchaseStream.process(CumulativeDiscountProcessor::new, 
CUSTOMER_SPENDING_STORE);
+        // Send the notifications to the output topic
+        notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC);
+    }
+
+    private static class CumulativeDiscountTransformer implements 
Transformer&lt;String, Double, KeyValue&lt;String, String&gt;&gt; {
+        private KeyValueStore&lt;String, Double&gt; spendingStore;
+
+        @Override
+        public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
+            // Retrieve the state store for cumulative spending
+            spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE);
+        }
+
+        @Override
+        public KeyValue&lt;String, String&gt; transform(final String key, 
final Double value) {
+            if (value == null) {
+                return null; // Skip null purchase amounts
+            }
+
+            // Get the current spending total for the customer
+            Double currentSpending = spendingStore.get(key);
+            if (currentSpending == null) {
+                currentSpending = 0.0;
+            }
+            // Update the cumulative spending
+            currentSpending += value;
+            spendingStore.put(key, currentSpending);
+
+            // Check if the customer qualifies for a discount
+            if (currentSpending &gt;= DISCOUNT_THRESHOLD) {
+                // Reset the spending after applying the discount
+                spendingStore.put(key, currentSpending - DISCOUNT_THRESHOLD);
+                // Return a notification message
+                return new KeyValue&lt;&gt;(key, 
DISCOUNT_NOTIFICATION_MESSAGE);
+            }
+            return null; // No discount, so no output for this record
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
+    private static class CumulativeDiscountProcessor implements 
Processor&lt;String, Double, String, String&gt; {
+        private KeyValueStore&lt;String, Double&gt; spendingStore;
+        private ProcessorContext&lt;String, String&gt; context;
+
+        @Override
+        public void init(final ProcessorContext&lt;String, String&gt; context) 
{
+            this.context = context;
+            // Retrieve the state store for cumulative spending
+            spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE);
+        }
+
+        @Override
+        public void process(final Record&lt;String, Double&gt; record) {
+            if (record.value() == null) {
+                return; // Skip null purchase amounts
+            }
+
+            // Get the current spending total for the customer
+            Double currentSpending = spendingStore.get(record.key());
+            if (currentSpending == null) {
+                currentSpending = 0.0;
+            }
+            // Update the cumulative spending
+            currentSpending += record.value();
+            spendingStore.put(record.key(), currentSpending);
+
+            // Check if the customer qualifies for a discount
+            if (currentSpending &gt;= DISCOUNT_THRESHOLD) {
+                // Reset the spending after applying the discount
+                spendingStore.put(record.key(), currentSpending - 
DISCOUNT_THRESHOLD);
+                // Send a discount notification
+                
context.forward(record.withValue(DISCOUNT_NOTIFICATION_MESSAGE));
+            }
+        }
+    }
+}</code></pre>
+                <h5 id="traffic-radar-monitoring-car-count-removal">Traffic 
Radar Monitoring Car Count</h5>
+                <p>Below, methods <code>countWithTransformValues</code> and 
<code>countWithProcessValues</code> show how you can migrate
+                    from <code>transformValues</code> to 
<code>processValues</code>.</p>
+                <pre class="line-numbers"><code class="language-java">public 
class TrafficRadarMonitoringCarCountExample {
+    private static final String DAILY_COUNT_STORE = &quot;daily-count-store&quot;;
+    private static final String DAILY_COUNT_TOPIC = &quot;daily-count-topic&quot;;
+    private static final String RADAR_COUNT_TOPIC = &quot;car-radar-topic&quot;;
+
+    public static void countWithTransformValues(final StreamsBuilder builder) {
+        // Define a state store for tracking daily car counts
+        builder.addStateStore(
+                Stores.keyValueStoreBuilder(
+                        Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE),
+                        Serdes.String(),
+                        Serdes.Long()
+                )
+        );
+        final KStream&lt;Void, String&gt; radarStream = 
builder.stream(RADAR_COUNT_TOPIC);
+        // Apply the ValueTransformer with the state store
+        radarStream.transformValues(DailyCarCountTransformer::new, 
DAILY_COUNT_STORE)
+                .to(DAILY_COUNT_TOPIC);
+    }
+
+    public static void countWithProcessValues(final StreamsBuilder builder) {
+        // Define a state store for tracking daily car counts
+        builder.addStateStore(
+                Stores.keyValueStoreBuilder(
+                        Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE),
+                        Serdes.String(),
+                        Serdes.Long()
+                )
+        );
+        final KStream&lt;Void, String&gt; radarStream = 
builder.stream(RADAR_COUNT_TOPIC);
+        // Apply the FixedKeyProcessor with the state store
+        radarStream.processValues(DailyCarCountProcessor::new, 
DAILY_COUNT_STORE)
+                .to(DAILY_COUNT_TOPIC);
+    }
+
+    private static class DailyCarCountTransformer implements 
ValueTransformerWithKey&lt;Void, String, String&gt; {
+        private KeyValueStore&lt;String, Long&gt; stateStore;
+        private static final DateTimeFormatter DATE_FORMATTER =
+                
DateTimeFormatter.ofPattern(&quot;yyyy-MM-dd&quot;).withZone(ZoneId.systemDefault());
+
+        @Override
+        public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
+            // Access the state store
+            stateStore = context.getStateStore(DAILY_COUNT_STORE);
+        }
+
+        @Override
+        public String transform(Void readOnlyKey, String value) {
+            if (value == null) {
+                return null; // Skip null events
+            }
+
+            // Derive the current day from the current system time (for simplicity)
+            final long timestamp = System.currentTimeMillis();
+            final String currentDay = 
DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp));
+            // Retrieve the current count for the day
+            Long dailyCount = stateStore.get(currentDay);
+            if (dailyCount == null) {
+                dailyCount = 0L;
+            }
+            // Increment the count
+            dailyCount++;
+            stateStore.put(currentDay, dailyCount);
+
+            // Return the current day&#39;s count
+            return String.format(&quot;Day: %s, Car Count: %s&quot;, 
currentDay, dailyCount);
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
+    private static class DailyCarCountProcessor implements 
FixedKeyProcessor&lt;Void, String, String&gt; {
+        private FixedKeyProcessorContext&lt;Void, String&gt; context;
+        private KeyValueStore&lt;String, Long&gt; stateStore;
+        private static final DateTimeFormatter DATE_FORMATTER =
+                
DateTimeFormatter.ofPattern(&quot;yyyy-MM-dd&quot;).withZone(ZoneId.systemDefault());
+
+        @Override
+        public void init(final FixedKeyProcessorContext&lt;Void, String&gt; 
context) {
+            this.context = context;
+            stateStore = context.getStateStore(DAILY_COUNT_STORE);
+        }
+
+        @Override
+        public void process(final FixedKeyRecord&lt;Void, String&gt; record) {
+            if (record.value() == null) {
+                return; // Skip null events
+            }
+
+            // Derive the current day from the current system time (for simplicity)
+            final long timestamp = System.currentTimeMillis();
+            final String currentDay = 
DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp));
+            // Retrieve the current count for the day
+            Long dailyCount = stateStore.get(currentDay);
+            if (dailyCount == null) {
+                dailyCount = 0L;
+            }
+            // Increment the count
+            dailyCount++;
+            stateStore.put(currentDay, dailyCount);
+
+            // Emit the current day&#39;s count
+            context.forward(record.withValue(String.format(&quot;Day: %s, Car 
Count: %s&quot;, currentDay, dailyCount)));
+        }
+    }
+}</code></pre>
+                <h4>Key Takeaways</h4>
+                <ul>
+                    <li><strong>Type Safety and Flexibility:</strong> The 
process and processValues APIs utilize
+                        <code>ProcessorContext</code> and <code>Record</code> 
or <code>FixedKeyRecord</code> objects for better type
+                        safety and flexibility of custom processing logic.</li>
+                    <li><strong>Clear State and Logic Management:</strong> 
Implementations for <code>Processor</code> or
+                        <code>FixedKeyProcessor</code> should manage state and 
logic clearly. Use <code>context().forward()</code>
+                        for emitting records downstream.</li>
+                    <li><strong>Unified API:</strong> Consolidates multiple 
methods into a single, versatile API.</li>
+                    <li><strong>Future-Proof:</strong> Ensures compatibility 
with the latest Kafka Streams releases.</li>
+                </ul>
+                <h4>Removal of Old <code>process</code> Method</h4>
+                <p>In addition to the methods above, the overload of <code>process</code> that
+                    integrated the old Processor API (i.e., <code>org.apache.kafka.streams.processor.Processor</code>,
+                    as opposed to the new <code>org.apache.kafka.streams.processor.api.Processor</code>) into the DSL
+                    has also been removed. The following example shows how to migrate to the
+                    new <code>process</code>.
+                </p>
+                <h5>Example</h5>
+                <ul>
+                    <li>
+                        <p><strong>Idea:</strong> The system monitors page 
views for a website in real-time. When a page reaches a
+                            predefined popularity threshold (e.g., 1000 
views), the system automatically sends an email alert to the
+                            site administrator or marketing team to notify 
them of the page&#39;s success. This helps teams quickly
+                            identify high-performing content and act on it, 
such as promoting the page further or analyzing the
+                            traffic source.</p>
+                    </li>
+                    <li>
+                        <p><strong>Real-World Context:</strong> In a content 
management system (CMS) for a news or blogging platform,
+                            it&#39;s crucial to track the popularity of 
articles or posts. For example:</p>
+                        <ul>
+                            <li><strong>Marketing Teams:</strong> Use the 
notification to highlight trending content on social media or
+                                email newsletters.</li>
+                            <li><strong>Operations Teams:</strong> Use the 
alert to ensure the site can handle increased traffic for
+                                popular pages.</li>
+                            <li><strong>Ad Managers:</strong> Identify pages 
where additional ad placements might maximize revenue.</li>
+                        </ul>
+                        <p>By automating the detection of popular pages, the 
system eliminates the need for manual monitoring and
+                            ensures timely actions to capitalize on the 
content&#39;s performance.</p>
+                    </li>
+                </ul>
+                <pre class="line-numbers"><code class="language-java">public 
class PopularPageEmailAlertExample {
+    private static final String ALERTS_EMAIL = 
&quot;[email protected]&quot;;
+    private static final String PAGE_VIEWS_TOPIC = 
&quot;page-views-topic&quot;;
+
+    public static void alertWithOldProcess(StreamsBuilder builder) {
+        KStream&lt;String, Long&gt; pageViews = 
builder.stream(PAGE_VIEWS_TOPIC);
+        // Filter pages with exactly 1000 views and process them using the old 
API
+        pageViews.filter((pageId, viewCount) -&gt; viewCount == 1000)
+                .process(PopularPageEmailAlertOld::new);
+    }
+
+    public static void alertWithNewProcess(StreamsBuilder builder) {
+        KStream&lt;String, Long&gt; pageViews = 
builder.stream(PAGE_VIEWS_TOPIC);
+        // Filter pages with exactly 1000 views and process them using the new 
API
+        pageViews.filter((pageId, viewCount) -&gt; viewCount == 1000)
+                .process(PopularPageEmailAlertNew::new);
+    }
+
+    private static class PopularPageEmailAlertOld extends 
AbstractProcessor&lt;String, Long&gt; {
+        @Override
+        public void init(org.apache.kafka.streams.processor.ProcessorContext 
context) {
+            super.init(context);
+            System.out.println(&quot;Initialized email client for: &quot; + 
ALERTS_EMAIL);
+        }
+
+        @Override
+        public void process(String key, Long value) {
+            if (value == null) return;
+
+            if (value == 1000) {
+                // Send an email alert
+                System.out.printf(&quot;ALERT (Old API): Page %s has reached 
1000 views. Sending email to %s%n&quot;, key, ALERTS_EMAIL);
+            }
+        }
+
+        @Override
+        public void close() {
+            System.out.println(&quot;Tearing down email client for: &quot; + 
ALERTS_EMAIL);
+        }
+    }
+
+    private static class PopularPageEmailAlertNew implements 
Processor&lt;String, Long, Void, Void&gt; {
+        @Override
+        public void init(ProcessorContext&lt;Void, Void&gt; context) {
+            System.out.println(&quot;Initialized email client for: &quot; + 
ALERTS_EMAIL);
+        }
+
+        @Override
+        public void process(Record&lt;String, Long&gt; record) {
+            if (record.value() == null) return;
+
+            if (record.value() == 1000) {
+                // Send an email alert
+                System.out.printf(&quot;ALERT (New API): Page %s has reached 
1000 views. Sending email to %s%n&quot;, record.key(), ALERTS_EMAIL);
+            }
+        }
+
+        @Override
+        public void close() {
+            System.out.println(&quot;Tearing down email client for: &quot; + 
ALERTS_EMAIL);
+        }
+    }
+}</code></pre>
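A migrated topology like the one above can be exercised without a running broker using `TopologyTestDriver` from the `kafka-streams-test-utils` artifact. The following is a minimal sketch, assuming that artifact is on the classpath; the class name, topic name, and the counting processor (a stand-in for the email-sending logic, so the result is observable) are all illustrative, not part of the example above:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;

public class PopularPageAlertDriverSketch {

    // Visible side effect so the sketch can verify the alert path fired.
    static int alertsSent = 0;

    // New-API processor that mirrors PopularPageEmailAlertNew, but counts
    // alerts instead of sending email. init() and close() use the interface
    // defaults, since Processor only requires process() to be implemented.
    static final class CountingAlertProcessor implements Processor<String, Long, Void, Void> {
        @Override
        public void process(final Record<String, Long> record) {
            alertsSent++;
        }
    }

    public static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, Long> pageViews = builder.stream("page-views-topic");
        pageViews.filter((pageId, viewCount) -> viewCount != null && viewCount == 1000)
                 .process(CountingAlertProcessor::new);
        return builder.build();
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "popular-page-alert-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());

        // TopologyTestDriver processes records synchronously, so results are
        // observable immediately after each pipeInput() call.
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
            final TestInputTopic<String, Long> input = driver.createInputTopic(
                "page-views-topic", new StringSerializer(), new LongSerializer());
            input.pipeInput("/articles/trending", 999L);   // below threshold, filtered out
            input.pipeInput("/articles/trending", 1000L);  // reaches threshold, alert fires
        }
        System.out.println("alerts sent: " + alertsSent);  // prints "alerts sent: 1"
    }
}
```

The try-with-resources block is worth keeping: closing the driver tears down the topology and invokes each processor's `close()`, matching the lifecycle the example's processors rely on.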
+            </div>
         <div class="section" id="naming-a-streams-app">
             <a class="headerlink" href="#naming-a-streams-app" title="Permalink to this headline"><h2><a class="toc-backref" href="#id33">Naming Operators in a Streams DSL application</a></h2></a>
             Kafka Streams allows you to <a class="reference internal" href="dsl-topology-naming.html">name processors</a> created via the Streams DSL
