Repository: kafka Updated Branches: refs/heads/trunk 86a9036a7 -> 8189f9d58
MINOR: some javadocs for kstream public api guozhangwang Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang Closes #844 from ymatsuda/javadoc Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8189f9d5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8189f9d5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8189f9d5 Branch: refs/heads/trunk Commit: 8189f9d58002ec0092737741bf6c74eebab4dc73 Parents: 86a9036 Author: Yasuhiro Matsuda <[email protected]> Authored: Tue Feb 2 10:52:43 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Tue Feb 2 10:52:43 2016 -0800 ---------------------------------------------------------------------- .../streams/processor/AbstractProcessor.java | 10 ++--- .../processor/DefaultPartitionGrouper.java | 6 +++ .../streams/processor/PartitionGrouper.java | 8 ++-- .../streams/processor/ProcessorContext.java | 41 ++++++++++++++++++++ .../streams/processor/TimestampExtractor.java | 2 +- 5 files changed, 58 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8189f9d5/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java index 01d0024..1932e5e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor; /** * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op * implementations of {@link #punctuate(long)} and {@link #close()}. - * + * * @param <K> the type of keys * @param <V> the type of values */ @@ -41,11 +41,11 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> { * <p> * This method does nothing by default; if desired, subclasses should override it with custom functionality. * </p> - * - * @param streamTime the stream time when this method is being called + * + * @param timestamp the wallclock time when this method is being called */ @Override - public void punctuate(long streamTime) { + public void punctuate(long timestamp) { // do nothing } @@ -62,7 +62,7 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> { /** * Get the processor's context set during {@link #init(ProcessorContext) initialization}. - * + * * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}. */ protected final ProcessorContext context() { http://git-wip-us.apache.org/repos/asf/kafka/blob/8189f9d5/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index 47c5e58..57df685 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -29,6 +29,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +/** + * DefaultPartitionGrouper groups partitions by the partition id. This behavior is assumed by the join processing in KStream. + * Join processing requires that topics are copartitoned, i.e., being partitioned by the same key and having the same + * number of partitions, are grouped together. Copartitioning is ensured by having the same number of partitions on + * joined topics, and by using the serialization and Producer's default partitioner. + */ public class DefaultPartitionGrouper implements PartitionGrouper { public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) { http://git-wip-us.apache.org/repos/asf/kafka/blob/8189f9d5/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java index a40a1c4..f8311e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java @@ -26,11 +26,13 @@ import java.util.Set; public interface PartitionGrouper { /** - * Returns a map of task ids to groups of partitions. + * Returns a map of task ids to groups of partitions. A partition group forms a task, thus, partitions that are + * expected to be processed together must be in the same group. DefaultPartitionGrouper implements this + * interface. See {@link DefaultPartitionGrouper} for more information. * - * @param topicGroups The subscribed topic groups + * @param topicGroups The map from the {@link TopologyBuilder#topicGroups() topic group} id to topics * @param metadata Metadata of the consuming cluster * @return a map of task ids to groups of partitions */ Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/8189f9d5/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 41e2235..af98300 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -83,19 +83,60 @@ public interface ProcessorContext { StateStore getStateStore(String name); + /** + * Schedules a periodic operation for processors. A processor may call this method during + * {@link Processor#init(ProcessorContext) initialization} to + * schedule a periodic call called a punctuation to {@link Processor#punctuate(long)}. + * + * @param interval the time interval between punctuations + */ void schedule(long interval); + /** + * Forwards a key/value pair to the downstream processors + * @param key key + * @param value value + */ <K, V> void forward(K key, V value); + /** + * Forwards a key/value pair to one of the downstream processors designated by childIndex + * @param key key + * @param value value + */ <K, V> void forward(K key, V value, int childIndex); + /** + * Requests a commit + */ void commit(); + /** + * Returns the topic name of the current input record + * + * @return the topic name + */ String topic(); + /** + * Returns the partition id of the current input record + * + * @return the partition id + */ int partition(); + /** + * Returns the offset of the current input record + * + * @return the offset + */ long offset(); + /** + * Returns the timestamp of the current input record. The timestamp is extracted from + * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}. + * + * @return the timestamp + */ long timestamp(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8189f9d5/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index 62098f2..ce0ba70 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -20,7 +20,7 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.clients.consumer.ConsumerRecord; /** - * An interface that allows the KStream framework to extract a timestamp from a key-value pair + * An interface that allows the Kafka Streams framework to extract a timestamp from an instance of {@link ConsumerRecord} */ public interface TimestampExtractor {
