Repository: kafka Updated Branches: refs/heads/trunk 1d80f563b -> 79eacf6c9
MINOR: Some more Kafka Streams Javadocs Author: Guozhang Wang <[email protected]> Reviewers: Yasuhiro Matsuda <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #853 from guozhangwang/KJavaDoc Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/79eacf6c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/79eacf6c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/79eacf6c Branch: refs/heads/trunk Commit: 79eacf6c95506d5d6819add5a1256681b13170b1 Parents: 1d80f56 Author: Guozhang Wang <[email protected]> Authored: Wed Feb 3 11:31:32 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Wed Feb 3 11:31:32 2016 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 35 ++++++++++++++------ .../java/org/apache/kafka/streams/KeyValue.java | 8 +++++ .../apache/kafka/streams/StreamsMetrics.java | 3 ++ .../streams/errors/ProcessorStateException.java | 3 ++ .../streams/errors/TaskAssignmentException.java | 2 +- .../streams/errors/TaskIdFormatException.java | 3 ++ .../errors/TopologyBuilderException.java | 3 ++ .../kafka/streams/kstream/Aggregator.java | 7 ++++ .../kafka/streams/kstream/HoppingWindows.java | 3 ++ .../kafka/streams/kstream/Initializer.java | 5 +++ .../kafka/streams/kstream/JoinWindows.java | 2 +- .../apache/kafka/streams/kstream/KStream.java | 2 +- .../kafka/streams/kstream/KStreamBuilder.java | 11 +++++- .../apache/kafka/streams/kstream/KTable.java | 1 + .../kafka/streams/kstream/KeyValueMapper.java | 7 ++++ .../apache/kafka/streams/kstream/Predicate.java | 7 ++-- .../apache/kafka/streams/kstream/Reducer.java | 5 +++ .../kafka/streams/kstream/TumblingWindows.java | 4 ++- .../kafka/streams/kstream/ValueJoiner.java | 7 ++++ .../kafka/streams/kstream/ValueMapper.java | 6 ++++ .../kstream/ValueTransformerSupplier.java | 1 - .../apache/kafka/streams/kstream/Window.java | 3 ++ .../apache/kafka/streams/kstream/Windows.java | 5 +++ .../processor/DefaultPartitionGrouper.java | 12 +++++-- .../kafka/streams/processor/Processor.java | 2 +- .../streams/processor/ProcessorContext.java | 3 ++ .../streams/processor/StateStoreSupplier.java | 3 ++ .../apache/kafka/streams/processor/TaskId.java | 3 ++ .../kafka/streams/state/KeyValueIterator.java | 6 ++++ .../apache/kafka/streams/state/WindowStore.java | 17 ++++++++++ 30 files changed, 155 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 071cef6..a19f697 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams; +import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -38,15 +39,18 @@ import java.util.concurrent.atomic.AtomicInteger; * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and * sends output to zero or more output topics. * <p> - * This processing is defined by using the {@link TopologyBuilder} class or its superclass KStreamBuilder to specify - * the transformation. - * The {@link KafkaStreams} instance will be responsible for the lifecycle of these processors. It will instantiate and - * start one or more of these processors to process the Kafka partitions assigned to this particular instance. + * The computational logic can be specified either by using the {@link TopologyBuilder} class to define the a DAG topology of + * {@link org.apache.kafka.streams.processor.Processor}s or by using the {@link org.apache.kafka.streams.kstream.KStreamBuilder} + * class which provides the high-level {@link org.apache.kafka.streams.kstream.KStream} DSL to define the transformation. + * + * The {@link KafkaStreams} class manages the lifecycle of a Kafka Streams instance. One stream instance can contain one or + * more threads specified in the configs for the processing work. * <p> - * This {@link KafkaStreams} instance will co-ordinate with any other instances (whether in this same process, on other processes - * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being - * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shutdown or - * started in the appropriate processes to balance processing load. + * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same job ID (whether in this same process, on other processes + * on this machine, or on remote machines) as a single (possibly distributed) stream processing client. These instances will divide up the work + * based on the assignment of the input topic partitions so that all partitions are being + * consumed. If instances are added or failed, all instances will rebelance the partition assignment among themselves + * to balance processing load. * <p> * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer} * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output. @@ -70,6 +74,9 @@ import java.util.concurrent.atomic.AtomicInteger; * </pre> * */ +// TODO: about example may need to be updated after KAFKA-3153 + [email protected] public class KafkaStreams { private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class); @@ -94,6 +101,12 @@ public class KafkaStreams { this(builder, new StreamsConfig(props)); } + /** + * Construct the stream instance. + * + * @param builder The processor topology builder specifying the computational logic + * @param config The stream configs + */ public KafkaStreams(TopologyBuilder builder, StreamsConfig config) { // create the metrics Time time = new SystemTime(); @@ -124,7 +137,7 @@ public class KafkaStreams { } /** - * Start the stream process by starting all its threads + * Start the stream instance by starting all its threads. */ public synchronized void start() { log.debug("Starting Kafka Stream process"); @@ -142,8 +155,8 @@ public class KafkaStreams { } /** - * Shutdown this stream process by signaling the threads to stop, - * wait for them to join and clean up the process instance. + * Shutdown this stream instance by signaling all the threads to stop, + * and then wait for them to join. */ public synchronized void close() { log.debug("Stopping Kafka Stream process"); http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/KeyValue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java index 472e677..d813c47 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java +++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java @@ -17,6 +17,14 @@ package org.apache.kafka.streams; +/** + * A key-value pair defined for a single Kafka Streams record. + * If the record comes directly from a Kafka topic then its + * key / value are defined as the message key / value. + * + * @param <K> Key type + * @param <V> Value type + */ public class KeyValue<K, V> { public final K key; http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java index a151392..d392eef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java @@ -19,6 +19,9 @@ package org.apache.kafka.streams; import org.apache.kafka.common.metrics.Sensor; +/** + * The stream metrics interface for adding metric sensors and collecting metric values. + */ public interface StreamsMetrics { Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags); http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java index 6434d04..e6f872a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java @@ -17,6 +17,9 @@ package org.apache.kafka.streams.errors; +/** + * Indicates a processor state operation (e.g. put, get) has failed. + */ public class ProcessorStateException extends StreamsException { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java index 3ae8503..3936ef5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.errors; /** - * The run time exception class for stream task assignments + * Indicates a run time error incurred while trying to assign stream tasks to threads */ public class TaskAssignmentException extends StreamsException { http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java index bf0ebf5..576b972 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java @@ -17,6 +17,9 @@ package org.apache.kafka.streams.errors; +/** + * Indicates a run time error incurred while trying parse the task id from the read string + */ public class TaskIdFormatException extends StreamsException { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java index 9dd740b..8745693 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java @@ -17,6 +17,9 @@ package org.apache.kafka.streams.errors; +/** + * Indicates a pre-run time error incurred while parsing the builder to construct the processor topology + */ public class TopologyBuilderException extends StreamsException { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java index e3eb18f..0d29409 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java @@ -17,6 +17,13 @@ package org.apache.kafka.streams.kstream; +/** + * The Aggregator interface for aggregating values of the given key. + * + * @param <K> Key type. + * @param <V> Receiving value type. + * @param <T> Aggregate value type. + */ public interface Aggregator<K, V, T> { T apply(K aggKey, V value, T aggregate); http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java index f354ef9..aa866e4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java @@ -22,6 +22,9 @@ import org.apache.kafka.streams.kstream.internals.HoppingWindow; import java.util.HashMap; import java.util.Map; +/** + * The hopping window specifications used for aggregations. + */ public class HoppingWindows extends Windows<HoppingWindow> { private static final long DEFAULT_SIZE_MS = 1000L; http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java index 0aeddc9..fdd5220 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java @@ -17,6 +17,11 @@ package org.apache.kafka.streams.kstream; +/** + * The Initializer interface for creating an initial value for aggregations. + * + * @param <T> Aggregate value type. + */ public interface Initializer<T> { T apply(); http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index ffc1c1c..70294a8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.internals.TumblingWindow; import java.util.Map; /** - * This class is used to specify the behaviour of windowed joins. + * The window specifications used for joins. */ public class JoinWindows extends Windows<TumblingWindow> { http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index f6fa48d..b83298f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorSupplier; - +// TODO: Javadoc needs to be updated /** * KStream is an abstraction of a stream of key-value pairs. * http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index b50cffb..3cf198c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -29,7 +29,8 @@ import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; /** - * KStreamBuilder is the class to create KStream instances. + * KStreamBuilder is a subclass of {@link TopologyBuilder} that provides the {@link KStream} DSL + * for users to specify computational logic and translates the given logic to a processor topology. */ public class KStreamBuilder extends TopologyBuilder { @@ -39,6 +40,7 @@ public class KStreamBuilder extends TopologyBuilder { super(); } + // TODO: needs updated /** * Creates a KStream instance for the specified topic. * The default deserializers specified in the config are used. @@ -115,6 +117,13 @@ public class KStreamBuilder extends TopologyBuilder { return KStreamImpl.merge(this, streams); } + /** + * Create a unique processor name used for translation into the processor topology. + * This function is only for internal usage. + * + * @param prefix Processor name prefix. + * @return The unique processor name. + */ public String newName(String prefix) { return prefix + String.format("%010d", index.getAndIncrement()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 5cd9d9c..b83b0de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; +// TODO: Javadoc needs to be updated. /** * KTable is an abstraction of a change log stream. * http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java index 62b07f6..9c04ef5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java @@ -17,6 +17,13 @@ package org.apache.kafka.streams.kstream; +/** + * The KeyValueMapper interface for mapping a key-value pair to a new value (could be another key-value pair). + * + * @param <K> Original key type. + * @param <V> Original value type. + * @param <R> Mapped value type. + */ public interface KeyValueMapper<K, V, R> { R apply(K key, V value); http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java index c73622e..784f5b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java @@ -18,13 +18,12 @@ package org.apache.kafka.streams.kstream; /** - * Represents a predicate (boolean-valued function) of two arguments. + * The Predicate interface represents a predicate (boolean-valued function) of a key-value pair. * - * @param <K> the type of key - * @param <V> the type of value + * @param <K> Key type. + * @param <V> Value type. */ public interface Predicate<K, V> { boolean test(K key, V value); - } http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java index 418f442..bf25f73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java @@ -17,6 +17,11 @@ package org.apache.kafka.streams.kstream; +/** + * The Reducer interface for combining two values of the same type into a new value. + * + * @param <V> Value type. + */ public interface Reducer<V> { V apply(V value1, V value2); http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java index 188fe66..cadedba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java @@ -17,12 +17,14 @@ package org.apache.kafka.streams.kstream; - import org.apache.kafka.streams.kstream.internals.TumblingWindow; import java.util.HashMap; import java.util.Map; +/** + * The tumbling window specifications used for aggregations. + */ public class TumblingWindows extends Windows<TumblingWindow> { private static final long DEFAULT_SIZE_MS = 1000L; http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java index 93fc359..41005b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java @@ -17,6 +17,13 @@ package org.apache.kafka.streams.kstream; +/** + * The ValueJoiner interface for joining two values and return a the joined new value. + * + * @param <V1> First value type. + * @param <V2> Second value type. + * @param <R> Joined value type. + */ public interface ValueJoiner<V1, V2, R> { R apply(V1 value1, V2 value2); http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java index a32423d..d507c87 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java @@ -17,6 +17,12 @@ package org.apache.kafka.streams.kstream; +/** + * The KeyValueMapper interface for mapping an original value to a new value (could be another key-value pair). + * + * @param <V1> Original value type. + * @param <V2> Mapped value type. + */ public interface ValueMapper<V1, V2> { V2 apply(V1 value); http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java index 5c053c7..04fa9eb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java @@ -20,5 +20,4 @@ package org.apache.kafka.streams.kstream; public interface ValueTransformerSupplier<V, R> { ValueTransformer<V, R> get(); - } http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java index b9401b0..f2965dc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java @@ -17,6 +17,9 @@ package org.apache.kafka.streams.kstream; +/** + * A single window instance, defined by its start and end timestamp. + */ public abstract class Window { private long start; http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index e4d7d9d..678e351 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -21,6 +21,11 @@ package org.apache.kafka.streams.kstream; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +/** + * The window specification interface that can be extended for windowing operation in joins and aggregations. + * + * @param <W> Type of the window instance + */ public abstract class Windows<W extends Window> { private static final int DEFAULT_NUM_SEGMENTS = 3; http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index 57df685..dad5c6f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -31,12 +31,20 @@ import java.util.Set; /** * DefaultPartitionGrouper groups partitions by the partition id. This behavior is assumed by the join processing in KStream. - * Join processing requires that topics are copartitoned, i.e., being partitioned by the same key and having the same - * number of partitions, are grouped together. Copartitioning is ensured by having the same number of partitions on + * + * Join operations requires that topics of the joining entities are copartitoned, i.e., being partitioned by the same key and having the same + * number of partitions. Copartitioning is ensured by having the same number of partitions on * joined topics, and by using the serialization and Producer's default partitioner. */ public class DefaultPartitionGrouper implements PartitionGrouper { + /** + * Generate tasks with the assigned topic partitions + * + * @param topicGroups {@link TopologyBuilder#topicGroups()} where topics of the same group need to be joined together + * @param metadata Metadata of the consuming cluster + * @return The map from generated task ids to the assigned partitions. + */ public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) { Map<TaskId, Set<TopicPartition>> groups = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java index 3cade3a..fbd72f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java @@ -18,7 +18,7 @@ package org.apache.kafka.streams.processor; /** - * A processor of messages. + * A processor of key-value pair records. * * @param <K> the type of keys * @param <V> the type of values http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index af98300..9740fa3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -23,6 +23,9 @@ import org.apache.kafka.streams.StreamsMetrics; import java.io.File; +/** + * Processor context interface. + */ public interface ProcessorContext { /** http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java index 11545c5..993500d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java @@ -17,6 +17,9 @@ package org.apache.kafka.streams.processor; +/** + * A state store supplier which can create one or more {@link StateStore} instances. + */ public interface StateStoreSupplier { String name(); http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java index 6e7150e..69b29bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java @@ -24,6 +24,9 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +/** + * The task id representation composed as topic group id plus the assigned partition id. + */ public class TaskId implements Comparable<TaskId> { public final int topicGroupId; http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java index bd118a2..55ec8cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java @@ -24,6 +24,12 @@ import org.apache.kafka.streams.KeyValue; import java.io.Closeable; import java.util.Iterator; +/** + * Iterator interface of {@link KeyValue}. + * + * @param <K> Type of keys + * @param <V> Type of values + */ public interface KeyValueIterator<K, V> extends Iterator<KeyValue<K, V>>, Closeable { @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/79eacf6c/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index 1d806e0..cbd373c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -21,11 +21,28 @@ package org.apache.kafka.streams.state; import org.apache.kafka.streams.processor.StateStore; +/** + * A windowed store interface extending {@link StateStore} + * + * @param <K> Type of keys + * @param <V> Type of values + */ public interface WindowStore<K, V> extends StateStore { + /** + * Put a key-value pair with the current wall-clock time as the timestamp + * into the corresponding window + */ void put(K key, V value); + /** + * Put a key-value pair with the given timestamp into the corresponding window + */ void put(K key, V value, long timestamp); + /** + * Get all the key-value pairs with the given key and the time range from all + * the existing windows. + */ WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo); }
