This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new f3ab1e5 MINOR: clarify the record selection algorithm and stream-time definition (#6128)
f3ab1e5 is described below

commit f3ab1e5b9e429a930bb3f9fd5a26dd12730c5110
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Sun Jan 13 13:43:48 2019 -0600

    MINOR: clarify the record selection algorithm and stream-time definition (#6128)

    The existing javadoc for PartitionGroup is a little confusing. It's relatively important for these concepts to be clear, since they form the basis for stream-time in Kafka Streams.

    Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 .../processor/internals/PartitionGroup.java | 26 ++++++++++++++++------
 1 file changed, 19 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 1fdd454..fbafa73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -27,14 +27,26 @@ import java.util.PriorityQueue;
 import java.util.Set;
 /**
- * A PartitionGroup is composed from a set of partitions. It also maintains the timestamp of this
- * group, a.k.a. the stream time of the associated task. It is defined as the maximum timestamp of
- * all the records having been retrieved for processing from this PartitionGroup so far.
+ * PartitionGroup is used to buffer all co-partitioned records for processing.
  *
- * We decide from which partition to retrieve the next record to process based on partitions' timestamps.
- * The timestamp of a specific partition is initialized as UNKNOWN (-1), and is updated with the head record's timestamp
- * if it is smaller (i.e. it should be monotonically increasing); when the partition's buffer becomes empty and there is
- * no head record, the partition's timestamp will not be updated any more.
+ *
+ * In other words, it represents the "same" partition over multiple co-partitioned topics, and it is used
+ * to buffer records from that partition in each of the contained topic-partitions.
+ * Each StreamTask has exactly one PartitionGroup.
+ *
+ * PartitionGroup implements the algorithm that determines in what order buffered records are selected for processing.
+ *
+ * Specifically, when polled, it returns the record from the topic-partition with the lowest stream-time.
+ * Stream-time for a topic-partition is defined as the highest timestamp
+ * yet observed at the head of that topic-partition.
+ *
+ * PartitionGroup also maintains a stream-time for the group as a whole.
+ * This is defined as the highest timestamp of any record yet polled from the PartitionGroup.
+ * The PartitionGroup's stream-time is also the stream-time of its task and is used as the
+ * stream-time for any computations that require it.
+ *
+ * The PartitionGroups's stream-time is initially UNKNOWN (-1), and it set to a known value upon first poll.
+ * As a consequence of the definition, the PartitionGroup's stream-time is non-decreasing
+ * (i.e., it increases or stays the same over time).
  */
 public class PartitionGroup {
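The selection rules described in the new javadoc can be sketched in plain Java. This is a simplified, hypothetical model, not Kafka's implementation: PartitionGroupSketch, TopicPartitionBuffer, addPartition and poll are illustrative names, records are reduced to their timestamps, and ties between equal partition stream-times are broken by registration order (an assumption; the javadoc does not specify tie-breaking).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the record selection rules in the PartitionGroup javadoc.
// Records are reduced to their timestamps; none of these names are Kafka's actual API.
public class PartitionGroupSketch {
    static final long UNKNOWN = -1L; // mirrors the UNKNOWN (-1) initial value in the javadoc

    /** One buffered topic-partition (hypothetical). */
    static final class TopicPartitionBuffer {
        final String name;
        final Deque<Long> buffer = new ArrayDeque<>();
        // Partition stream-time: highest timestamp yet observed at the head of this buffer.
        long partitionTime = UNKNOWN;

        TopicPartitionBuffer(String name) { this.name = name; }

        void add(long timestamp) {
            buffer.addLast(timestamp);
            observeHead(); // only changes anything if the buffer was empty before
        }

        long pollHead() {
            long ts = buffer.removeFirst();
            observeHead(); // the new head (if any) may raise the partition stream-time
            return ts;
        }

        private void observeHead() {
            Long head = buffer.peekFirst();
            if (head != null) {
                partitionTime = Math.max(partitionTime, head);
            }
        }
    }

    private final List<TopicPartitionBuffer> partitions = new ArrayList<>();
    // Group stream-time: highest timestamp of any record yet polled from the group.
    private long streamTime = UNKNOWN;

    TopicPartitionBuffer addPartition(String name) {
        TopicPartitionBuffer p = new TopicPartitionBuffer(name);
        partitions.add(p);
        return p;
    }

    /** Returns the next record's timestamp, or null if every buffer is empty. */
    Long poll() {
        TopicPartitionBuffer chosen = null;
        for (TopicPartitionBuffer p : partitions) {
            // Only non-empty buffers compete; on a tie the earlier-registered buffer wins.
            if (!p.buffer.isEmpty() && (chosen == null || p.partitionTime < chosen.partitionTime)) {
                chosen = p;
            }
        }
        if (chosen == null) {
            return null;
        }
        long ts = chosen.pollHead();
        streamTime = Math.max(streamTime, ts); // non-decreasing by construction
        return ts;
    }

    long streamTime() { return streamTime; }

    public static void main(String[] args) {
        PartitionGroupSketch group = new PartitionGroupSketch();
        TopicPartitionBuffer a = group.addPartition("topicA-0");
        TopicPartitionBuffer b = group.addPartition("topicB-0");
        a.add(5); a.add(1); a.add(9); // 1 is out of order behind 5
        b.add(3); b.add(7);

        List<Long> order = new ArrayList<>();
        List<Long> times = new ArrayList<>();
        for (Long ts = group.poll(); ts != null; ts = group.poll()) {
            order.add(ts);
            times.add(group.streamTime());
        }
        System.out.println("poll order:  " + order); // [3, 5, 1, 7, 9]
        System.out.println("stream-time: " + times); // [3, 5, 5, 7, 9]
    }
}
```

In this run, topicA-0 buffers an out-of-order record (timestamp 1) behind 5. It is polled third because topicA-0's partition stream-time is still 5, the highest head timestamp seen there so far, yet the group stream-time stays at 5, which illustrates why the group's stream-time is non-decreasing.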