This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f3ab1e5  MINOR: clarify the record selection algorithm and stream-time definition (#6128)
f3ab1e5 is described below

commit f3ab1e5b9e429a930bb3f9fd5a26dd12730c5110
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Sun Jan 13 13:43:48 2019 -0600

    MINOR: clarify the record selection algorithm and stream-time definition (#6128)
    
    The existing javadoc for PartitionGroup is a little confusing.
    It's relatively important for these concepts to be clear, since
    they form the basis for stream-time in Kafka Streams.
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 .../processor/internals/PartitionGroup.java        | 26 ++++++++++++++++------
 1 file changed, 19 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 1fdd454..fbafa73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -27,14 +27,26 @@ import java.util.PriorityQueue;
 import java.util.Set;
 
 /**
- * A PartitionGroup is composed from a set of partitions. It also maintains the timestamp of this
- * group, a.k.a. the stream time of the associated task. It is defined as the maximum timestamp of
- * all the records having been retrieved for processing from this PartitionGroup so far.
+ * PartitionGroup is used to buffer all co-partitioned records for processing.
  *
- * We decide from which partition to retrieve the next record to process based on partitions' timestamps.
- * The timestamp of a specific partition is initialized as UNKNOWN (-1), and is updated with the head record's timestamp
- * if it is smaller (i.e. it should be monotonically increasing); when the partition's buffer becomes empty and there is
- * no head record, the partition's timestamp will not be updated any more.
+ * In other words, it represents the "same" partition over multiple co-partitioned topics, and it is used
+ * to buffer records from that partition in each of the contained topic-partitions.
+ * Each StreamTask has exactly one PartitionGroup.
+ *
+ * PartitionGroup implements the algorithm that determines in what order buffered records are selected for processing.
+ *
+ * Specifically, when polled, it returns the record from the topic-partition with the lowest stream-time.
+ * Stream-time for a topic-partition is defined as the highest timestamp
+ * yet observed at the head of that topic-partition.
+ *
+ * PartitionGroup also maintains a stream-time for the group as a whole.
+ * This is defined as the highest timestamp of any record yet polled from the PartitionGroup.
+ * The PartitionGroup's stream-time is also the stream-time of its task and is used as the
+ * stream-time for any computations that require it.
+ *
+ * The PartitionGroup's stream-time is initially UNKNOWN (-1), and is set to a known value upon first poll.
+ * As a consequence of the definition, the PartitionGroup's stream-time is non-decreasing
+ * (i.e., it increases or stays the same over time).
  */
 public class PartitionGroup {
 
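
For illustration, the selection rule and the two stream-time definitions in the new javadoc can be modelled in a few lines of Java. This is a minimal sketch under stated assumptions, not the Kafka Streams implementation: the classes SketchPartition, SketchPartitionGroup and PartitionGroupSketch are hypothetical, records are reduced to their timestamps, a partition "observes" a timestamp whenever a record becomes the head of its buffer, and ties are broken by registration order. The real class imports java.util.PriorityQueue, so it likely orders its queues with a priority queue rather than the linear scan used here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of one topic-partition buffer inside a partition group.
// Records are reduced to their timestamps; this is NOT Kafka Streams' RecordQueue.
class SketchPartition {
    static final long UNKNOWN = -1L;

    final String name;
    final Deque<Long> buffer = new ArrayDeque<>(); // buffered record timestamps, in arrival order
    long streamTime = UNKNOWN;                     // highest timestamp yet observed at the head

    SketchPartition(String name) {
        this.name = name;
    }

    void add(long timestamp) {
        buffer.addLast(timestamp);
        observeHead(); // only changes anything if the buffer was empty before
    }

    long removeHead() {
        long timestamp = buffer.removeFirst();
        observeHead(); // a new head may raise the partition's stream-time
        return timestamp;
    }

    // Partition stream-time never decreases: take the max with the current head, if any.
    // When the buffer is empty there is no head, so the stream-time is left unchanged.
    private void observeHead() {
        Long head = buffer.peekFirst();
        if (head != null) {
            streamTime = Math.max(streamTime, head);
        }
    }
}

// Hypothetical model of the group-level selection rule and stream-time.
class SketchPartitionGroup {
    private final Map<String, SketchPartition> partitions = new LinkedHashMap<>();
    private long streamTime = SketchPartition.UNKNOWN; // highest timestamp of any polled record

    void add(String partition, long timestamp) {
        partitions.computeIfAbsent(partition, SketchPartition::new).add(timestamp);
    }

    // Returns "partition@timestamp" for the next record, or null if nothing is buffered.
    // Picks the non-empty partition with the lowest stream-time; ties go to the
    // partition that was registered first (an assumption of this sketch).
    String poll() {
        SketchPartition next = null;
        for (SketchPartition p : partitions.values()) {
            if (!p.buffer.isEmpty() && (next == null || p.streamTime < next.streamTime)) {
                next = p;
            }
        }
        if (next == null) {
            return null;
        }
        long timestamp = next.removeHead();
        streamTime = Math.max(streamTime, timestamp); // group stream-time is non-decreasing
        return next.name + "@" + timestamp;
    }

    long streamTime() {
        return streamTime;
    }
}

public class PartitionGroupSketch {
    public static void main(String[] args) {
        SketchPartitionGroup group = new SketchPartitionGroup();
        group.add("A-0", 5);
        group.add("A-0", 3); // out of order relative to the record before it
        group.add("A-0", 9);
        group.add("B-0", 4);
        group.add("B-0", 8);
        System.out.println("initial group stream-time " + group.streamTime());
        String record;
        while ((record = group.poll()) != null) {
            System.out.println(record + " -> group stream-time " + group.streamTime());
        }
        // Expected output:
        // initial group stream-time -1
        // B-0@4 -> group stream-time 4
        // A-0@5 -> group stream-time 5
        // A-0@3 -> group stream-time 5
        // B-0@8 -> group stream-time 8
        // A-0@9 -> group stream-time 9
    }
}
```

In this run the third poll returns the out-of-order record A-0@3: it is still processed, but the group's stream-time stays at 5 instead of moving backwards, which is the non-decreasing property described above. Likewise, partition A-0's stream-time stays at 5 while 3 sits at the head, because it is the highest head timestamp observed so far, not the current head's timestamp.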