Repository: kafka Updated Branches: refs/heads/trunk 5e8958a85 -> d07bb1814
MINOR: comments on KStream methods, and fix generics guozhangwang Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang Closes #591 from ymatsuda/comments Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d07bb181 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d07bb181 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d07bb181 Branch: refs/heads/trunk Commit: d07bb1814010ca4d822e44330d1e8ea4b2839c80 Parents: 5e8958a Author: Yasuhiro Matsuda <[email protected]> Authored: Wed Nov 25 16:44:43 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 25 16:44:43 2015 -0800 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/KStream.java | 33 +++++++++----------- .../kafka/streams/kstream/KStreamBuilder.java | 8 ++--- .../streams/kstream/internals/KStreamImpl.java | 21 +++++-------- .../kstream/internals/KStreamImplTest.java | 2 +- 4 files changed, 25 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 8f0794c..992bd75 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; public interface KStream<K, V> { /** - * Creates a new stream consists of all elements of this stream which satisfy a predicate + * Creates a new instance of KStream consists of all elements of this stream which satisfy a predicate * * @param predicate the instance of Predicate * @return the stream with only those elements that satisfy the predicate @@ -38,7 +38,7 @@ public interface KStream<K, V> { KStream<K, V> filter(Predicate<K, V> predicate); /** - * Creates a new stream consists all elements of this stream which do not satisfy a predicate + * Creates a new instance of KStream consists all elements of this stream which do not satisfy a predicate * * @param predicate the instance of Predicate * @return the stream with only those elements that do not satisfy the predicate @@ -56,30 +56,30 @@ public interface KStream<K, V> { <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper); /** - * Creates a new stream by applying transforming each value in this stream into a different value in the new stream. + * Creates a new instance of KStream by applying transforming each value in this stream into a different value in the new stream. * * @param mapper the instance of ValueMapper * @param <V1> the value type of the new stream - * @return the mapped stream + * @return the instance of KStream */ <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper); /** - * Creates a new stream by applying transforming each element in this stream into zero or more elements in the new stream. + * Creates a new instance of KStream by applying transforming each element in this stream into zero or more elements in the new stream. * * @param mapper the instance of KeyValueMapper * @param <K1> the key type of the new stream * @param <V1> the value type of the new stream - * @return the mapped stream + * @return the instance of KStream */ <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper); /** - * Creates a new stream by applying transforming each value in this stream into zero or more values in the new stream. + * Creates a new instance of KStream by applying transforming each value in this stream into zero or more values in the new stream. * * @param processor the instance of Processor * @param <V1> the value type of the new stream - * @return the mapped stream + * @return the instance of KStream */ <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor); @@ -98,7 +98,7 @@ public interface KStream<K, V> { * An element will be dropped if none of the predicates evaluate true. * * @param predicates the ordered list of Predicate instances - * @return the new streams that each contain those elements for which their Predicate evaluated to true. + * @return the instances of KStream that each contain those elements for which their Predicate evaluated to true. */ KStream<K, V>[] branch(Predicate<K, V>... predicates); @@ -107,14 +107,12 @@ public interface KStream<K, V> { * This is equivalent to calling to(topic) and from(topic). * * @param topic the topic name - * @param <K1> the key type of the new stream - * @param <V1> the value type of the new stream * @return the new stream that consumes the given topic */ - <K1, V1> KStream<K1, V1> through(String topic); + KStream<K, V> through(String topic); /** - * Sends key-value to a topic, also creates a new stream from the topic. + * Sends key-value to a topic, also creates a new instance of KStream from the topic. * This is equivalent to calling to(topic) and from(topic). * * @param topic the topic name @@ -126,11 +124,9 @@ public interface KStream<K, V> { * if not specified the default key deserializer defined in the configuration will be used * @param valDeserializer value deserializer used to create the new KStream, * if not specified the default value deserializer defined in the configuration will be used - * @param <K1> the key type of the new stream - * @param <V1> the value type of the new stream * @return the new stream that consumes the given topic */ - <K1, V1> KStream<K1, V1> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K1> keyDeserializer, Deserializer<V1> valDeserializer); + KStream<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer); /** * Sends key-value to a topic using default serializers specified in the config. @@ -155,7 +151,7 @@ public interface KStream<K, V> { * * @param transformerSupplier the class of TransformerDef * @param stateStoreNames the names of the state store used by the processor - * @return KStream + * @return the instance of KStream that contains transformed keys and values */ <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames); @@ -164,7 +160,7 @@ public interface KStream<K, V> { * * @param valueTransformerSupplier the class of TransformerDef * @param stateStoreNames the names of the state store used by the processor - * @return KStream + * @return the instance of KStream that contains transformed keys and values */ <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames); @@ -173,7 +169,6 @@ public interface KStream<K, V> { * * @param processorSupplier the supplier of the Processor to use * @param stateStoreNames the names of the state store used by the processor - * @return the new stream containing the processed output */ void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames); http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index a95d08c..ae8f694 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -43,11 +43,7 @@ public class KStreamBuilder extends TopologyBuilder { * @return KStream */ public <K, V> KStream<K, V> from(String... topics) { - String name = newName(KStreamImpl.SOURCE_NAME); - - addSource(name, topics); - - return new KStreamImpl<>(this, name, Collections.singleton(name)); + return from(null, null, topics); } /** @@ -60,7 +56,7 @@ public class KStreamBuilder extends TopologyBuilder { * @param topics the topic names, if empty default to all the topics in the config * @return KStream */ - public <K, V> KStream<K, V> from(Deserializer<? extends K> keyDeserializer, Deserializer<? extends V> valDeserializer, String... topics) { + public <K, V> KStream<K, V> from(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String... topics) { String name = newName(KStreamImpl.SOURCE_NAME); addSource(name, keyDeserializer, valDeserializer, topics); http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index a408458..04aa8e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.WindowSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier; import java.lang.reflect.Array; -import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -187,25 +186,21 @@ public class KStreamImpl<K, V> implements KStream<K, V> { } @Override - public <K1, V1> KStream<K1, V1> through(String topic, - Serializer<K> keySerializer, - Serializer<V> valSerializer, - Deserializer<K1> keyDeserializer, - Deserializer<V1> valDeserializer) { + public KStream<K, V> through(String topic, + Serializer<K> keySerializer, + Serializer<V> valSerializer, + Deserializer<K> keyDeserializer, + Deserializer<V> valDeserializer) { String sendName = topology.newName(SINK_NAME); topology.addSink(sendName, topic, keySerializer, valSerializer, this.name); - String sourceName = topology.newName(SOURCE_NAME); - - topology.addSource(sourceName, keyDeserializer, valDeserializer, topic); - - return new KStreamImpl<>(topology, sourceName, Collections.singleton(sourceName)); + return topology.from(keyDeserializer, valDeserializer, topic); } @Override - public <K1, V1> KStream<K1, V1> through(String topic) { - return through(topic, (Serializer<K>) null, (Serializer<V>) null, (Deserializer<K1>) null, (Deserializer<V1>) null); + public KStream<K, V> through(String topic) { + return through(topic, null, null, null, null); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/d07bb181/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index d924a34..1e775b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -119,7 +119,7 @@ public class KStreamImplTest { stream4.to("topic-5"); - stream5.through("topic-6").process(new MockProcessorSupplier<>()); + stream5.through("topic-6").process(new MockProcessorSupplier<String, Integer>()); assertEquals(2 + // sources 2 + // stream1
