This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 394aa74  KAFKA-6454: Allow timestamp manipulation in Processor API (#4519)
394aa74 is described below

commit 394aa7426117d0d04666c1c2a63d5f98229b7894
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Mar 16 16:02:11 2018 -0700

    KAFKA-6454: Allow timestamp manipulation in Processor API (#4519)

    Reviewers: Bill Bejeck <b...@confluent.io>, Damian Guy <dam...@confluent.io>, Guozhang Wang <guozh...@confluent.io>
---
 docs/streams/core-concepts.html                     |   4 +
 docs/streams/developer-guide/processor-api.html     |   4 +
 docs/streams/upgrade-guide.html                     |   8 +
 .../apache/kafka/streams/kstream/Transformer.java   |  16 +-
 .../kafka/streams/kstream/ValueTransformer.java     |  20 ++-
 .../streams/kstream/ValueTransformerWithKey.java    |  15 +-
 .../streams/kstream/internals/KStreamBranch.java    |  15 +-
 .../streams/kstream/internals/KStreamImpl.java      |  15 +-
 .../kstream/internals/KStreamTransformValues.java   |   8 +
 .../kafka/streams/processor/AbstractProcessor.java  |   6 +-
 .../kafka/streams/processor/ProcessorContext.java   |  39 +++--
 .../org/apache/kafka/streams/processor/To.java      |  68 +++++++++
 .../internals/AbstractProcessorContext.java         |  15 --
 .../internals/GlobalProcessorContextImpl.java       |  30 +++-
 .../processor/internals/ProcessorContextImpl.java   |  67 ++++++---
 .../streams/processor/internals/ProcessorNode.java  |  11 +-
 .../internals/ProcessorRecordContext.java           |   6 +-
 .../streams/processor/internals/RecordContext.java  |   5 +
 .../processor/internals/StandbyContextImpl.java     |  11 ++
 .../{RecordContext.java => ToInternal.java}         |  42 +++---
 .../streams/state/internals/LRUCacheEntry.java      |   7 +-
 .../internals/KStreamTransformValuesTest.java       |  17 ++-
 .../internals/AbstractProcessorContextTest.java     |  22 +--
 .../processor/internals/ProcessorTopologyTest.java  | 163 +++++++++++++--------
 .../processor/internals/RecordContextStub.java      |  13 +-
 .../apache/kafka/test/MockProcessorContext.java     |  46 +++---
 .../apache/kafka/test/NoOpProcessorContext.java     |   6 +
 27 files changed, 459 insertions(+), 220 deletions(-)

diff --git a/docs/streams/core-concepts.html b/docs/streams/core-concepts.html
index 2f22be7..889fe06 100644
--- a/docs/streams/core-concepts.html
+++ b/docs/streams/core-concepts.html
@@ -127,6 +127,10 @@
         <li> For aggregations, the timestamp of a resulting aggregate update record will be that of the latest arrived input record that triggered the update.</li>
     </ul>

+    <p>
+        Note that the described default behavior can be changed in the Processor API by assigning timestamps to output records explicitly when calling <code>#forward()</code>.
+    </p>
+
     <h3><a id="streams_state" href="#streams_state">States</a></h3>

     <p>
diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index fdf6c86..b51bc22 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -77,6 +77,10 @@
                    its corresponding message offset, and further such information.
                    You can also use this context instance to schedule a punctuation function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>), and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).</p>
+                <p>When records are forwarded to downstream processors, they also get a timestamp assigned. There are two different default behaviors:
+                    (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code>, the output record inherits the input record timestamp.
+                    (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code>, the output record inherits the current punctuation timestamp (either the current 'stream time' or the system wall-clock time).
+                    Note that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows you to change this default behavior by passing a custom timestamp for the output record.</p>
                <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code> API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
                    for the punctuation scheduling: either <a class="reference internal" href="../concepts.html#streams-concepts-time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 46be969..baf9633 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -85,6 +85,14 @@
        In addition, in <code>StreamsConfig</code> we have also added <code>default.windowed.key.serde.inner</code> and <code>default.windowed.value.serde.inner</code>
        to let users specify inner serdes if the default serde classes are windowed serdes.
        For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-265%3A+Make+Windowed+Serde+to+public+APIs">KIP-265</a>.
+    </p>
+
+    <p>
+        Kafka 1.2.0 allows you to manipulate the timestamps of output records using the Processor API (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
+        To enable this new feature, <code>ProcessorContext#forward(...)</code> was modified.
+        The two existing overloads <code>#forward(Object key, Object value, String childName)</code> and <code>#forward(Object key, Object value, int childIndex)</code> were deprecated and a new overload <code>#forward(Object key, Object value, To to)</code> was added.
+        The new class <code>To</code> allows you to send records to all or specific downstream processors by name, and to set the timestamp for the output record.
+        Forwarding based on child index is no longer supported in the new API.
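+        For example, migrating a deprecated index-based call to the new overload could look like the following sketch (the child name <code>my-child</code> is illustrative and must match a processor name defined in the topology):
+        <pre class="brush: java;">
+// before (deprecated): forward to the first child by index
+context.forward(key, value, 0);
+
+// after: forward to a named child and, optionally, set an explicit timestamp for the output record
+context.forward(key, value, To.child("my-child").withTimestamp(context.timestamp() + 10L));
+        </pre>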
</p> <h3><a id="streams_api_changes_110" href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3> diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java index 308fcad..a83b4a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.To; /** * The {@code Transformer} interface is for stateful mapping of an input record to zero, one, or multiple new output @@ -69,9 +70,8 @@ public interface Transformer<K, V, R> { * attached} to this operator can be accessed and modified * arbitrarily (cf. {@link ProcessorContext#getStateStore(String)}). * <p> - * If more than one output record should be forwarded downstream {@link ProcessorContext#forward(Object, Object)}, - * {@link ProcessorContext#forward(Object, Object, int)}, and - * {@link ProcessorContext#forward(Object, Object, String)} can be used. + * If more than one output record should be forwarded downstream {@link ProcessorContext#forward(Object, Object)} + * and {@link ProcessorContext#forward(Object, Object, To)} can be used. * If not record should be forwarded downstream, {@code transform} can return {@code null}. * * @param key the key for the record @@ -86,9 +86,8 @@ public interface Transformer<K, V, R> { * {@link ProcessorContext#schedule(long) schedules itself} with the context during * {@link #init(ProcessorContext) initialization}. * <p> - * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)}, - * {@link ProcessorContext#forward(Object, Object, int)}, and - * {@link ProcessorContext#forward(Object, Object, String)} can be used. + * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and + * {@link ProcessorContext#forward(Object, Object, To)} can be used. * <p> * Note that {@code punctuate} is called based on <it>stream time</it> (i.e., time progresses with regard to * timestamps return by the used {@link TimestampExtractor}) @@ -105,9 +104,8 @@ public interface Transformer<K, V, R> { /** * Close this processor and clean up any resources. * <p> - * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)}, - * {@link ProcessorContext#forward(Object, Object, int)}, and - * {@link ProcessorContext#forward(Object, Object, String)} can be used. + * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and + * {@link ProcessorContext#forward(Object, Object, To)} can be used. 
*/ void close(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java index 1802a61..1da779e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.To; /** * The {@code ValueTransformer} interface for stateful mapping of a value to a new value (with possible new type). @@ -58,9 +59,8 @@ public interface ValueTransformer<V, VR> { * Note that {@link ProcessorContext} is updated in the background with the current record's meta data. * Thus, it only contains valid record meta data when accessed within {@link #transform(Object)}. * <p> - * Note that using {@link ProcessorContext#forward(Object, Object)}, - * {@link ProcessorContext#forward(Object, Object, int)}, or - * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within any method of + * Note that using {@link ProcessorContext#forward(Object, Object)} or + * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within any method of * {@code ValueTransformer} and will result in an {@link StreamsException exception}. * * @param context the context @@ -75,9 +75,8 @@ public interface ValueTransformer<V, VR> { * attached} to this operator can be accessed and modified arbitrarily (cf. * {@link ProcessorContext#getStateStore(String)}). * <p> - * Note, that using {@link ProcessorContext#forward(Object, Object)}, - * {@link ProcessorContext#forward(Object, Object, int)}, and - * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within {@code transform} and + * Note, that using {@link ProcessorContext#forward(Object, Object)} or + * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and * will result in an {@link StreamsException exception}. * * @param value the value to be transformed @@ -90,9 +89,8 @@ public interface ValueTransformer<V, VR> { * the context during {@link #init(ProcessorContext) initialization}. * <p> * It is not possible to return any new output records within {@code punctuate}. - * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)}, - * or {@link ProcessorContext#forward(Object, Object, String)} will result in an - * {@link StreamsException exception}. + * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)} + * will result in an {@link StreamsException exception}. * Furthermore, {@code punctuate} must return {@code null}. * <p> * Note, that {@code punctuate} is called base on <it>stream time</it> (i.e., time progress with regard to @@ -111,8 +109,8 @@ public interface ValueTransformer<V, VR> { * Close this processor and clean up any resources. * <p> * It is not possible to return any new output records within {@code close()}. - * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)}, - * or {@link ProcessorContext#forward(Object, Object, String)} will result in an {@link StreamsException exception}. 
+ * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)} + * will result in an {@link StreamsException exception}. */ void close(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java index 128c61f..7f399b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.To; /** * The {@code ValueTransformerWithKey} interface for stateful mapping of a value to a new value (with possible new type). @@ -62,9 +63,8 @@ public interface ValueTransformerWithKey<K, V, VR> { * Note that {@link ProcessorContext} is updated in the background with the current record's meta data. * Thus, it only contains valid record meta data when accessed within {@link #transform(Object, Object)}. * <p> - * Note that using {@link ProcessorContext#forward(Object, Object)}, - * {@link ProcessorContext#forward(Object, Object, int)}, or - * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within any method of + * Note that using {@link ProcessorContext#forward(Object, Object)} or + * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within any method of * {@code ValueTransformerWithKey} and will result in an {@link StreamsException exception}. * * @param context the context @@ -79,9 +79,8 @@ public interface ValueTransformerWithKey<K, V, VR> { * attached} to this operator can be accessed and modified arbitrarily (cf. * {@link ProcessorContext#getStateStore(String)}). * <p> - * Note, that using {@link ProcessorContext#forward(Object, Object)}, - * {@link ProcessorContext#forward(Object, Object, int)}, and - * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within {@code transform} and + * Note, that using {@link ProcessorContext#forward(Object, Object)} or + * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and * will result in an {@link StreamsException exception}. * * @param readOnlyKey the read-only key @@ -94,8 +93,8 @@ public interface ValueTransformerWithKey<K, V, VR> { * Close this processor and clean up any resources. * <p> * It is not possible to return any new output records within {@code close()}. - * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)}, - * or {@link ProcessorContext#forward(Object, Object, String)} will result in an {@link StreamsException exception}. + * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)}, + * will result in an {@link StreamsException exception}. 
*/ void close(); } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java index 317c5bf..baa9b63 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java @@ -16,18 +16,21 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.processor.To; class KStreamBranch<K, V> implements ProcessorSupplier<K, V> { private final Predicate<K, V>[] predicates; + private final String[] childNodes; - @SuppressWarnings("unchecked") - public KStreamBranch(Predicate<K, V> ... predicates) { + KStreamBranch(final Predicate<K, V>[] predicates, + final String[] childNodes) { this.predicates = predicates; + this.childNodes = childNodes; } @Override @@ -37,12 +40,12 @@ class KStreamBranch<K, V> implements ProcessorSupplier<K, V> { private class KStreamBranchProcessor extends AbstractProcessor<K, V> { @Override - public void process(K key, V value) { + public void process(final K key, final V value) { for (int i = 0; i < predicates.length; i++) { if (predicates[i].test(key, value)) { - // use forward with childIndex here and then break the loop + // use forward with child here and then break the loop // so that no record is going to be piped to multiple streams - context().forward(key, value, i); + context().forward(key, value, To.child(childNodes[i])); break; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 07bc67d..349be86 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -358,17 +358,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V for (final Predicate<? super K, ? 
super V> predicate : predicates) { Objects.requireNonNull(predicate, "predicates can't have null values"); } - String branchName = builder.newProcessorName(BRANCH_NAME); - builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name); + String branchName = builder.newProcessorName(BRANCH_NAME); - KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length); + String[] childNames = new String[predicates.length]; for (int i = 0; i < predicates.length; i++) { - String childName = builder.newProcessorName(BRANCHCHILD_NAME); + childNames[i] = builder.newProcessorName(BRANCHCHILD_NAME); + } - builder.internalTopologyBuilder.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName); + builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone(), childNames), this.name); - branchChildren[i] = new KStreamImpl<>(builder, childName, sourceNodes, this.repartitionRequired); + KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length); + for (int i = 0; i < predicates.length; i++) { + builder.internalTopologyBuilder.addProcessor(childNames[i], new KStreamPassThrough<K, V>(), branchName); + branchChildren[i] = new KStreamImpl<>(builder, childNames[i], sourceNodes, this.repartitionRequired); } return branchChildren; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java index ace4f69..e644597 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; import java.io.File; import java.util.Map; @@ -117,10 +118,17 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> } @Override + public <K, V> void forward(final K key, final V value, final To to) { + throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues."); + } + + @SuppressWarnings("deprecation") + @Override public <K, V> void forward(final K key, final V value, final int childIndex) { throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues."); } + @SuppressWarnings("deprecation") @Override public <K, V> void forward(final K key, final V value, final String childName) { throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues."); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java index 1cfe78a..14e6c2a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java @@ -31,7 +31,7 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> { } @Override - public void init(ProcessorContext context) { + public void init(final ProcessorContext context) { this.context = context; } @@ -46,7 +46,7 @@ public abstract class 
AbstractProcessor<K, V> implements Processor<K, V> { */ @SuppressWarnings("deprecation") @Override - public void punctuate(long timestamp) { + public void punctuate(final long timestamp) { // do nothing } @@ -67,6 +67,6 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> { * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}. */ protected final ProcessorContext context() { - return this.context; + return context; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 42902a8..404b225 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -83,7 +83,9 @@ public interface ProcessorContext { * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback); + void register(final StateStore store, + final boolean loggingEnabledIsDeprecatedAndIgnored, + final StateRestoreCallback stateRestoreCallback); /** * Get the state store given the store name. @@ -91,7 +93,7 @@ public interface ProcessorContext { * @param name The store name * @return The state store instance */ - StateStore getStateStore(String name); + StateStore getStateStore(final String name); /** * Schedules a periodic operation for processors. A processor may call this method during @@ -125,7 +127,9 @@ * @param callback a function consuming timestamps representing the current stream or system time * @return a handle allowing cancellation of the punctuation schedule established by this method */ - Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback); + Cancellable schedule(final long intervalMs, + final PunctuationType type, + final Punctuator callback); /** * Schedules a periodic operation for processors. A processor may call this method during @@ -137,30 +141,47 @@ * @param interval the time interval between punctuations */ @Deprecated - void schedule(long interval); + void schedule(final long interval); /** - * Forwards a key/value pair to the downstream processors + * Forwards a key/value pair to all downstream processors. + * Uses the input record's timestamp as the timestamp for the output record. + * * @param key key * @param value value */ - <K, V> void forward(K key, V value); + <K, V> void forward(final K key, final V value); + + /** + * Forwards a key/value pair to the specified downstream processors. + * Can be used to set the timestamp of the output record.
+ * + * @param key key + * @param value value + * @param to the options to use when forwarding + */ + <K, V> void forward(final K key, final V value, final To to); /** * Forwards a key/value pair to one of the downstream processors designated by childIndex * @param key key * @param value value * @param childIndex index in list of children of this node + * @deprecated please use {@link #forward(Object, Object, To)} instead */ - <K, V> void forward(K key, V value, int childIndex); + // TODO when we remove this method, we can also remove `ProcessorNode#children` + @Deprecated + <K, V> void forward(final K key, final V value, final int childIndex); /** * Forwards a key/value pair to one of the downstream processors designated by the downstream processor name * @param key key * @param value value * @param childName name of downstream processor + * @deprecated please use {@link #forward(Object, Object, To)} instead */ - <K, V> void forward(K key, V value, String childName); + @Deprecated + <K, V> void forward(final K key, final V value, final String childName); /** * Requests a commit @@ -231,6 +252,6 @@ public interface ProcessorContext { * @return the key/values matching the given prefix from the StreamsConfig properties. * */ - Map<String, Object> appConfigsWithPrefix(String prefix); + Map<String, Object> appConfigsWithPrefix(final String prefix); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/To.java b/streams/src/main/java/org/apache/kafka/streams/processor/To.java new file mode 100644 index 0000000..52007df --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/To.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +/** + * This class is used to provide the optional parameters when sending output records to downstream processor + * using {@link ProcessorContext#forward(Object, Object, To)}. + */ +public class To { + protected String childName; + protected long timestamp; + + private To(final String childName, + final long timestamp) { + this.childName = childName; + this.timestamp = timestamp; + } + + protected To(final To to) { + this(to.childName, to.timestamp); + } + + protected void update(final To to) { + childName = to.childName; + timestamp = to.timestamp; + } + + /** + * Forward the key/value pair to one of the downstream processors designated by the downstream processor name. 
* @param childName name of downstream processor + * @return a new {@link To} instance configured with {@code childName} + */ + public static To child(final String childName) { + return new To(childName, -1); + } + + /** + * Forward the key/value pair to all downstream processors. + * @return a new {@link To} instance configured for all downstream processors + */ + public static To all() { + return new To((String) null, -1); + } + + /** + * Set the timestamp of the output record. + * @param timestamp the output record timestamp + * @return itself (i.e., {@code this}) + */ + public To withTimestamp(final long timestamp) { + this.timestamp = timestamp; + return this; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index e9b5a4c..87408c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -26,7 +26,6 @@ import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.File; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; @@ -164,20 +163,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte return combined; } - @SuppressWarnings("unchecked") - @Override - public <K, V> void forward(final K key, final V value) { - final ProcessorNode previousNode = currentNode(); - try { - for (final ProcessorNode child : (List<ProcessorNode>) currentNode().children()) { - setCurrentNode(child); - child.process(key, value); - } - } finally { - setCurrentNode(previousNode); - } - } - @Override public Map<String, Object> appConfigsWithPrefix(final String prefix) { return config.originalsWithPrefix(prefix); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 37e7cb5..88d9f56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -23,8 +23,11 @@ import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.internals.ThreadCache; +import java.util.List; + public class GlobalProcessorContextImpl extends AbstractProcessorContext { @@ -40,20 +43,43 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext { return stateManager.getGlobalStore(name); } + @SuppressWarnings("unchecked") + @Override + public <K, V> void forward(final K key, final V value) { + final ProcessorNode previousNode = currentNode(); + try { + for (final ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode().children()) { + setCurrentNode(child); + child.process(key, value); + } + } finally { + setCurrentNode(previousNode); + } + } + /** * @throws UnsupportedOperationException on every invocation */ @Override - public <K, V> void forward(K key, V value, int childIndex) { + public <K, V> void forward(final K key, final V value, final To to) { throw
new UnsupportedOperationException("this should not happen: forward() not supported in global processor context."); } + /** + * @throws UnsupportedOperationException on every invocation + */ + @SuppressWarnings("deprecation") + @Override + public <K, V> void forward(final K key, final V value, final int childIndex) { + throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context."); + } /** * @throws UnsupportedOperationException on every invocation */ + @SuppressWarnings("deprecation") @Override - public <K, V> void forward(K key, V value, String childName) { + public <K, V> void forward(final K key, final V value, final String childName) { throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context."); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 42d3d70..3761bfb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -18,12 +18,13 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.internals.ThreadCache; import java.util.List; @@ -32,6 +33,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re private final StreamTask task; private final RecordCollector collector; + private final ToInternal toInternal = new ToInternal(); + private final static To SEND_TO_ALL = To.all(); ProcessorContextImpl(final TaskId id, final StreamTask task, @@ -77,32 +80,60 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @SuppressWarnings("unchecked") @Override + public <K, V> void forward(final K key, final V value) { + forward(key, value, SEND_TO_ALL); + } + + @SuppressWarnings({"unchecked", "deprecation"}) + @Override public <K, V> void forward(final K key, final V value, final int childIndex) { + forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name())); + } + + @SuppressWarnings({"unchecked", "deprecation"}) + @Override + public <K, V> void forward(final K key, final V value, final String childName) { + forward(key, value, To.child(childName)); + } + + @SuppressWarnings("unchecked") + @Override + public <K, V> void forward(final K key, final V value, final To to) { + toInternal.update(to); + if (toInternal.hasTimestamp()) { + recordContext.setTimestamp(toInternal.timestamp()); + } final ProcessorNode previousNode = currentNode(); - final ProcessorNode child = (ProcessorNode<K, V>) currentNode().children().get(childIndex); - setCurrentNode(child); try { - child.process(key, value); + final List<ProcessorNode<K, V>> children = (List<ProcessorNode<K, V>>) currentNode().children(); + final String sendTo = 
toInternal.child(); + if (sendTo != null) { + final ProcessorNode child = currentNode().getChild(sendTo); + if (child == null) { + throw new StreamsException("Unknown processor name: " + sendTo); + } + forward(child, key, value); + } else { + if (children.size() == 1) { + final ProcessorNode child = children.get(0); + forward(child, key, value); + } else { + for (final ProcessorNode child : children) { + forward(child, key, value); + } + } + } } finally { setCurrentNode(previousNode); } } @SuppressWarnings("unchecked") - @Override - public <K, V> void forward(final K key, final V value, final String childName) { - for (ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode().children()) { - if (child.name().equals(childName)) { - ProcessorNode previousNode = currentNode(); - setCurrentNode(child); - try { - child.process(key, value); - return; - } finally { - setCurrentNode(previousNode); - } - } - } + private <K, V> void forward(final ProcessorNode child, + final K key, + final V value) { + setCurrentNode(child); + child.process(key, value); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 29f442f..94e8640 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -26,12 +26,16 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.Punctuator; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; public class ProcessorNode<K, V> { + // TODO: 'children' can be removed when #forward() via index is removed private final List<ProcessorNode<?, ?>> children; + private final Map<String, ProcessorNode<?, ?>> childByName; private final String name; private final Processor<K, V> processor; @@ -75,6 +79,7 @@ public class ProcessorNode<K, V> { this.name = name; this.processor = processor; this.children = new ArrayList<>(); + this.childByName = new HashMap<>(); this.stateStores = stateStores; this.time = new SystemTime(); } @@ -92,11 +97,15 @@ public class ProcessorNode<K, V> { return children; } + public final ProcessorNode getChild(final String childName) { + return childByName.get(childName); + } + public void addChild(ProcessorNode<?, ?> child) { children.add(child); + childByName.put(child.name, child); } - public void init(ProcessorContext context) { this.context = context; try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index aa20103..92acfc9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -20,7 +20,7 @@ import java.util.Objects; public class ProcessorRecordContext implements RecordContext { - private final long timestamp; + private long timestamp; private final long offset; private final String topic; private final int partition; @@ -44,6 +44,10 @@ public class ProcessorRecordContext implements RecordContext { return timestamp; } + public void setTimestamp(final long timestamp) { + this.timestamp = timestamp; + } + @Override public String topic() { return topic; 
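Taken together, the pieces above — the public To options class, the name-based dispatch in ProcessorContextImpl#forward(..., To), the ProcessorNode#getChild() lookup, and the now-mutable record-context timestamp — are what a custom processor exercises through the public API. A minimal sketch of such a processor (the class name, child names, and routing rule are hypothetical, not part of this patch):

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.To;

    // Routes each record to one of two named children and assigns the output
    // record an explicit timestamp instead of inheriting the input's timestamp.
    public class RoutingProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            final String child = value.length() > 10 ? "long-values" : "short-values";
            // ProcessorContextImpl resolves the name via ProcessorNode#getChild() and
            // throws a StreamsException if no child with that name exists.
            context().forward(key, value, To.child(child).withTimestamp(context().timestamp() + 1L));
        }
    }

The named children ("long-values", "short-values") must be registered as downstream processors of this node when the topology is built.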
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java index dc752cb..dd58f4c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java @@ -34,6 +34,11 @@ public interface RecordContext { long timestamp(); /** + * Sets a new timestamp for the output record. + */ + void setTimestamp(final long timestamp); + + /** * @return The topic the record was received on */ String topic(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index e38b821..360c4ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.internals.ThreadCache; import java.util.Collections; @@ -142,6 +143,15 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle * @throws UnsupportedOperationException on every invocation */ @Override + public <K, V> void forward(final K key, final V value, final To to) { + throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks."); + } + + /** + * @throws UnsupportedOperationException on every invocation + */ + @SuppressWarnings("deprecation") + @Override public <K, V> void forward(final K key, final V value, final int childIndex) { throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks."); } @@ -149,6 +159,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle /** * @throws UnsupportedOperationException on every invocation */ + @SuppressWarnings("deprecation") @Override public <K, V> void forward(final K key, final V value, final String childName) { throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks."); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java similarity index 59% copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java index dc752cb..6c5798e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java @@ -16,30 +16,26 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.To; -/** - * The context associated with the current record being processed by - * an {@link Processor} - */ -public interface RecordContext { - /** - * @return The offset of the original record received from Kafka - */ - long offset(); +public class 
ToInternal extends To { + public ToInternal() { + super(To.all()); + } + + public void update(final To to) { + super.update(to); + } - /** - * @return The timestamp extracted from the record received from Kafka - */ - long timestamp(); + public boolean hasTimestamp() { + return timestamp != -1; + } - /** - * @return The topic the record was received on - */ - String topic(); + public long timestamp() { + return timestamp; + } - /** - * @return The partition the record was received on - */ - int partition(); -} + public String child() { + return childName; + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java index dedb906..af7059b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java @@ -27,7 +27,7 @@ class LRUCacheEntry implements RecordContext { private final long offset; private final String topic; private final int partition; - private final long timestamp; + private long timestamp; private long sizeBytes; private boolean isDirty; @@ -64,6 +64,11 @@ class LRUCacheEntry implements RecordContext { } @Override + public void setTimestamp(final long timestamp) { + throw new UnsupportedOperationException(); + } + + @Override public String topic() { return topic; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java index 1b34fab..dc0b886 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java @@ -23,18 +23,19 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.ValueTransformer; -import org.apache.kafka.streams.kstream.ValueTransformerWithKey; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.To; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Rule; import org.junit.Test; -import static org.junit.Assert.fail; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.fail; public class KStreamTransformValuesTest { @@ -192,6 +193,13 @@ public class KStreamTransformValuesTest { } try { + transformValueProcessor.process(null, 3); + fail("should not allow call to context.forward() within ValueTransformer"); + } catch (final StreamsException e) { + // expected + } + + try { transformValueProcessor.punctuate(0); fail("should not allow ValueTransformer#puntuate() to return not-null value"); } catch (final StreamsException e) { @@ -213,11 +221,14 @@ public class KStreamTransformValuesTest { context.forward(null, null); } if (value == 1) { - context.forward(null, null, null); + context.forward(null, null, (String) null); } if (value == 2) { context.forward(null, null, 0); 
} + if (value == 3) { + context.forward(null, null, To.all()); + } throw new RuntimeException("Should never happen in this test"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 46c23c6..aac275d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.MockStateStore; @@ -176,28 +177,21 @@ public class AbstractProcessorContextTest { } @Override - public void schedule(final long interval) { - - } + public void schedule(final long interval) {} @Override - public <K, V> void forward(final K key, final V value) { - - } + public <K, V> void forward(final K key, final V value) {} @Override - public <K, V> void forward(final K key, final V value, final int childIndex) { - - } + public <K, V> void forward(final K key, final V value, final To to) {} @Override - public <K, V> void forward(final K key, final V value, final String childName) { - - } + public <K, V> void forward(final K key, final V value, final int childIndex) {} @Override - public void commit() { + public <K, V> void forward(final K key, final V value, final String childName) {} - } + @Override + public void commit() {} } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index da2e5dc..d07274a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -72,8 +73,8 @@ public class ProcessorTopologyTest { @Before public void setup() { // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ... 
- File localState = TestUtils.tempDirectory(); - Properties props = new Properties(); + final File localState = TestUtils.tempDirectory(); + final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath()); @@ -120,8 +121,8 @@ public class ProcessorTopologyTest { } @Test - public void testDrivingSimpleTopology() throws Exception { - int partition = 10; + public void testDrivingSimpleTopology() { + final int partition = 10; driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder); driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition); @@ -142,7 +143,7 @@ public class ProcessorTopologyTest { @Test - public void testDrivingMultiplexingTopology() throws Exception { + public void testDrivingMultiplexingTopology() { driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology().internalTopologyBuilder); driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)"); @@ -164,7 +165,7 @@ public class ProcessorTopologyTest { } @Test - public void testDrivingMultiplexByNameTopology() throws Exception { + public void testDrivingMultiplexByNameTopology() { driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology().internalTopologyBuilder); driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)"); @@ -186,8 +187,8 @@ public class ProcessorTopologyTest { } @Test - public void testDrivingStatefulTopology() throws Exception { - String storeName = "entries"; + public void testDrivingStatefulTopology() { + final String storeName = "entries"; driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName).internalTopologyBuilder); driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); @@ -195,7 +196,7 @@ public class ProcessorTopologyTest { driver.process(INPUT_TOPIC_1, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER); assertNoOutputRecord(OUTPUT_TOPIC_1); - KeyValueStore<String, String> store = driver.getKeyValueStore("entries"); + final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName); assertEquals("value4", store.get("key1")); assertEquals("value2", store.get("key2")); assertEquals("value3", store.get("key3")); @@ -205,15 +206,16 @@ public class ProcessorTopologyTest { @SuppressWarnings("unchecked") @Test public void shouldDriveGlobalStore() { - final StateStoreSupplier storeSupplier = Stores.create("my-store") + final String storeName = "my-store"; + final StateStoreSupplier storeSupplier = Stores.create(storeName) .withStringKeys().withStringValues().inMemory().disableLogging().build(); final String global = "global"; final String topic = "topic"; final TopologyBuilder topologyBuilder = this.builder - .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store"))); + .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new 
                StatefulProcessor(storeName)));
         driver = new ProcessorTopologyTestDriver(config, topologyBuilder.internalTopologyBuilder);
-        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store");
+        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get(storeName);
         driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         assertEquals("value1", globalStore.get("key1"));
@@ -221,8 +223,8 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingSimpleMultiSourceTopology() throws Exception {
-        int partition = 10;
+    public void testDrivingSimpleMultiSourceTopology() {
+        final int partition = 10;
         driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(partition).internalTopologyBuilder);
 
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -235,7 +237,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingForwardToSourceTopology() throws Exception {
+    public void testDrivingForwardToSourceTopology() {
         driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -246,7 +248,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingInternalRepartitioningTopology() throws Exception {
+    public void testDrivingInternalRepartitioningTopology() {
         driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -257,7 +259,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingInternalRepartitioningForwardingTimestampTopology() throws Exception {
+    public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
         driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1@1000", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2@2000", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -315,7 +317,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void shouldConsiderTimeStamps() throws Exception {
+    public void shouldConsiderTimeStamps() {
         final int partition = 10;
         driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
@@ -326,6 +328,17 @@ public class ProcessorTopologyTest {
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 30L);
     }
 
+    @Test
+    public void shouldConsiderModifiedTimeStamps() {
+        final int partition = 10;
+        driver = new ProcessorTopologyTestDriver(config, createTimestampTopology(partition).internalTopologyBuilder);
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER, 20L);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 40L);
+    }
 
     private void assertNextOutputRecord(final String topic,
                                         final String key,
@@ -345,7 +358,7 @@ public class ProcessorTopologyTest {
                                         final String value,
                                         final Integer partition,
                                         final Long timestamp) {
-        ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
+        final ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
         assertEquals(topic, record.topic());
         assertEquals(key, record.key());
         assertEquals(value, record.value());
@@ -353,51 +366,63 @@ public class ProcessorTopologyTest {
         assertEquals(timestamp, record.timestamp());
     }
 
-    private void assertNoOutputRecord(String topic) {
+    private void assertNoOutputRecord(final String topic) {
         assertNull(driver.readOutput(topic));
     }
 
     private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) {
         return new StreamPartitioner<Object, Object>() {
             @Override
-            public Integer partition(Object key, Object value, int numPartitions) {
+            public Integer partition(final Object key, final Object value, final int numPartitions) {
                 return partition;
             }
         };
     }
 
-    private TopologyBuilder createSimpleTopology(int partition) {
-        return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
-                .addProcessor("processor", define(new ForwardingProcessor()), "source")
-                .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
+    private TopologyBuilder createSimpleTopology(final int partition) {
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new ForwardingProcessor()), "source")
+            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
+    }
+
+    private TopologyBuilder createTimestampTopology(final int partition) {
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new TimestampProcessor()), "source")
+            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
     }
 
     private TopologyBuilder createMultiplexingTopology() {
-        return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
-            .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
-            .addSink("sink1", OUTPUT_TOPIC_1, "processor")
-            .addSink("sink2", OUTPUT_TOPIC_2, "processor");
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
+            .addSink("sink1", OUTPUT_TOPIC_1, "processor")
+            .addSink("sink2", OUTPUT_TOPIC_2, "processor");
     }
 
     private TopologyBuilder createMultiplexByNameTopology() {
-        return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
             .addSink("sink0", OUTPUT_TOPIC_1, "processor")
             .addSink("sink1", OUTPUT_TOPIC_2, "processor");
     }
 
-    private TopologyBuilder createStatefulTopology(String storeName) {
-        return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
-                .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
-                .addStateStore(
-                        Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
-                        "processor"
-                )
-                .addSink("counts", OUTPUT_TOPIC_1, "processor");
+    private TopologyBuilder createStatefulTopology(final String storeName) {
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
+            .addStateStore(
+                Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
+                "processor"
+            )
+            .addSink("counts", OUTPUT_TOPIC_1, "processor");
     }
 
     private TopologyBuilder createInternalRepartitioningTopology() {
-        return builder.addSource("source", INPUT_TOPIC_1)
+        return builder
+            .addSource("source", INPUT_TOPIC_1)
             .addInternalTopic(THROUGH_TOPIC_1)
             .addSink("sink0", THROUGH_TOPIC_1, "source")
             .addSource("source1", THROUGH_TOPIC_1)
@@ -405,12 +430,13 @@ public class ProcessorTopologyTest {
     }
 
     private TopologyBuilder createInternalRepartitioningWithValueTimestampTopology() {
-        return builder.addSource("source", INPUT_TOPIC_1)
-            .addInternalTopic(THROUGH_TOPIC_1)
-            .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
-            .addSink("sink0", THROUGH_TOPIC_1, "processor")
-            .addSource("source1", THROUGH_TOPIC_1)
-            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+        return builder
+            .addSource("source", INPUT_TOPIC_1)
+            .addInternalTopic(THROUGH_TOPIC_1)
+            .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
+            .addSink("sink0", THROUGH_TOPIC_1, "processor")
+            .addSource("source1", THROUGH_TOPIC_1)
+            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
     }
 
     private TopologyBuilder createForwardToSourceTopology() {
@@ -434,26 +460,34 @@
      * A processor that simply forwards all messages to all children.
      */
     protected static class ForwardingProcessor extends AbstractProcessor<String, String> {
-
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             context().forward(key, value);
         }
 
         @Override
-        public void punctuate(long streamTime) {
+        public void punctuate(final long streamTime) {
             context().forward(Long.toString(streamTime), "punctuate");
         }
     }
 
     /**
+     * A processor that simply forwards all messages to all children with advanced timestamps.
+     */
+    protected static class TimestampProcessor extends AbstractProcessor<String, String> {
+        @Override
+        public void process(final String key, final String value) {
+            context().forward(key, value, To.all().withTimestamp(context().timestamp() + 10));
+        }
+    }
+
+    /**
      * A processor that removes custom timestamp information from messages and forwards modified messages to each child.
     * A message contains custom timestamp information if the value is in ".*@[0-9]+" format.
      */
     protected static class ValueTimestampProcessor extends AbstractProcessor<String, String> {
-
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             context().forward(key, value.split("@")[0]);
         }
     }
 
@@ -462,22 +496,23 @@
      * A processor that forwards slightly-modified messages to each child.
      */
     protected static class MultiplexingProcessor extends AbstractProcessor<String, String> {
-
         private final int numChildren;
 
-        public MultiplexingProcessor(int numChildren) {
+        MultiplexingProcessor(final int numChildren) {
             this.numChildren = numChildren;
         }
 
+        @SuppressWarnings("deprecation")
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             for (int i = 0; i != numChildren; ++i) {
                 context().forward(key, value + "(" + (i + 1) + ")", i);
             }
         }
 
+        @SuppressWarnings("deprecation")
         @Override
-        public void punctuate(long streamTime) {
+        public void punctuate(final long streamTime) {
             for (int i = 0; i != numChildren; ++i) {
                 context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", i);
             }
@@ -489,22 +524,23 @@
      * Note: the children are assumed to be named "sink{child number}", e.g., sink1, or sink2, etc.
      */
     protected static class MultiplexByNameProcessor extends AbstractProcessor<String, String> {
-
         private final int numChildren;
 
-        public MultiplexByNameProcessor(int numChildren) {
+        MultiplexByNameProcessor(final int numChildren) {
             this.numChildren = numChildren;
         }
 
+        @SuppressWarnings("deprecation")
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             for (int i = 0; i != numChildren; ++i) {
-                context().forward(key, value + "(" + (i + 1) + ")", "sink" + i);
+                context().forward(key, value + "(" + (i + 1) + ")", "sink" + i);
             }
         }
 
+        @SuppressWarnings("deprecation")
         @Override
-        public void punctuate(long streamTime) {
+        public void punctuate(final long streamTime) {
             for (int i = 0; i != numChildren; ++i) {
                 context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", "sink" + i);
             }
@@ -516,28 +552,27 @@
      * {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
      */
     protected static class StatefulProcessor extends AbstractProcessor<String, String> {
-
         private KeyValueStore<String, String> store;
         private final String storeName;
 
-        public StatefulProcessor(String storeName) {
+        StatefulProcessor(final String storeName) {
             this.storeName = storeName;
         }
 
         @Override
         @SuppressWarnings("unchecked")
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<String, String>) context.getStateStore(storeName);
         }
 
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             store.put(key, value);
         }
 
         @Override
-        public void punctuate(long streamTime) {
+        public void punctuate(final long streamTime) {
             int count = 0;
             try (KeyValueIterator<String, String> iter = store.all()) {
                 while (iter.hasNext()) {
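The TimestampProcessor added above advances every forwarded record ten milliseconds past its input record. The same To.all().withTimestamp(...) call can just as well pin an absolute timestamp, for example one carried inside the payload. A minimal sketch, not part of this commit (the class name PayloadTimestampProcessor is hypothetical; the ".*@[0-9]+" value convention is borrowed from ValueTimestampProcessor above):

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.To;

    public class PayloadTimestampProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            // expects values in ".*@[0-9]+" format; the numeric suffix becomes
            // the timestamp of the forwarded output record
            final String[] parts = value.split("@");
            context().forward(key, parts[0], To.all().withTimestamp(Long.parseLong(parts[1])));
        }
    }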
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
index 7932d1f..0af5e17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
@@ -19,14 +19,18 @@ package org.apache.kafka.streams.processor.internals;
 public class RecordContextStub implements RecordContext {
 
     private final long offset;
-    private final long timestamp;
+    private long timestamp;
     private final int partition;
     private final String topic;
 
     public RecordContextStub() {
         this(-1, -1, -1, "");
     }
-    public RecordContextStub(final long offset, final long timestamp, final int partition, final String topic) {
+
+    public RecordContextStub(final long offset,
+                             final long timestamp,
+                             final int partition,
+                             final String topic) {
         this.offset = offset;
         this.timestamp = timestamp;
         this.partition = partition;
@@ -44,6 +48,11 @@ public class RecordContextStub implements RecordContext {
     }
 
     @Override
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
     public String topic() {
         return topic;
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 06137fb..6b0cb66 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -28,12 +28,14 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.ToInternal;
 import org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -53,6 +55,7 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
     private final RecordCollector.Supplier recordCollectorSupplier;
     private final Map<String, StateStore> storeMap = new LinkedHashMap<>();
     private final Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
+    private final ToInternal toInternal = new ToInternal();
 
     private Serde<?> keySerde;
     private Serde<?> valSerde;
@@ -179,44 +182,39 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
     @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(final K key, final V value) {
-        final ProcessorNode thisNode = currentNode;
-        for (final ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
-            currentNode = childNode;
-            try {
-                childNode.process(key, value);
-            } finally {
-                currentNode = thisNode;
-            }
-        }
+        forward(key, value, To.all());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(final K key, final V value, final int childIndex) {
-        final ProcessorNode thisNode = currentNode;
-        final ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex);
-        currentNode = childNode;
-        try {
-            childNode.process(key, value);
-        } finally {
-            currentNode = thisNode;
-        }
+        forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(final K key, final V value, final String childName) {
+        forward(key, value, To.child(childName));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        toInternal.update(to);
+        if (toInternal.hasTimestamp()) {
+            setTime(toInternal.timestamp());
+        }
         final ProcessorNode thisNode = currentNode;
-        for (final ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
-            if (childNode.name().equals(childName)) {
-                currentNode = childNode;
-                try {
+        try {
+            for (final ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+                if (toInternal.child() == null || toInternal.child().equals(childNode.name())) {
+                    currentNode = childNode;
                     childNode.process(key, value);
-                } finally {
-                    currentNode = thisNode;
+                    toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple Processors and toInternal might have been modified
                 }
-                break;
             }
+        } finally {
+            currentNode = thisNode;
         }
     }
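Note how MockProcessorContext now funnels both deprecated overloads through the new forward(key, value, To) method; application code migrates the same way. A hedged sketch (the child name "my-child" and class name are placeholders, not from this commit):

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.To;

    public class MigratedProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            // previously (deprecated by this change): context().forward(key, value, "my-child");
            context().forward(key, value, To.child("my-child"));
            // the To parameter can additionally carry an explicit output record timestamp:
            // context().forward(key, value, To.child("my-child").withTimestamp(context().timestamp() + 10));
        }
    }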
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index afa0639..6b5d47a 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 
@@ -65,6 +66,11 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        forwardedValues.put(key, value);
+    }
+
+    @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         forward(key, value);
     }
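To#withTimestamp(...) applies equally when forwarding from a scheduled Punctuator. A rough sketch under the same API (the interval and record contents are illustrative, not from this commit):

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.processor.Punctuator;
    import org.apache.kafka.streams.processor.To;

    public class MarkerProcessor extends AbstractProcessor<String, String> {
        @Override
        public void init(final ProcessorContext context) {
            super.init(context);
            // every 1000ms of stream-time, emit a marker record and set its
            // timestamp explicitly via To instead of relying on the default
            context.schedule(1000L, PunctuationType.STREAM_TIME, new Punctuator() {
                @Override
                public void punctuate(final long timestamp) {
                    context.forward("marker", "tick", To.all().withTimestamp(timestamp));
                }
            });
        }

        @Override
        public void process(final String key, final String value) {
            context().forward(key, value);
        }
    }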
-- 
To stop receiving notification emails like this one, please contact mj...@apache.org.