[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990176#comment-15990176 ]

Michal Borowiecki commented on KAFKA-3514:
------------------------------------------

I think the description of this ticket is missing an important detail.
If my understanding is correct, it will behave as described if all the records 
arrive in a single batch.
However, if the records preceding the record with timestamp "1" come in a 
separate batch (I'll use brackets to depict batch boundaries):
{code}
Stream A: [5, 6, 7, 8, 9], [1, 10]

Stream B: [2, 3, 4, 5]
{code}
then initially the timestamp for stream A is going to be set to "5" (the minimum 
of the first batch), and since it's not allowed to move back, the second batch 
containing the late-arriving record "1" is not going to change that. Stream B 
is going to be drained first, until its timestamp reaches "5".
However, if the batch boundaries differ by just one record and the 
late-arriving "1" is in the first batch:
{code}
Stream A: [5, 6, 7, 8, 9, 1], [10]

Stream B: [2, 3, 4, 5]
{code}
then it's going to behave as currently described in the ticket.
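
To make the two cases concrete, here is a toy Python model of my understanding. It is only a sketch, not the Kafka Streams code: {{PartitionQueue}} and {{processing_order}} are hypothetical names, all batches are assumed to be buffered before processing starts, a partition's timestamp is assumed to be the minimum over its buffered records but never allowed to move back, and ties between partitions are assumed to go to stream A.

```python
from collections import deque


class PartitionQueue:
    """Toy stand-in for one buffered partition (hypothetical, not a Kafka class)."""

    def __init__(self, name):
        self.name = name
        self.buffer = deque()
        self.time = -1  # partition timestamp; assumed never to move backwards

    def add_batch(self, timestamps):
        # The timestamp is re-evaluated once per batch, after all records are added.
        self.buffer.extend(timestamps)
        self._advance()

    def poll(self):
        ts = self.buffer.popleft()
        self._advance()
        return ts

    def _advance(self):
        # Candidate is the minimum over buffered records; only ever move forward.
        # An empty buffer leaves the timestamp where it was.
        if self.buffer:
            self.time = max(self.time, min(self.buffer))


def processing_order(batches_a, batches_b):
    """Return the (stream, timestamp) order in which records get processed."""
    a, b = PartitionQueue("A"), PartitionQueue("B")
    for batch in batches_a:
        a.add_batch(batch)
    for batch in batches_b:
        b.add_batch(batch)
    order = []
    while a.buffer or b.buffer:
        candidates = [q for q in (a, b) if q.buffer]
        # Pick the partition with the smallest timestamp; ties go to A (assumption).
        chosen = min(candidates, key=lambda q: q.time)
        order.append((chosen.name, chosen.poll()))
    return order
```

Under these assumptions, {{processing_order([[5, 6, 7, 8, 9], [1, 10]], [[2, 3, 4, 5]])}} starts with B's records 2, 3, 4, whereas {{processing_order([[5, 6, 7, 8, 9, 1], [10]], [[2, 3, 4, 5]])}} starts with A's records 5, 6, 7, 8, 9, 1, even though the records themselves are identical in both cases.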

Please correct me if I got this wrong.
But if that is the case, the behavior feels far too non-deterministic, and I 
think the timestamp computation deserves further thought beyond the scope of 
[KIP-138|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics], 
which is limited to punctuate semantics and does not cover stream time 
semantics in general.

> Stream timestamp computation needs some further thoughts
> --------------------------------------------------------
>
>                 Key: KAFKA-3514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3514
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Eno Thereska
>              Labels: architecture
>             Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for the punctuate function as 
> well as for selecting which stream to process next (i.e. best-effort stream 
> synchronization). It is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late-arriving record keeps that stream's timestamp low for 
> a period of time, so the stream keeps being processed until that late record 
> is reached. For example, take two partitions within the same task, annotated 
> by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late-arriving record with timestamp "1" will cause stream A to be 
> selected continuously in the thread loop, i.e. the messages with timestamps 
> 5, 6, 7, 8, 9 are processed until the late record itself is dequeued and 
> processed; only then will stream B be selected, starting with timestamp 2.
> 2) a partition whose buffer is empty will not have its timestamp advanced, 
> and hence neither will the task timestamp, since that is the smallest among 
> all partitions. This may not be as severe a problem as 1) above, though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)