This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
     new c438ee91668 MINOR: Kafka Streams doc updates for 4.0 release (#18488)

c438ee91668 is described below

commit c438ee9166897026f896c8b42f3495d95d0ce42f
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Wed Jan 15 11:58:53 2025 -0800

    MINOR: Kafka Streams doc updates for 4.0 release (#18488)

    Reviewers: Bill Bejeck <b...@confluent.io>
---
 docs/streams/developer-guide/processor-api.html   | 114 ++++++++++-----------
 docs/streams/developer-guide/testing.html         |   2 +-
 .../org/apache/kafka/streams/StreamsBuilder.java  |   8 +-
 .../apache/kafka/streams/TopologyDescription.java |   2 +-
 .../apache/kafka/streams/kstream/ValueMapper.java |   8 +-
 .../kafka/streams/kstream/ValueMapperWithKey.java |   7 +-
 .../kafka/streams/processor/ProcessorContext.java |   7 +-
 .../ForwardingDisabledProcessorContext.java       |   3 +-
 .../processor/internals/ProcessorContextImpl.java |   6 +-
 9 files changed, 73 insertions(+), 84 deletions(-)

diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index e25ecd601ac..72ed7fbe852 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -67,42 +67,40 @@
 <p><b>Tip</b></p>
 <p class="last"><strong>Combining the DSL and the Processor API:</strong> You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the
-    section <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std std-ref">Applying processors and transformers (Processor API integration)</span></a>.</p>
+    section <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std std-ref">Applying processors (Processor API integration)</span></a>.</p>
 </div>
 <p>For a complete list of available API functionality, see the <a href="/{{version}}/javadoc/org/apache/kafka/streams/package-summary.html">Streams</a> API docs.</p>
 </div>
 <div class="section" id="defining-a-stream-processor">
 <span id="streams-developer-guide-stream-processor"></span><h2><a class="toc-backref" href="#id2">Defining a Stream Processor</a><a class="headerlink" href="#defining-a-stream-processor" title="Permalink to this headline"></a></h2>
-    <p>A <a class="reference internal" href="../core-concepts.html#streams_processor_node"><span class="std std-ref">stream processor</span></a> is a node in the processor topology that represents a single processing step.
-    With the Processor API, you can define arbitrary stream processors that processes one received record at a time, and connect
-    these processors with their associated state stores to compose the processor topology.</p>
-    <p>You can define a customized stream processor by implementing the <code class="docutils literal"><span class="pre">Processor</span></code> interface, which provides the <code class="docutils literal"><span class="pre">process()</span></code> API method.
-    The <code class="docutils literal"><span class="pre">process()</span></code> method is called on each of the received records.</p>
-    <p>The <code class="docutils literal"><span class="pre">Processor</span></code> interface also has an <code class="docutils literal"><span class="pre">init()</span></code> method, which is called by the Kafka Streams library during task construction
-    phase. Processor instances should perform any required initialization in this method. The <code class="docutils literal"><span class="pre">init()</span></code> method passes in a <code class="docutils literal"><span class="pre">ProcessorContext</span></code>
-    instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
-    its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
-    function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
-    and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).
-    Any resources you set up in <code class="docutils literal"><span class="pre">init()</span></code> can be cleaned up in the
-    <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
-    <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
-    <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-    <p>
-    The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+    <p>A <a class="reference internal" href="../core-concepts.html#streams_processor_node"><span class="std std-ref">stream processor</span></a> is a node in the processor topology that represents a single processing step.
+    With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect
+    these processors with their associated state stores to compose the processor topology.</p>
+    <p>You can define a customized stream processor by implementing the <code class="docutils literal"><span class="pre">Processor</span></code> interface, which provides the <code class="docutils literal"><span class="pre">process()</span></code> API method.
+    The <code class="docutils literal"><span class="pre">process()</span></code> method is called on each of the received records.</p>
+    <p>The <code class="docutils literal"><span class="pre">Processor</span></code> interface also has an <code class="docutils literal"><span class="pre">init()</span></code> method, which is called by the Kafka Streams library during the task construction
+    phase. Processor instances should perform any required initialization in this method. The <code class="docutils literal"><span class="pre">init()</span></code> method passes in a <code class="docutils literal"><span class="pre">ProcessorContext</span></code>
+    instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
+    its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
+    function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
+    and to request a commit of the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).
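To make the lifecycle just described concrete, here is a minimal sketch (not part of this commit; the class name and String type arguments are illustrative) of a processor built on the org.apache.kafka.streams.processor.api package:
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Minimal lifecycle sketch: init() keeps the context, process() forwards, close() cleans up.
public class PassThroughProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context; // keep for schedule()/forward()/commit()
    }

    @Override
    public void process(final Record<String, String> record) {
        context.forward(record); // pass the record downstream unchanged
    }

    @Override
    public void close() {
        // free anything created in init(); note that init() may be called again afterwards
    }
}</code></pre>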
+ Any resources you set up in <code class="docutils literal"><span class="pre">init()</span></code> can be cleaned up in the + <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single + <code class="docutils literal"><span class="pre">Processor</span></code> object by calling + <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p> + <p>The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes four generic parameters: <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and - <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed + <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types of the <code class="docutils literal"><span class="pre">Record</span></code> that will be passed to <code class="docutils literal"><span class="pre">process()</span></code>. Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code> - define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code> + define the forwarded key and value types for the result <code class="docutils literal"><span class="pre">Record</span></code> that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code> will accept. If your processor does not forward any records at all (or if it only forwards <code class="docutils literal"><span class="pre">null</span></code> keys or values), a best practice is to set the output generic type argument to <code class="docutils literal"><span class="pre">Void</span></code>. If it needs to forward multiple types that don't share a common superclass, you will - have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>. - </p> + have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.</p> <p> Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code> and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code> @@ -120,40 +118,38 @@ Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>, but instead creates a shallow copy. Beware that this is only a shallow copy, so if you plan to mutate the key, value, or headers elsewhere in the program, you will want to - create a deep copy of those fields yourself. - </p> - <p> - In addition to handling incoming records via - <code class="docutils literal"><span class="pre">Processor#process()</span></code>, - you have the option to schedule periodic invocation (called "punctuation") - in your processor's <code class="docutils literal"><span class="pre">init()</span></code> - method by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> - and passing it a <code class="docutils literal"><span class="pre">Punctuator</span></code>. 
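A sketch of the shallow-copy behavior described above (method body only, reusing the context field and String types from the previous sketch):
<pre class="line-numbers"><code class="language-java">@Override
public void process(final Record<String, String> inputRecord) {
    // withValue() returns a shallow copy carrying the new value; inputRecord itself is not mutated
    final Record<String, String> outputRecord = inputRecord.withValue(inputRecord.value().toUpperCase());
    context.forward(outputRecord);
}</code></pre>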
- The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used - for the punctuation scheduling: either <a class="reference internal" href="../core-concepts.html#streams_time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time - is configured to represent event-time via <code class="docutils literal"><span class="pre">TimestampExtractor</span></code>). When stream-time is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely - by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there - is no new input data arriving, stream-time is not advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called.</p> - <p>For example, if you schedule a <code class="docutils literal"><span class="pre">Punctuator</span></code> function every 10 seconds based on <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> and if you - process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record), - then <code class="docutils literal"><span class="pre">punctuate()</span></code> would be called 6 times. This happens regardless of the time required to actually process those records. <code class="docutils literal"><span class="pre">punctuate()</span></code> - would be called 6 times regardless of whether processing these 60 records takes a second, a minute, or an hour.</p> - <p>When wall-clock-time (i.e. <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely by the wall-clock time. - Reusing the example above, if the <code class="docutils literal"><span class="pre">Punctuator</span></code> function is scheduled based on <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these - 60 records were processed within 20 seconds, <code class="docutils literal"><span class="pre">punctuate()</span></code> is called 2 times (one time every 10 seconds). If these 60 records - were processed within 5 seconds, then no <code class="docutils literal"><span class="pre">punctuate()</span></code> is called at all. Note that you can schedule multiple <code class="docutils literal"><span class="pre">Punctuator</span></code> - callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple - times inside <code class="docutils literal"><span class="pre">init()</span></code> method.</p> - <div class="admonition attention"> + create a deep copy of those fields yourself.</p> + <p>In addition to handling incoming records via + <code class="docutils literal"><span class="pre">Processor#process()</span></code>, + you have the option to schedule periodic invocation (called "punctuation") + in your processor's <code class="docutils literal"><span class="pre">init()</span></code> + method by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> + and passing it a <code class="docutils literal"><span class="pre">Punctuator</span></code>. 
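A hedged sketch of such scheduling inside init(); the 10-second interval and empty callback bodies are illustrative, and java.time.Duration plus org.apache.kafka.streams.processor.PunctuationType are assumed to be imported:
<pre class="line-numbers"><code class="language-java">@Override
public void init(final ProcessorContext<String, Long> context) {
    // fires whenever stream-time (event-time by default) advances by 10 seconds
    context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, timestamp -> {
        // data-driven periodic work goes here
    });
    // fires every 10 seconds of system time, independent of incoming data
    context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        // wall-clock periodic work goes here
    });
}</code></pre>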
+ The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+ for the punctuation scheduling: either <a class="reference internal" href="../core-concepts.html#streams_time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
+ is configured to represent event-time via <code class="docutils literal"><span class="pre">TimestampExtractor</span></code>). When stream-time is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely
+ by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there
+ is no new input data arriving, stream-time is not advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called.</p>
+ <p>For example, if you schedule a <code class="docutils literal"><span class="pre">Punctuator</span></code> function every 10 seconds based on <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> and if you
+ process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record),
+ then <code class="docutils literal"><span class="pre">punctuate()</span></code> would be called 6 times. This happens regardless of the time required to actually process those records. <code class="docutils literal"><span class="pre">punctuate()</span></code>
+ would be called 6 times regardless of whether processing these 60 records takes a second, a minute, or an hour.</p>
+ <p>When wall-clock-time (i.e. <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely by the wall-clock time.
+ Reusing the example above, if the <code class="docutils literal"><span class="pre">Punctuator</span></code> function is scheduled based on <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these
+ 60 records were processed within 20 seconds, <code class="docutils literal"><span class="pre">punctuate()</span></code> is called 2 times (one time every 10 seconds). If these 60 records
+ were processed within 5 seconds, then no <code class="docutils literal"><span class="pre">punctuate()</span></code> is called at all. Note that you can schedule multiple <code class="docutils literal"><span class="pre">Punctuator</span></code>
+ callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple
+ times inside the <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
+ <div class="admonition attention">
 <p class="first admonition-title"><b>Attention</b></p>
 <p class="last">Stream-time is only advanced when Streams processes records. If there are no records to process, or if Streams is waiting for new records due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a> configuration, then the stream time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
This behavior is independent of the configured timestamp extractor, i.e., using <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <code class="docutils literal"><span class="pre">punctuate()</span></code>.</p> - </div> - <p><b>Example</b></p> - <p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> defines a simple word-count algorithm and the following actions are performed:</p> + </div> + <p><b>Example</b></p> + <p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> defines a simple word-count algorithm and the following actions are performed:</p> <ul class="simple"> <li>In the <code class="docutils literal"><span class="pre">init()</span></code> method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds, which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name “Counts”.</li> <li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li> @@ -216,8 +212,8 @@ </div> <div class="section" id="state-stores"> <span id="streams-developer-guide-state-store"></span><h2><a class="toc-backref" href="#id3">State Stores</a><a class="headerlink" href="#state-stores" title="Permalink to this headline"></a></h2> - <p>To implement a <strong>stateful</strong> <code class="docutils literal"><span class="pre">Processor</span></code> or <code class="docutils literal"><span class="pre">Transformer</span></code>, you must provide one or more state stores to the processor - or transformer (<em>stateless</em> processors or transformers do not need state stores). State stores can be used to remember + <p>To implement a <strong>stateful</strong> <code class="docutils literal"><span class="pre">Processor</span></code>, you must provide one or more state stores to the processor + (<em>stateless</em> processors do not need state stores). State stores can be used to remember recently received input records, to track rolling aggregates, to de-duplicate input records, and more. 
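The word-count Processor outlined by the bullets in the previous hunk might look roughly like the following sketch, reconstructed from that description rather than copied from the commit (the store name "Counts" is as stated in the docs; serde setup is omitted):
<pre class="line-numbers"><code class="language-java">import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountProcessor implements Processor<String, String, String, String> {
    private KeyValueStore<String, Long> kvStore;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        // retrieve the store connected to this processor by its name
        kvStore = context.getStateStore("Counts");
        // punctuate every 1000 ms of stream-time and emit the current counts downstream
        context.schedule(Duration.ofMillis(1000), PunctuationType.STREAM_TIME, timestamp -> {
            try (final KeyValueIterator<String, Long> iter = kvStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Long> entry = iter.next();
                    context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
                }
            }
        });
    }

    @Override
    public void process(final Record<String, String> record) {
        // split the value into words and update each word's count in the store
        for (final String word : record.value().toLowerCase().split("\\W+")) {
            final Long oldValue = kvStore.get(word);
            kvStore.put(word, oldValue == null ? 1L : oldValue + 1L);
        }
    }
}</code></pre>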
Another feature of state stores is that they can be <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactively queried</span></a> from other applications, such as a @@ -499,15 +495,9 @@ StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Store <p>As we have mentioned in the <a href=#defining-a-stream-processor>Defining a Stream Processor</a> section, a <code>ProcessorContext</code> control the processing workflow, such as scheduling a punctuation function, and committing the current processed state.</p> <p>This object can also be used to access the metadata related with the application like <code class="docutils literal"><span class="pre">applicationId</span></code>, <code class="docutils literal"><span class="pre">taskId</span></code>, - and <code class="docutils literal"><span class="pre">stateDir</span></code>, and also record related metadata as <code class="docutils literal"><span class="pre">topic</span></code>, - <code class="docutils literal"><span class="pre">partition</span></code>, <code class="docutils literal"><span class="pre">offset</span></code>, <code class="docutils literal"><span class="pre">timestamp</span></code> and - <code class="docutils literal"><span class="pre">headers</span></code>.</p> - <p>Here is an example implementation of how to add a new header to the record:</p> - <pre class="line-numbers"><code class="language-java">public void process(String key, String value) { - - // add a header to the elements - context().headers().add.("key", "value"); -}</code></pre> + and <code class="docutils literal"><span class="pre">stateDir</span></code>, and also <code class="docutils literal"><span class="pre">RecordMetadata</span></code> such as + <code class="docutils literal"><span class="pre">topic</span></code>, + <code class="docutils literal"><span class="pre">partition</span></code>, and <code class="docutils literal"><span class="pre">offset</span></code>.</p> <div class="section" id="connecting-processors-and-state-stores"> <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a><a class="headerlink" href="#connecting-processors-and-state-stores" title="Permalink to this headline"></a></h2> <p>Now that a <a class="reference internal" href="#streams-developer-guide-stream-processor"><span class="std std-ref">processor</span></a> (WordCountProcessor) and the @@ -572,7 +562,7 @@ builder.addSource("Source", "source-topic") upstream processor of the <code class="docutils literal"><span class="pre">"Sink"</span></code> node. As a result, whenever the <code class="docutils literal"><span class="pre">"Source"</span></code> node forwards a newly fetched record from Kafka to its downstream <code class="docutils literal"><span class="pre">"Process"</span></code> node, the <code class="docutils literal"><span class="pre">WordCountProcessor#process()</span></code> method is triggered to process the record and update the associated state store. 
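The wiring this section describes would look roughly like the following sketch; the node and store names match the text, while the in-memory store and serdes are assumptions:
<pre class="line-numbers"><code class="language-java">final Topology builder = new Topology();

builder.addSource("Source", "source-topic")
       // "Process" runs WordCountProcessor on records fetched by "Source"
       .addProcessor("Process", () -> new WordCountProcessor(), "Source")
       // create the "Counts" store and connect it to the "Process" node
       .addStateStore(Stores.keyValueStoreBuilder(
               Stores.inMemoryKeyValueStore("Counts"),
               Serdes.String(),
               Serdes.Long()), "Process")
       // "Sink" writes whatever "Process" forwards into "sink-topic"
       .addSink("Sink", "sink-topic", "Process");</code></pre>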
Whenever <code class="docutils literal"><span class="pre">context#forward()</span></code> is called in the - <code class="docutils literal"><span class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate key-value pair will be sent via the <code class="docutils literal"><span class="pre">"Sink"</span></code> processor node to + <code class="docutils literal"><span class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate records will be sent via the <code class="docutils literal"><span class="pre">"Sink"</span></code> processor node to the Kafka topic <code class="docutils literal"><span class="pre">"sink-topic"</span></code>. Note that in the <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> implementation, you must refer to the same store name <code class="docutils literal"><span class="pre">"Counts"</span></code> when accessing the key-value store, otherwise an exception will be thrown at runtime, indicating that the state store cannot be found. If the state store is not associated with the processor diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html index 1b2c2cc4c27..510cf337457 100644 --- a/docs/streams/developer-guide/testing.html +++ b/docs/streams/developer-guide/testing.html @@ -284,7 +284,7 @@ processorUnderTest.init(context);</code></pre> props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); props.put("some.other.config", "some config value"); -final MockProcessorContext<String, Long> context = new MockProcessorContext<>(props); +final MockProcessorContext<String, Long> context = new MockProcessorContext<>(props);</code></pre> </p> <b>Captured data</b> <p> diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index d6016c327fd..7037e8d7fd3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.KGroupedTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; @@ -510,8 +511,8 @@ public class StreamsBuilder { * Adds a state store to the underlying {@link Topology}. * <p> * It is required to connect state stores to {@link org.apache.kafka.streams.processor.api.Processor Processors}, - * {@link org.apache.kafka.streams.kstream.Transformer Transformers}, - * or {@link org.apache.kafka.streams.kstream.ValueTransformer ValueTransformers} before they can be used. + * or {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...) ValueTransformers} + * before they can be used. * * @param builder the builder used to obtain this state store {@link StateStore} instance * @return itself @@ -540,8 +541,7 @@ public class StreamsBuilder { * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. 
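Continuing the MockProcessorContext snippet from the testing.html hunk above, a hedged sketch of capturing forwarded records (the processor's input types are assumed to be String/String, and props and processorUnderTest come from the surrounding test as in the docs):
<pre class="line-numbers"><code class="language-java">final MockProcessorContext<String, Long> context = new MockProcessorContext<>(props);
processorUnderTest.init(context);
processorUnderTest.process(new Record<>("key", "value", 0L));

// forwarded() captures every record the processor sent downstream
for (final MockProcessorContext.CapturedForward<? extends String, ? extends Long> forward : context.forwarded()) {
    System.out.println(forward.record());
}</code></pre>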
* <p> * It is not required to connect a global store to the {@link Processor Processors}, - * {@link org.apache.kafka.streams.kstream.Transformer Transformers}, - * or {@link org.apache.kafka.streams.kstream.ValueTransformer ValueTransformer}; + * or {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...) ValueTransformer}; * those have read-only access to all global stores by default. * * @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null} diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java index 77dc5049c34..0ab0e0d92ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java @@ -32,7 +32,7 @@ import java.util.regex.Pattern; * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology * {@link Topology#addSource(String, String...) reads} from the same topic. - * Message {@link ProcessorContext#forward(Record, String) forwards} using custom Processors and Transformers are not considered in the topology graph. + * Message {@link ProcessorContext#forward(Record, String) forwards} using custom Processors are not considered in the topology graph. * <p> * When {@link KafkaStreams#start()} is called, different sub-topologies will be constructed and executed as independent * {@link StreamTask tasks}. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java index be550a1f7b6..9bd16bc7857 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java @@ -16,20 +16,20 @@ */ package org.apache.kafka.streams.kstream; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; /** * The {@code ValueMapper} interface for mapping a value to a new value of arbitrary type. * This is a stateless record-by-record operation, i.e, {@link #apply(Object)} is invoked individually for each record - * of a stream (cf. {@link ValueTransformer} for stateful value transformation). - * If {@code ValueMapper} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the record's + * of a stream (cf. {@link org.apache.kafka.streams.processor.api.FixedKeyProcessor} for stateful value transformation). + * If {@code ValueMapper} is applied to a {@link org.apache.kafka.streams.processor.api.Record} the record's * key is preserved. * If a record's key and value should be modified {@link KeyValueMapper} can be used. 
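For reference, the ValueMapper and ValueMapperWithKey described in these javadocs are typically supplied as lambdas, as in this sketch (an existing KStream<String, String> named stream is assumed):
<pre class="line-numbers"><code class="language-java">// ValueMapper: the key is untouched, only the value is mapped
final KStream<String, String> upperCased = stream.mapValues(value -> value.toUpperCase());

// ValueMapperWithKey: the key is readable but must not be modified
final KStream<String, Long> valueLengths = stream.mapValues((readOnlyKey, value) -> (long) value.length());</code></pre>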
* * @param <V> value type * @param <VR> mapped value type * @see KeyValueMapper - * @see ValueTransformer - * @see ValueTransformerWithKey + * @see FixedKeyProcessor * @see KStream#mapValues(ValueMapper) * @see KStream#mapValues(ValueMapperWithKey) * @see KStream#flatMapValues(ValueMapper) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java index b20c61ae682..0c315f0e0c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java @@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream; /** * The {@code ValueMapperWithKey} interface for mapping a value to a new value of arbitrary type. * This is a stateless record-by-record operation, i.e, {@link #apply(Object, Object)} is invoked individually for each - * record of a stream (cf. {@link ValueTransformer} for stateful value transformation). - * If {@code ValueMapperWithKey} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the + * record of a stream (cf. {@link org.apache.kafka.streams.processor.api.FixedKeyProcessor} for stateful value transformation). + * If {@code ValueMapperWithKey} is applied to a {@link org.apache.kafka.streams.processor.api.Record} the * record's key is preserved. * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. * If a record's key and value should be modified {@link KeyValueMapper} can be used. @@ -29,8 +29,7 @@ package org.apache.kafka.streams.kstream; * @param <V> value type * @param <VR> mapped value type * @see KeyValueMapper - * @see ValueTransformer - * @see ValueTransformerWithKey + * @see org.apache.kafka.streams.processor.api.FixedKeyProcessor * @see KStream#mapValues(ValueMapper) * @see KStream#mapValues(ValueMapperWithKey) * @see KStream#flatMapValues(ValueMapper) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 38c27646b0f..3d057c5ce2b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -110,9 +110,10 @@ public interface ProcessorContext { <S extends StateStore> S getStateStore(final String name); /** - * Schedule a periodic operation for processors. A processor may call this method during - * {@link org.apache.kafka.streams.kstream.ValueTransformer#init(ProcessorContext) initialization} or - * {@link org.apache.kafka.streams.kstream.ValueTransformer#transform(Object) processing} to + * Schedule a periodic operation for processors. A processor may call this method during a + * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}'s + * {@link org.apache.kafka.streams.kstream.ValueTransformerWithKey#init(ProcessorContext) initialization} or + * {@link org.apache.kafka.streams.kstream.ValueTransformerWithKey#transform(Object, Object) processing} to * schedule a periodic callback — called a punctuation — to {@link Punctuator#punctuate(long)}. 
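Since these javadocs now point to FixedKeyProcessor as the stateful replacement for ValueTransformer, a minimal sketch of one (class name and logic are illustrative):
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

// Transforms values while the framework guarantees the key stays fixed
public class UppercaseValueProcessor implements FixedKeyProcessor<String, String, String> {
    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(final FixedKeyProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final FixedKeyRecord<String, String> record) {
        // withValue() preserves the key by construction
        context.forward(record.withValue(record.value().toUpperCase()));
    }
}</code></pre>
Such a processor would be attached via KStream#processValues(), e.g. stream.processValues(() -> new UppercaseValueProcessor()).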
* The type parameter controls what notion of time is used for punctuation: * <ul> diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java index 17028abe34b..5091074d70b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java @@ -42,8 +42,7 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex private static final String EXPLANATION = "ProcessorContext#forward() is not supported from this context, " + "as the framework must ensure the key is not changed (#forward allows changing the key on " - + "messages which are sent). Try another function, which doesn't allow the key to be changed " - + "(for example - #transformValues)."; + + "messages which are sent). Use KStream.process() if you need to change the key."; public ForwardingDisabledProcessorContext(final ProcessorContext delegate) { this.delegate = Objects.requireNonNull(delegate, "delegate"); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index e526f89d78e..6a53afd07b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -174,7 +174,7 @@ public final class ProcessorContextImpl extends AbstractProcessorContext<Object, " as the store is not connected to the processor. If you add stores manually via '.addStateStore()' " + "make sure to connect the added store to the processor by providing the processor name to " + "'.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. " + - "DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' " + + "DSL users need to provide the store name to '.process()', '.processValues()', or '.transformValues()' " + "to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing " + "the stores() method on the Supplier itself. If you do not add stores manually, " + "please file a bug report at https://issues.apache.org/jira/projects/KAFKA."); @@ -236,8 +236,8 @@ public final class ProcessorContextImpl extends AbstractProcessorContext<Object, final ProcessorNode<?, ?, ?, ?> previousNode = currentNode(); if (previousNode == null) { throw new StreamsException("Current node is unknown. This can happen if 'forward()' is called " + - "in an illegal scope. The root cause could be that a 'Processor' or 'Transformer' instance" + - " is shared. To avoid this error, make sure that your suppliers return new instances " + + "in an illegal scope. The root cause could be that a 'Processor' instance " + + "is shared. To avoid this error, make sure that your suppliers return new instances " + "each time 'get()' of Supplier is called and do not return the same object reference " + "multiple times."); }
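As the reworded error messages above suggest, a ProcessorSupplier can declare its stores via stores() and must return a fresh Processor from every get(); a sketch reusing the hypothetical WordCountProcessor from earlier:
<pre class="line-numbers"><code class="language-java">import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class WordCountProcessorSupplier implements ProcessorSupplier<String, String, String, String> {

    @Override
    public Set<StoreBuilder<?>> stores() {
        // declaring the store here connects it automatically; no addStateStore() call needed
        final StoreBuilder<?> countsStore = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("Counts"),
                Serdes.String(),
                Serdes.Long());
        return Collections.singleton(countsStore);
    }

    @Override
    public Processor<String, String, String, String> get() {
        return new WordCountProcessor(); // a new instance per call, so no instance is ever shared
    }
}</code></pre>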