Repository: kafka Updated Branches: refs/heads/trunk b041c8d87 -> 667cd60dc
KAFKA-5816; add Produced class, KStream#to(topic, Produced), and KStream#through(topic, Produced) Add the `Produced` class and `KStream` overloads that use it: `KStream#to(String, Produced)` `KStream#through(String, Produced)` Deprecate all other to and through methods except the single param methods that take a topic param Author: Damian Guy <[email protected]> Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]> Closes #3770 from dguy/kafka-5652-produced Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/667cd60d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/667cd60d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/667cd60d Branch: refs/heads/trunk Commit: 667cd60dc6ba68831423a256b6e455f7d955581c Parents: b041c8d Author: Damian Guy <[email protected]> Authored: Thu Sep 7 08:54:10 2017 +0100 Committer: Damian Guy <[email protected]> Committed: Thu Sep 7 08:54:10 2017 +0100 ---------------------------------------------------------------------- docs/streams/developer-guide.html | 18 +- .../apache/kafka/streams/kstream/KStream.java | 41 +++++ .../apache/kafka/streams/kstream/Produced.java | 163 +++++++++++++++++++ .../streams/kstream/internals/KStreamImpl.java | 34 ++-- .../kstream/internals/KStreamImplTest.java | 47 ++++++ 5 files changed, 287 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/docs/streams/developer-guide.html ---------------------------------------------------------------------- diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html index 4da50a2..10220fb 100644 --- a/docs/streams/developer-guide.html +++ b/docs/streams/developer-guide.html @@ -1955,7 +1955,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r </p> <pre class="brush: 
java;"> - joined.to("topic4"); + joined.to("topic4"); + // or using custom Serdes and a StreamPartitioner + joined.to("topic5", Produced.with(keySerde, valueSerde, myStreamPartitioner)); </pre> If your application needs to continue reading and processing the records after they have been materialized @@ -1963,11 +1965,15 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r Kafka Streams provides a convenience method called <code>through</code>: <pre class="brush: java;"> - // equivalent to - // - // joined.to("topic4"); - // materialized = builder.stream("topic4"); - KStream<String, String> materialized = joined.through("topic4"); + // equivalent to + // + // joined.to("topic4"); + // materialized = builder.stream("topic4"); + KStream<String, String> materialized = joined.through("topic4"); + // if you need to provide serdes or a custom StreamPartitioner you can use + // the overloaded version + KStream<String, String> materialized = joined.through("topic5", + Produced.with(keySerde, valueSerde, myStreamPartitioner)); </pre> <br> http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index b8b5b8d..5a36cde 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -675,7 +675,9 @@ public interface KStream<K, V> { * if not specified producer's {@link DefaultPartitioner} will be used * @param topic the topic name * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream} + * @deprecated use {@code through(String, Produced)} */ + @Deprecated KStream<K, V> through(final StreamPartitioner<? 
super K, ? super V> partitioner, final String topic); @@ -696,7 +698,9 @@ public interface KStream<K, V> { * if not specified the default value serde defined in the configuration will be used * @param topic the topic name * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream} + * @deprecated use {@code through(String, Produced)} */ + @Deprecated KStream<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, final String topic); @@ -721,13 +725,33 @@ public interface KStream<K, V> { * be used * @param topic the topic name * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream} + * @deprecated use {@code through(String, Produced)} */ + @Deprecated KStream<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, final StreamPartitioner<? super K, ? super V> partitioner, final String topic); /** + * Materialize this stream to a topic and create a new {@code KStream} from the topic using the + * {@link Produced} instance for configuration of the {@link Serde key serde}, {@link Serde value serde}, + * and {@link StreamPartitioner}. + * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is + * started). + * <p> + * This is equivalent to calling {@link #to(String, Produced) to(someTopicName, Produced.with(keySerde, valueSerde))} + * and {@link StreamsBuilder#stream(Serde, Serde, String...) + * StreamsBuilder#stream(keySerde, valSerde, someTopicName)}. + * + * @param topic the topic name + * @param produced the options to use when producing to the topic + * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream} + */ + KStream<K, V> through(final String topic, + final Produced<K, V> produced); + + /** * Materialize this stream to a topic using default serializers specified in the config and producer's * {@link DefaultPartitioner}. 
* The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is @@ -746,7 +770,9 @@ public interface KStream<K, V> { * @param partitioner the function used to determine how records are distributed among partitions of the topic, * if not specified producer's {@link DefaultPartitioner} will be used * @param topic the topic name + * @deprecated use {@code to(String, Produced)} */ + @Deprecated void to(final StreamPartitioner<? super K, ? super V> partitioner, final String topic); @@ -762,7 +788,9 @@ public interface KStream<K, V> { * @param valSerde value serde used to send key-value pairs, * if not specified the default serde defined in the configs will be used * @param topic the topic name + * @deprecated use {@code to(String, Produced)} */ + @Deprecated void to(final Serde<K> keySerde, final Serde<V> valSerde, final String topic); @@ -782,13 +810,26 @@ public interface KStream<K, V> { * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will * be used * @param topic the topic name + * @deprecated use {@code to(String, Produced)} */ + @Deprecated void to(final Serde<K> keySerde, final Serde<V> valSerde, final StreamPartitioner<? super K, ? super V> partitioner, final String topic); /** + * Materialize this stream to a topic using the provided {@link Produced} instance. + * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is + * started). + * + * @param produced the options to use when producing to the topic + * @param topic the topic name + */ + void to(final String topic, + final Produced<K, V> produced); + + /** * Transform each record of the input stream into zero or more records in the output stream (both key and value type * can be altered arbitrarily). 
* A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java new file mode 100644 index 0000000..488bd15 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.internals.WindowedSerializer; +import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner; +import org.apache.kafka.streams.processor.StreamPartitioner; + +/** + * This class is used to provide the optional parameters when producing to new topics + * using {@link KStream#through(String, Produced)} or {@link KStream#to(String, Produced)}. 
+ * @param <K> key type + * @param <V> value type + */ +public class Produced<K, V> { + + private Serde<K> keySerde; + private Serde<V> valueSerde; + private StreamPartitioner<? super K, ? super V> partitioner; + + private Produced(final Serde<K> keySerde, + final Serde<V> valueSerde, + final StreamPartitioner<? super K, ? super V> partitioner) { + this.keySerde = keySerde; + this.valueSerde = valueSerde; + this.partitioner = partitioner; + } + + /** + * Create a Produced instance with provided keySerde and valueSerde. + * @param keySerde Serde to use for serializing the key + * @param valueSerde Serde to use for serializing the value + * @param <K> key type + * @param <V> value type + * @return A new {@link Produced} instance configured with keySerde and valueSerde + * @see KStream#through(String, Produced) + * @see KStream#to(String, Produced) + */ + public static <K, V> Produced<K, V> with(final Serde<K> keySerde, + final Serde<V> valueSerde) { + return new Produced<>(keySerde, valueSerde, null); + } + + /** + * Create a Produced instance with provided keySerde, valueSerde, and partitioner. + * @param keySerde Serde to use for serializing the key + * @param valueSerde Serde to use for serializing the value + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key + * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will be used + * @param <K> key type + * @param <V> value type + * @return A new {@link Produced} instance configured with keySerde, valueSerde, and partitioner + * @see KStream#through(String, Produced) + * @see KStream#to(String, Produced) + */ + public static <K, V> Produced<K, V> with(final Serde<K> keySerde, + final Serde<V> valueSerde, + final StreamPartitioner<? super K, ? 
super V> partitioner) { + return new Produced<>(keySerde, valueSerde, partitioner); + } + + /** + * Create a Produced instance with provided keySerde. + * @param keySerde Serde to use for serializing the key + * @param <K> key type + * @param <V> value type + * @return A new {@link Produced} instance configured with keySerde + * @see KStream#through(String, Produced) + * @see KStream#to(String, Produced) + */ + public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde) { + return new Produced<>(keySerde, null, null); + } + + /** + * Create a Produced instance with provided valueSerde. + * @param valueSerde Serde to use for serializing the value + * @param <K> key type + * @param <V> value type + * @return A new {@link Produced} instance configured with valueSerde + * @see KStream#through(String, Produced) + * @see KStream#to(String, Produced) + */ + public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde) { + return new Produced<>(null, valueSerde, null); + } + + /** + * Create a Produced instance with provided partitioner. + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified and the key serde provides a {@link WindowedSerializer} for the key + * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will be used + * @param <K> key type + * @param <V> value type + * @return A new {@link Produced} instance configured with partitioner + * @see KStream#through(String, Produced) + * @see KStream#to(String, Produced) + */ + public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) { + return new Produced<>(null, null, partitioner); + } + + /** + * Produce records using the provided partitioner. 
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified and the key serde provides a {@link WindowedSerializer} for the key + * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will be used + * @return this + */ + public Produced<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) { + this.partitioner = partitioner; + return this; + } + + /** + * Produce records using the provided valueSerde. + * @param valueSerde Serde to use for serializing the value + * @return this + */ + public Produced<K, V> withValueSerde(final Serde<V> valueSerde) { + this.valueSerde = valueSerde; + return this; + } + + /** + * Produce records using the provided keySerde. + * @param keySerde Serde to use for serializing the key + * @return this + */ + public Produced<K, V> withKeySerde(final Serde<K> keySerde) { + this.keySerde = keySerde; + return this; + } + + public Serde<K> keySerde() { + return keySerde; + } + + public Serde<V> valueSerde() { + return valueSerde; + } + + public StreamPartitioner<? super K, ? 
super V> streamPartitioner() { + return partitioner; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 8534da8..8aa7c58 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.PrintForeachAction; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.ValueJoiner; @@ -379,9 +380,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public KStream<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, final StreamPartitioner<? super K, ? 
super V> partitioner, String topic) { - to(keySerde, valSerde, partitioner, topic); - return builder.stream(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic); + return through(topic, Produced.with(keySerde, valSerde, partitioner)); + } + + @Override + public KStream<K, V> through(final String topic, final Produced<K, V> produced) { + to(topic, produced); + return builder.stream(null, new FailOnInvalidTimestamp(), produced.keySerde(), produced.valueSerde(), topic); } @Override @@ -406,13 +412,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public KStream<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, final String topic) { - return through(keySerde, valSerde, null, topic); + return through(topic, Produced.with(keySerde, valSerde)); } @Override public KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, final String topic) { - return through(null, null, partitioner, topic); + return through(topic, Produced.streamPartitioner(partitioner)); } @Override @@ -422,20 +428,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public void to(final String topic) { - to(null, null, null, topic); + to(topic, Produced.<K, V>with(null, null, null)); } @Override public void to(final StreamPartitioner<? super K, ? super V> partitioner, final String topic) { - to(null, null, partitioner, topic); + to(topic, Produced.streamPartitioner(partitioner)); } @Override public void to(final Serde<K> keySerde, final Serde<V> valSerde, final String topic) { - to(keySerde, valSerde, null, topic); + to(topic, Produced.with(keySerde, valSerde)); } @SuppressWarnings("unchecked") @@ -445,10 +451,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V final StreamPartitioner<? super K, ? 
super V> partitioner, final String topic) { Objects.requireNonNull(topic, "topic can't be null"); - final String name = builder.newName(SINK_NAME); + to(topic, Produced.with(keySerde, valSerde, partitioner)); + } - final Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer(); - final Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer(); + @SuppressWarnings("unchecked") + @Override + public void to(final String topic, final Produced<K, V> produced) { + Objects.requireNonNull(topic, "topic can't be null"); + Objects.requireNonNull(produced, "Produced can't be null"); + final String name = builder.newName(SINK_NAME); + final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer(); + final Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer(); + final StreamPartitioner<? super K, ? super V> partitioner = produced.streamPartitioner(); if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) { final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer; http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 1fed374..ca454f1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -30,20 +30,24 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import 
org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.SourceNode; +import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import java.util.Collections; import java.util.concurrent.TimeUnit; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -57,6 +61,9 @@ public class KStreamImplTest { private KStream<String, String> testStream; private StreamsBuilder builder; + @Rule + public final KStreamTestDriver driver = new KStreamTestDriver(); + @Before public void before() { builder = new StreamsBuilder(); @@ -179,6 +186,33 @@ public class KStreamImplTest { } @Test + public void shouldSendDataThroughTopicUsingProduced() { + final StreamsBuilder builder = new StreamsBuilder(); + final String input = "topic"; + final KStream<String, String> stream = builder.stream(stringSerde, stringSerde, input); + final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>(); + stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier); + + driver.setUp(builder); + driver.process(input, "a", "b"); + assertThat(processorSupplier.processed, equalTo(Collections.singletonList("a:b"))); + } + + @Test + public void shouldSendDataToTopicUsingProduced() { + final StreamsBuilder builder = new StreamsBuilder(); + final String input = "topic"; + 
final KStream<String, String> stream = builder.stream(stringSerde, stringSerde, input); + final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>(); + stream.to("to-topic", Produced.with(stringSerde, stringSerde)); + builder.stream(stringSerde, stringSerde, "to-topic").process(processorSupplier); + + driver.setUp(builder); + driver.process(input, "e", "f"); + assertThat(processorSupplier.processed, equalTo(Collections.singletonList("e:f"))); + } + + @Test // TODO: this test should be refactored when we removed KStreamBuilder so that the created Topology contains internal topics as well public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() { final KStreamBuilder builder = new KStreamBuilder(); @@ -376,6 +410,18 @@ public class KStreamImplTest { null); } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnThroughWhenProducedIsNull() { + testStream.through("topic", null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnToWhenProducedIsNull() { + testStream.to("topic", null); + } + + @Test public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() { final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), "blah"); @@ -411,4 +457,5 @@ public class KStreamImplTest { public void shouldThrowNullPointerOnOuterJoinJoinedIsNull() { testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10), null); } + }
