This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new c438ee91668 MINOR: Kafka Streams doc updates for 4.0 release (#18488)
c438ee91668 is described below

commit c438ee9166897026f896c8b42f3495d95d0ce42f
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Wed Jan 15 11:58:53 2025 -0800

    MINOR: Kafka Streams doc updates for 4.0 release (#18488)
    
    Reviewers: Bill Bejeck <b...@confluent.io>
---
 docs/streams/developer-guide/processor-api.html    | 114 ++++++++++-----------
 docs/streams/developer-guide/testing.html          |   2 +-
 .../org/apache/kafka/streams/StreamsBuilder.java   |   8 +-
 .../apache/kafka/streams/TopologyDescription.java  |   2 +-
 .../apache/kafka/streams/kstream/ValueMapper.java  |   8 +-
 .../kafka/streams/kstream/ValueMapperWithKey.java  |   7 +-
 .../kafka/streams/processor/ProcessorContext.java  |   7 +-
 .../ForwardingDisabledProcessorContext.java        |   3 +-
 .../processor/internals/ProcessorContextImpl.java  |   6 +-
 9 files changed, 73 insertions(+), 84 deletions(-)

diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index e25ecd601ac..72ed7fbe852 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -67,42 +67,40 @@
                 <p><b>Tip</b></p>
                 <p class="last"><strong>Combining the DSL and the Processor 
API:</strong>
                     You can combine the convenience of the DSL with the power 
and flexibility of the Processor API as described in the
-                    section <a class="reference internal" 
href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std 
std-ref">Applying processors and transformers (Processor API 
integration)</span></a>.</p>
+                    section <a class="reference internal" 
href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std 
std-ref">Applying processors (Processor API integration)</span></a>.</p>
             </div>
             <p>For a complete list of available API functionality, see the <a 
href="/{{version}}/javadoc/org/apache/kafka/streams/package-summary.html">Streams</a>
 API docs.</p>
         </div>
         <div class="section" id="defining-a-stream-processor">
             <span id="streams-developer-guide-stream-processor"></span><h2><a 
class="toc-backref" href="#id2">Defining a Stream Processor</a><a 
class="headerlink" href="#defining-a-stream-processor" title="Permalink to this 
headline"></a></h2>
-            <p>A <a class="reference internal" 
href="../core-concepts.html#streams_processor_node"><span class="std 
std-ref">stream processor</span></a> is a node in the processor topology that 
represents a single processing step.
-                With the Processor API, you can define arbitrary stream 
processors that processes one received record at a time, and connect
-                these processors with their associated state stores to compose 
the processor topology.</p>
-            <p>You can define a customized stream processor by implementing 
the <code class="docutils literal"><span class="pre">Processor</span></code> 
interface, which provides the <code class="docutils literal"><span 
class="pre">process()</span></code> API method.
-                The <code class="docutils literal"><span 
class="pre">process()</span></code> method is called on each of the received 
records.</p>
-            <p>The <code class="docutils literal"><span 
class="pre">Processor</span></code> interface also has an <code class="docutils 
literal"><span class="pre">init()</span></code> method, which is called by the 
Kafka Streams library during task construction
-                phase. Processor instances should perform any required 
initialization in this method. The <code class="docutils literal"><span 
class="pre">init()</span></code> method passes in a <code class="docutils 
literal"><span class="pre">ProcessorContext</span></code>
-                instance, which provides access to the metadata of the 
currently processed record, including its source Kafka topic and partition,
-                its corresponding message offset, and further such 
information. You can also use this context instance to schedule a punctuation
-                function (via <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code>), to forward a new record 
as a key-value pair to the downstream processors (via <code class="docutils 
literal"><span class="pre">ProcessorContext#forward()</span></code>),
-              and to commit the current processing progress (via <code 
class="docutils literal"><span 
class="pre">ProcessorContext#commit()</span></code>).
-              Any resources you set up in <code class="docutils literal"><span 
class="pre">init()</span></code> can be cleaned up in the
-              <code class="docutils literal"><span 
class="pre">close()</span></code> method. Note that Kafka Streams may re-use a 
single
-              <code class="docutils literal"><span 
class="pre">Processor</span></code> object by calling
-              <code class="docutils literal"><span 
class="pre">init()</span></code> on it again after <code class="docutils 
literal"><span class="pre">close()</span></code>.</p>
-          <p>
-            The <code class="docutils literal"><span 
class="pre">Processor</span></code> interface takes two sets of generic 
parameters:
+          <p>A <a class="reference internal" 
href="../core-concepts.html#streams_processor_node"><span class="std 
std-ref">stream processor</span></a> is a node in the processor topology that 
represents a single processing step.
+            With the Processor API, you can define arbitrary stream processors 
that processes one received record at a time, and connect
+            these processors with their associated state stores to compose the 
processor topology.</p>
+          <p>You can define a customized stream processor by implementing the 
<code class="docutils literal"><span class="pre">Processor</span></code> 
interface, which provides the <code class="docutils literal"><span 
class="pre">process()</span></code> API method.
+            The <code class="docutils literal"><span 
class="pre">process()</span></code> method is called on each of the received 
records.</p>
+          <p>The <code class="docutils literal"><span 
class="pre">Processor</span></code> interface also has an <code class="docutils 
literal"><span class="pre">init()</span></code> method, which is called by the 
Kafka Streams library during task construction
+            phase. Processor instances should perform any required 
initialization in this method. The <code class="docutils literal"><span 
class="pre">init()</span></code> method passes in a <code class="docutils 
literal"><span class="pre">ProcessorContext</span></code>
+            instance, which provides access to the metadata of the currently 
processed record, including its source Kafka topic and partition,
+            its corresponding message offset, and further such information. 
You can also use this context instance to schedule a punctuation
+            function (via <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code>), to forward a new record 
to the downstream processors (via <code class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>),
+            and to request a commit of the current processing progress (via 
<code class="docutils literal"><span 
class="pre">ProcessorContext#commit()</span></code>).
+            Any resources you set up in <code class="docutils literal"><span 
class="pre">init()</span></code> can be cleaned up in the
+            <code class="docutils literal"><span 
class="pre">close()</span></code> method. Note that Kafka Streams may re-use a 
single
+            <code class="docutils literal"><span 
class="pre">Processor</span></code> object by calling
+            <code class="docutils literal"><span 
class="pre">init()</span></code> on it again after <code class="docutils 
literal"><span class="pre">close()</span></code>.</p>
+          <p>The <code class="docutils literal"><span 
class="pre">Processor</span></code> interface takes four generic parameters:
             <code class="docutils literal"><span class="pre">KIn, VIn, KOut, 
VOut</span></code>. These define the input and output types
             that the processor implementation can handle. <code 
class="docutils literal"><span class="pre">KIn</span></code> and
-            <code class="docutils literal"><span class="pre">VIn</span></code> 
define the key and value types that will be passed
+            <code class="docutils literal"><span class="pre">VIn</span></code> 
define the key and value types of the <code class="docutils literal"><span 
class="pre">Record</span></code> that will be passed
             to <code class="docutils literal"><span 
class="pre">process()</span></code>.
             Likewise, <code class="docutils literal"><span 
class="pre">KOut</span></code> and <code class="docutils literal"><span 
class="pre">VOut</span></code>
-            define the forwarded key and value types that <code 
class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            define the forwarded key and value types for the result <code 
class="docutils literal"><span class="pre">Record</span></code> that <code 
class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
             will accept. If your processor does not forward any records at all 
(or if it only forwards
             <code class="docutils literal"><span 
class="pre">null</span></code> keys or values),
             a best practice is to set the output generic type argument to
             <code class="docutils literal"><span 
class="pre">Void</span></code>.
             If it needs to forward multiple types that don't share a common 
superclass, you will
-            have to set the output generic type argument to <code 
class="docutils literal"><span class="pre">Object</span></code>.
-          </p>
+            have to set the output generic type argument to <code 
class="docutils literal"><span class="pre">Object</span></code>.</p>
           <p>
             Both the <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>
             and the <code class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
@@ -120,40 +118,38 @@
             Note that this does not mutate <code class="docutils 
literal"><span class="pre">inputRecord</span></code>,
             but instead creates a shallow copy. Beware that this is only a 
shallow copy, so if you
             plan to mutate the key, value, or headers elsewhere in the 
program, you will want to
-            create a deep copy of those fields yourself.
-          </p>
-            <p>
-              In addition to handling incoming records via
-              <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>,
-              you have the option to schedule periodic invocation (called 
"punctuation")
-              in your processor's <code class="docutils literal"><span 
class="pre">init()</span></code>
-              method by calling  <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code>
-              and passing it a  <code class="docutils literal"><span 
class="pre">Punctuator</span></code>.
-                The <code class="docutils literal"><span 
class="pre">PunctuationType</span></code> determines what notion of time is used
-                for the punctuation scheduling: either <a class="reference 
internal" href="../core-concepts.html#streams_time"><span class="std 
std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
-                is configured to represent event-time via <code 
class="docutils literal"><span class="pre">TimestampExtractor</span></code>). 
When stream-time is used, <code class="docutils literal"><span 
class="pre">punctuate()</span></code> is triggered purely
-                by data because stream-time is determined (and advanced 
forward) by the timestamps derived from the input data. When there
-                is no new input data arriving, stream-time is not advanced and 
thus <code class="docutils literal"><span class="pre">punctuate()</span></code> 
is not called.</p>
-            <p>For example, if you schedule a <code class="docutils 
literal"><span class="pre">Punctuator</span></code> function every 10 seconds 
based on <code class="docutils literal"><span 
class="pre">PunctuationType.STREAM_TIME</span></code> and if you
-                process a stream of 60 records with consecutive timestamps 
from 1 (first record) to 60 seconds (last record),
-                then <code class="docutils literal"><span 
class="pre">punctuate()</span></code> would be called 6 times. This happens 
regardless of the time required to actually process those records. <code 
class="docutils literal"><span class="pre">punctuate()</span></code>
-                would be called 6 times regardless of whether processing these 
60 records takes a second, a minute, or an hour.</p>
-            <p>When wall-clock-time (i.e. <code class="docutils literal"><span 
class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code 
class="docutils literal"><span class="pre">punctuate()</span></code> is 
triggered purely by the wall-clock time.
-                Reusing the example above, if the <code class="docutils 
literal"><span class="pre">Punctuator</span></code> function is scheduled based 
on <code class="docutils literal"><span 
class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these
-                60 records were processed within 20 seconds, <code 
class="docutils literal"><span class="pre">punctuate()</span></code> is called 
2 times (one time every 10 seconds). If these 60 records
-                were processed within 5 seconds, then no <code class="docutils 
literal"><span class="pre">punctuate()</span></code> is called at all. Note 
that you can schedule multiple <code class="docutils literal"><span 
class="pre">Punctuator</span></code>
-                callbacks with different <code class="docutils literal"><span 
class="pre">PunctuationType</span></code> types within the same processor by 
calling <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code> multiple
-                times inside <code class="docutils literal"><span 
class="pre">init()</span></code> method.</p>
-            <div class="admonition attention">
+            create a deep copy of those fields yourself.</p>
+          <p>In addition to handling incoming records via
+            <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>,
+            you have the option to schedule periodic invocation (called 
"punctuation")
+            in your processor's <code class="docutils literal"><span 
class="pre">init()</span></code>
+            method by calling  <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code>
+            and passing it a  <code class="docutils literal"><span 
class="pre">Punctuator</span></code>.
+            The <code class="docutils literal"><span 
class="pre">PunctuationType</span></code> determines what notion of time is used
+            for the punctuation scheduling: either <a class="reference 
internal" href="../core-concepts.html#streams_time"><span class="std 
std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
+            is configured to represent event-time via <code class="docutils 
literal"><span class="pre">TimestampExtractor</span></code>). When stream-time 
is used, <code class="docutils literal"><span 
class="pre">punctuate()</span></code> is triggered purely
+            by data because stream-time is determined (and advanced forward) 
by the timestamps derived from the input data. When there
+            is no new input data arriving, stream-time is not advanced and 
thus <code class="docutils literal"><span class="pre">punctuate()</span></code> 
is not called.</p>
+          <p>For example, if you schedule a <code class="docutils 
literal"><span class="pre">Punctuator</span></code> function every 10 seconds 
based on <code class="docutils literal"><span 
class="pre">PunctuationType.STREAM_TIME</span></code> and if you
+            process a stream of 60 records with consecutive timestamps from 1 
(first record) to 60 seconds (last record),
+            then <code class="docutils literal"><span 
class="pre">punctuate()</span></code> would be called 6 times. This happens 
regardless of the time required to actually process those records. <code 
class="docutils literal"><span class="pre">punctuate()</span></code>
+            would be called 6 times regardless of whether processing these 60 
records takes a second, a minute, or an hour.</p>
+          <p>When wall-clock-time (i.e. <code class="docutils literal"><span 
class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code 
class="docutils literal"><span class="pre">punctuate()</span></code> is 
triggered purely by the wall-clock time.
+            Reusing the example above, if the <code class="docutils 
literal"><span class="pre">Punctuator</span></code> function is scheduled based 
on <code class="docutils literal"><span 
class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these
+            60 records were processed within 20 seconds, <code class="docutils 
literal"><span class="pre">punctuate()</span></code> is called 2 times (one 
time every 10 seconds). If these 60 records
+            were processed within 5 seconds, then no <code class="docutils 
literal"><span class="pre">punctuate()</span></code> is called at all. Note 
that you can schedule multiple <code class="docutils literal"><span 
class="pre">Punctuator</span></code>
+            callbacks with different <code class="docutils literal"><span 
class="pre">PunctuationType</span></code> types within the same processor by 
calling <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code> multiple
+            times inside <code class="docutils literal"><span 
class="pre">init()</span></code> method.</p>
+          <div class="admonition attention">
                 <p class="first admonition-title"><b>Attention</b></p>
                 <p class="last">Stream-time is only advanced when Streams 
processes records.
                   If there are no records to process, or if Streams is waiting 
for new records
                   due to the <a class="reference internal" 
href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
                   configuration, then the stream time will not advance and 
<code class="docutils literal"><span class="pre">punctuate()</span></code> will 
not be triggered if <code class="docutils literal"><span 
class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
                     This behavior is independent of the configured timestamp 
extractor, i.e., using <code class="docutils literal"><span 
class="pre">WallclockTimestampExtractor</span></code> does not enable 
wall-clock triggering of <code class="docutils literal"><span 
class="pre">punctuate()</span></code>.</p>
-            </div>
-            <p><b>Example</b></p>
-            <p>The following example <code class="docutils literal"><span 
class="pre">Processor</span></code> defines a simple word-count algorithm and 
the following actions are performed:</p>
+          </div>
+          <p><b>Example</b></p>
+          <p>The following example <code class="docutils literal"><span 
class="pre">Processor</span></code> defines a simple word-count algorithm and 
the following actions are performed:</p>
             <ul class="simple">
                 <li>In the <code class="docutils literal"><span 
class="pre">init()</span></code> method, schedule the punctuation every 1000 
time units (the time unit is normally milliseconds, which in this example would 
translate to punctuation every 1 second) and retrieve the local state store by 
its name &#8220;Counts&#8221;.</li>
                 <li>In the <code class="docutils literal"><span 
class="pre">process()</span></code> method, upon each received record, split 
the value string into words, and update their counts into the state store (we 
will talk about this later in this section).</li>
@@ -216,8 +212,8 @@
         </div>
         <div class="section" id="state-stores">
             <span id="streams-developer-guide-state-store"></span><h2><a 
class="toc-backref" href="#id3">State Stores</a><a class="headerlink" 
href="#state-stores" title="Permalink to this headline"></a></h2>
-            <p>To implement a <strong>stateful</strong> <code class="docutils 
literal"><span class="pre">Processor</span></code> or <code class="docutils 
literal"><span class="pre">Transformer</span></code>, you must provide one or 
more state stores to the processor
-                or transformer (<em>stateless</em> processors or transformers 
do not need state stores).  State stores can be used to remember
+            <p>To implement a <strong>stateful</strong> <code class="docutils 
literal"><span class="pre">Processor</span></code>, you must provide one or 
more state stores to the processor
+                (<em>stateless</em> processors do not need state stores). 
State stores can be used to remember
                 recently received input records, to track rolling aggregates, 
to de-duplicate input records, and more.
                 Another feature of state stores is that they can be
                 <a class="reference internal" 
href="interactive-queries.html#streams-developer-guide-interactive-queries"><span
 class="std std-ref">interactively queried</span></a> from other applications, 
such as a
@@ -499,15 +495,9 @@ StoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; countStoreSupplier = Store
             <p>As we have mentioned in the <a 
href=#defining-a-stream-processor>Defining a Stream Processor</a> section, a 
<code>ProcessorContext</code> control the processing workflow, such as 
scheduling a punctuation function, and committing the current processed 
state.</p>
             <p>This object can also be used to access the metadata related 
with the application like
             <code class="docutils literal"><span 
class="pre">applicationId</span></code>, <code class="docutils literal"><span 
class="pre">taskId</span></code>,
-                and <code class="docutils literal"><span 
class="pre">stateDir</span></code>, and also record related metadata as <code 
class="docutils literal"><span class="pre">topic</span></code>,
-                <code class="docutils literal"><span 
class="pre">partition</span></code>, <code class="docutils literal"><span 
class="pre">offset</span></code>, <code class="docutils literal"><span 
class="pre">timestamp</span></code> and
-                <code class="docutils literal"><span 
class="pre">headers</span></code>.</p>
-            <p>Here is an example implementation of how to add a new header to 
the record:</p>
-            <pre class="line-numbers"><code class="language-java">public void 
process(String key, String value) {
-
-    // add a header to the elements
-    context().headers().add.(&quot;key&quot;, &quot;value&quot;);
-}</code></pre>
+                and <code class="docutils literal"><span 
class="pre">stateDir</span></code>, and also <code class="docutils 
literal"><span class="pre">RecordMetadata</span></code> such as
+                <code class="docutils literal"><span 
class="pre">topic</span></code>,
+                <code class="docutils literal"><span 
class="pre">partition</span></code>, and <code class="docutils literal"><span 
class="pre">offset</span></code>.</p>
         <div class="section" id="connecting-processors-and-state-stores">
             <h2><a class="toc-backref" href="#id8">Connecting Processors and 
State Stores</a><a class="headerlink" 
href="#connecting-processors-and-state-stores" title="Permalink to this 
headline"></a></h2>
             <p>Now that a <a class="reference internal" 
href="#streams-developer-guide-stream-processor"><span class="std 
std-ref">processor</span></a> (WordCountProcessor) and the
@@ -572,7 +562,7 @@ builder.addSource("Source", "source-topic")
                 upstream processor of the <code class="docutils literal"><span 
class="pre">&quot;Sink&quot;</span></code> node.  As a result, whenever the 
<code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node forwards a newly fetched 
record from
                 Kafka to its downstream <code class="docutils literal"><span 
class="pre">&quot;Process&quot;</span></code> node, the <code class="docutils 
literal"><span class="pre">WordCountProcessor#process()</span></code> method is 
triggered to process the record and
                 update the associated state store. Whenever <code 
class="docutils literal"><span class="pre">context#forward()</span></code> is 
called in the
-                <code class="docutils literal"><span 
class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate 
key-value pair will be sent via the <code class="docutils literal"><span 
class="pre">&quot;Sink&quot;</span></code> processor node to
+                <code class="docutils literal"><span 
class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate 
records will be sent via the <code class="docutils literal"><span 
class="pre">&quot;Sink&quot;</span></code> processor node to
                 the Kafka topic <code class="docutils literal"><span 
class="pre">&quot;sink-topic&quot;</span></code>.  Note that in the <code 
class="docutils literal"><span class="pre">WordCountProcessor</span></code> 
implementation, you must refer to the
                 same store name <code class="docutils literal"><span 
class="pre">&quot;Counts&quot;</span></code> when accessing the key-value 
store, otherwise an exception will be thrown at runtime,
                 indicating that the state store cannot be found. If the state 
store is not associated with the processor
diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html
index 1b2c2cc4c27..510cf337457 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -284,7 +284,7 @@ processorUnderTest.init(context);</code></pre>
 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
 props.put(&quot;some.other.config&quot;, &quot;some config value&quot;);
-final MockProcessorContext&lt;String, Long&gt; context = new MockProcessorContext&lt;&gt;(props);
+final MockProcessorContext&lt;String, Long&gt; context = new MockProcessorContext&lt;&gt;(props);</code></pre>
             </p>
             <b>Captured data</b>
             <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index d6016c327fd..7037e8d7fd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
@@ -510,8 +511,8 @@ public class StreamsBuilder {
      * Adds a state store to the underlying {@link Topology}.
      * <p>
      * It is required to connect state stores to {@link org.apache.kafka.streams.processor.api.Processor Processors},
-     * {@link org.apache.kafka.streams.kstream.Transformer Transformers},
-     * or {@link org.apache.kafka.streams.kstream.ValueTransformer ValueTransformers} before they can be used.
+     * or {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...) ValueTransformers}
+     * before they can be used.
      *
      * @param builder the builder used to obtain this state store {@link StateStore} instance
      * @return itself
@@ -540,8 +541,7 @@ public class StreamsBuilder {
      * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      * <p>
      * It is not required to connect a global store to the {@link Processor Processors},
-     * {@link org.apache.kafka.streams.kstream.Transformer Transformers},
-     * or {@link org.apache.kafka.streams.kstream.ValueTransformer ValueTransformer};
+     * or {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...) ValueTransformer};
      * those have read-only access to all global stores by default.
      *
      * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 77dc5049c34..0ab0e0d92ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -32,7 +32,7 @@ import java.util.regex.Pattern;
  * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one
  * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology
  * {@link Topology#addSource(String, String...) reads} from the same topic.
- * Message {@link ProcessorContext#forward(Record, String) forwards} using custom Processors and Transformers are not considered in the topology graph.
+ * Message {@link ProcessorContext#forward(Record, String) forwards} using custom Processors are not considered in the topology graph.
  * <p>
  * When {@link KafkaStreams#start()} is called, different sub-topologies will be constructed and executed as independent
  * {@link StreamTask tasks}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
index be550a1f7b6..9bd16bc7857 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -16,20 +16,20 @@
  */
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
 
 /**
  * The {@code ValueMapper} interface for mapping a value to a new value of arbitrary type.
  * This is a stateless record-by-record operation, i.e, {@link #apply(Object)} is invoked individually for each record
- * of a stream (cf. {@link ValueTransformer} for stateful value transformation).
- * If {@code ValueMapper} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the record's
+ * of a stream (cf. {@link org.apache.kafka.streams.processor.api.FixedKeyProcessor} for stateful value transformation).
+ * If {@code ValueMapper} is applied to a {@link org.apache.kafka.streams.processor.api.Record} the record's
  * key is preserved.
  * If a record's key and value should be modified {@link KeyValueMapper} can be used.
  *
  * @param <V>  value type
  * @param <VR> mapped value type
  * @see KeyValueMapper
- * @see ValueTransformer
- * @see ValueTransformerWithKey
+ * @see FixedKeyProcessor
  * @see KStream#mapValues(ValueMapper)
  * @see KStream#mapValues(ValueMapperWithKey)
  * @see KStream#flatMapValues(ValueMapper)
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java
index b20c61ae682..0c315f0e0c1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream;
 /**
  * The {@code ValueMapperWithKey} interface for mapping a value to a new value 
of arbitrary type.
  * This is a stateless record-by-record operation, i.e, {@link #apply(Object, 
Object)} is invoked individually for each
- * record of a stream (cf. {@link ValueTransformer} for stateful value 
transformation).
- * If {@code ValueMapperWithKey} is applied to a {@link 
org.apache.kafka.streams.KeyValue key-value pair} record the
+ * record of a stream (cf. {@link 
org.apache.kafka.streams.processor.api.FixedKeyProcessor} for stateful value 
transformation).
+ * If {@code ValueMapperWithKey} is applied to a {@link 
org.apache.kafka.streams.processor.api.Record} the
  * record's key is preserved.
  * Note that the key is read-only and should not be modified, as this can lead 
to corrupt partitioning.
  * If a record's key and value should be modified {@link KeyValueMapper} can 
be used.
@@ -29,8 +29,7 @@ package org.apache.kafka.streams.kstream;
  * @param <V>  value type
  * @param <VR> mapped value type
  * @see KeyValueMapper
- * @see ValueTransformer
- * @see ValueTransformerWithKey
+ * @see org.apache.kafka.streams.processor.api.FixedKeyProcessor
  * @see KStream#mapValues(ValueMapper)
  * @see KStream#mapValues(ValueMapperWithKey)
  * @see KStream#flatMapValues(ValueMapper)
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 38c27646b0f..3d057c5ce2b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -110,9 +110,10 @@ public interface ProcessorContext {
     <S extends StateStore> S getStateStore(final String name);
 
     /**
-     * Schedule a periodic operation for processors. A processor may call this 
method during
-     * {@link 
org.apache.kafka.streams.kstream.ValueTransformer#init(ProcessorContext)  
initialization} or
-     * {@link 
org.apache.kafka.streams.kstream.ValueTransformer#transform(Object)  
processing} to
+     * Schedule a periodic operation for processors. A processor may call this 
method during a
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}'s
+     * {@link 
org.apache.kafka.streams.kstream.ValueTransformerWithKey#init(ProcessorContext) 
initialization} or
+     * {@link 
org.apache.kafka.streams.kstream.ValueTransformerWithKey#transform(Object, 
Object) processing} to
      * schedule a periodic callback &mdash; called a punctuation &mdash; to 
{@link Punctuator#punctuate(long)}.
      * The type parameter controls what notion of time is used for punctuation:
      * <ul>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 17028abe34b..5091074d70b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -42,8 +42,7 @@ public final class ForwardingDisabledProcessorContext 
implements ProcessorContex
 
     private static final String EXPLANATION = "ProcessorContext#forward() is 
not supported from this context, "
         + "as the framework must ensure the key is not changed (#forward 
allows changing the key on "
-        + "messages which are sent). Try another function, which doesn't allow 
the key to be changed "
-        + "(for example - #transformValues).";
+        + "messages which are sent). Use KStream.process() if you need to 
change the key.";
 
     public ForwardingDisabledProcessorContext(final ProcessorContext delegate) 
{
         this.delegate = Objects.requireNonNull(delegate, "delegate");
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index e526f89d78e..6a53afd07b3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -174,7 +174,7 @@ public final class ProcessorContextImpl extends 
AbstractProcessorContext<Object,
                 " as the store is not connected to the processor. If you add 
stores manually via '.addStateStore()' " +
                 "make sure to connect the added store to the processor by 
providing the processor name to " +
                 "'.addStateStore()' or connect them via 
'.connectProcessorAndStateStores()'. " +
-                "DSL users need to provide the store name to '.process()', 
'.transform()', or '.transformValues()' " +
+                "DSL users need to provide the store name to '.process()', 
'.processValues()', or '.transformValues()' " +
                 "to connect the store to the corresponding operator, or they 
can provide a StoreBuilder by implementing " +
                 "the stores() method on the Supplier itself. If you do not add 
stores manually, " +
                 "please file a bug report at 
https://issues.apache.org/jira/projects/KAFKA.");
@@ -236,8 +236,8 @@ public final class ProcessorContextImpl extends 
AbstractProcessorContext<Object,
         final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
         if (previousNode == null) {
             throw new StreamsException("Current node is unknown. This can 
happen if 'forward()' is called " +
-                    "in an illegal scope. The root cause could be that a 
'Processor' or 'Transformer' instance" +
-                    " is shared. To avoid this error, make sure that your 
suppliers return new instances " +
+                    "in an illegal scope. The root cause could be that a 
'Processor' instance " +
+                    "is shared. To avoid this error, make sure that your 
suppliers return new instances " +
                     "each time 'get()' of Supplier is called and do not return 
the same object reference " +
                     "multiple times.");
         }

