[ 
https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990619#comment-15990619
 ] 

Matthias J. Sax edited comment on KAFKA-5144 at 5/1/17 7:29 AM:
----------------------------------------------------------------

This is intended behavior. {{MinTimestampTracker}} tracks the minimum timestamp 
of the not-yet-processed records in the buffer. The undocumented usage pattern is 
that {{addElement()}} and {{removeElement()}} must be called in the same order. 
Your tests don't follow this pattern. The point of the logic is to track the 
current partition time as the minimum timestamp of whatever is in the buffer, 
including out-of-order records (cf. KAFKA-3514).
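To illustrate why the call order matters, here is a minimal Java sketch of a min-timestamp tracker built on an ascending deque. It is a simplified, hypothetical stand-in ({{SketchMinTracker}}, {{Stamped}} and the {{-1}} "unknown" sentinel are assumptions), not the actual Kafka Streams class, and it does not reproduce the reporter's tests. It adds two records, removes them either in insertion order or in reverse order, and then adds a new record with timestamp {{30}}:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class TrackerContractSketch {

    // Hypothetical stand-in for a buffered record; compared by identity, not by value.
    static final class Stamped {
        final long timestamp;
        Stamped(long timestamp) { this.timestamp = timestamp; }
    }

    // Simplified tracker (assumption, not the Kafka class): keeps an ascending subsequence.
    static final class SketchMinTracker {
        private final Deque<Stamped> ascending = new ArrayDeque<>();
        private long lastKnownTime = -1L; // assumed sentinel for "unknown"

        void addElement(Stamped elem) {
            // Drop older elements with a larger timestamp: they can never be the minimum again.
            while (!ascending.isEmpty() && ascending.peekLast().timestamp > elem.timestamp) {
                ascending.pollLast();
            }
            ascending.offerLast(elem);
        }

        void removeElement(Stamped elem) {
            // Only the head is ever removed, which is why removal must follow insertion order.
            if (ascending.peekFirst() == elem) {
                ascending.pollFirst();
            }
            if (ascending.isEmpty()) {
                lastKnownTime = elem.timestamp;
            }
        }

        long get() {
            Stamped head = ascending.peekFirst();
            return head == null ? lastKnownTime : head.timestamp;
        }
    }

    // Adds 5 and 10, removes both (in the same or reverse order), then adds 30.
    public static long minAfterNewRecord(boolean sameOrder) {
        SketchMinTracker tracker = new SketchMinTracker();
        Stamped first = new Stamped(5);
        Stamped second = new Stamped(10);
        tracker.addElement(first);
        tracker.addElement(second);
        if (sameOrder) {
            tracker.removeElement(first);
            tracker.removeElement(second);
        } else {
            tracker.removeElement(second); // head is 5, so this is a no-op
            tracker.removeElement(first);  // leaves a stale 10 behind
        }
        tracker.addElement(new Stamped(30)); // the only record actually buffered now
        return tracker.get();
    }

    public static void main(String[] args) {
        System.out.println(minAfterNewRecord(true));  // prints 30
        System.out.println(minAfterNewRecord(false)); // prints 10 (stale)
    }
}
```

With the same order the sketch reports {{30}}; with the reversed order it still reports the already-removed {{10}}, because {{removeElement()}} only ever inspects the head.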

Example: we receive a batch of records with timestamps {{5, 10, 15, 12, 20}} and 
add them one after another to the timestamp tracker. When we then process those 
records one by one, we get the following steps:

 - process {{5}}, queue {{10, 15, 12, 20}}, partition time {{5}}
 - process {{10}}, queue {{15, 12, 20}}, partition time {{10}}
 - process {{15}}, queue {{12, 20}}, partition time {{12}} (!! current minimum 
is {{12}}, not {{15}} !!)
 - process {{12}}, queue {{20}}, partition time {{12}}
 - process {{20}}, queue empty, partition time {{20}}
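The partition time in this list is the minimum over the record currently being processed plus everything still queued behind it, i.e. a suffix minimum over the batch. A small Java sketch of that definition, independent of any tracker (the class and method names are hypothetical):

```java
import java.util.Arrays;

public class IntendedPartitionTime {

    // For each position i: partition time = min(ts[i], ts[i+1], ..., ts[n-1]),
    // i.e. the current record plus everything still waiting in the queue.
    public static long[] partitionTimes(long[] ts) {
        long[] result = new long[ts.length];
        long min = Long.MAX_VALUE;
        // Walk backwards so each step reuses the minimum of the suffix after it.
        for (int i = ts.length - 1; i >= 0; i--) {
            min = Math.min(min, ts[i]);
            result[i] = min;
        }
        return result;
    }

    public static void main(String[] args) {
        long[] batch = {5, 10, 15, 12, 20};
        System.out.println(Arrays.toString(partitionTimes(batch))); // prints [5, 10, 12, 12, 20]
    }
}
```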

The tracker will have the following states when adding the records one by one 
(this happens before processing begins):
 - add {{5}}: {{5}}
 - add {{10}}: {{5, 10}}
 - add {{15}}: {{5, 10, 15}}
 - add {{12}}: {{5, 10, 12}} (!! this is the behavior that is not a bug !!)
 - add {{20}}: {{5, 10, 12, 20}}
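The add step above behaves like the classic ascending (monotonic) deque used for sliding-window minimums: before a new timestamp is appended, every older entry with a larger timestamp is dropped from the tail, because it can never be the minimum again while the newer, smaller record is still buffered. A hypothetical Java sketch of just this step (not the actual {{MinTimestampTracker}} code):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class AddPhaseSketch {

    // Hypothetical stand-in for a timestamped record.
    static final class Stamped {
        final long timestamp;
        Stamped(long timestamp) { this.timestamp = timestamp; }
        @Override public String toString() { return Long.toString(timestamp); }
    }

    // Keeps an ascending subsequence: older entries with a larger timestamp are evicted.
    static void addElement(Deque<Stamped> tracker, Stamped elem) {
        while (!tracker.isEmpty() && tracker.peekLast().timestamp > elem.timestamp) {
            tracker.pollLast();
        }
        tracker.offerLast(elem);
    }

    // Returns the tracker contents (head to tail) after each add.
    public static List<String> statesAfterAdds(long... timestamps) {
        Deque<Stamped> tracker = new ArrayDeque<>();
        List<String> states = new ArrayList<>();
        for (long ts : timestamps) {
            addElement(tracker, new Stamped(ts));
            states.add(tracker.toString());
        }
        return states;
    }

    public static void main(String[] args) {
        // prints [5], [5, 10], [5, 10, 15], [5, 10, 12], [5, 10, 12, 20] (one per line)
        statesAfterAdds(5, 10, 15, 12, 20).forEach(System.out::println);
    }
}
```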

During processing, we {{poll}} the head record from the queue and afterwards 
call {{removeElement}} on the tracker. Thus, we get the following (note that we 
start with tracker state {{5, 10, 12, 20}}):
 - poll from queue {{5}}, tracker after remove {{10, 12, 20}}
 - poll from queue {{10}}, tracker after remove {{12, 20}}
 - poll from queue {{15}}, tracker after remove {{12, 20}} (!! as we call 
{{removeElement(15)}}, we keep {{12}}; this allows us to use {{12}} for two 
records !!)
 - poll from queue {{12}}, tracker after remove {{20}}
 - poll from queue {{20}}, tracker after remove empty
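The removal step can be sketched the same way. The assumption here is that {{removeElement}} only drops the head, and only if it is the very same object (identity, not timestamp equality) that was just polled from the queue; {{removeElement(15)}} is therefore a no-op, because {{15}} was already evicted when {{12}} was added. Hypothetical Java sketch:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RemovePhaseSketch {

    // Hypothetical stand-in for a buffered record.
    static final class Stamped {
        final long timestamp;
        Stamped(long timestamp) { this.timestamp = timestamp; }
        @Override public String toString() { return Long.toString(timestamp); }
    }

    static void addElement(Deque<Stamped> tracker, Stamped elem) {
        while (!tracker.isEmpty() && tracker.peekLast().timestamp > elem.timestamp) {
            tracker.pollLast();
        }
        tracker.offerLast(elem);
    }

    // Removes the head only if it is the same object that was polled from the queue.
    static void removeElement(Deque<Stamped> tracker, Stamped elem) {
        if (tracker.peekFirst() == elem) {
            tracker.pollFirst();
        }
    }

    // Adds all records, then polls them in FIFO order and records the tracker after each remove.
    public static List<String> statesAfterRemoves(long... timestamps) {
        Deque<Stamped> queue = new ArrayDeque<>();
        Deque<Stamped> tracker = new ArrayDeque<>();
        for (long ts : timestamps) {
            Stamped record = new Stamped(ts);
            queue.offerLast(record);
            addElement(tracker, record);
        }
        List<String> states = new ArrayList<>();
        while (!queue.isEmpty()) {
            Stamped record = queue.pollFirst();
            removeElement(tracker, record);
            states.add(tracker.toString());
        }
        return states;
    }

    public static void main(String[] args) {
        // prints [10, 12, 20], [12, 20], [12, 20], [20], [] (one per line)
        statesAfterRemoves(5, 10, 15, 12, 20).forEach(System.out::println);
    }
}
```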

Note that the call to {{timeTracker.get()}} ({{RecordQueue}} L124) happens 
after {{timeTracker.removeElement(elem)}} ({{RecordQueue}} L120). This is also 
correct: we advance "partition time" in this step, but only use it _after_ the 
current record has been fully processed.

 - process {{5}}, queue {{10, 15, 12, 20}}, tracker after remove {{10, 12, 
20}}, next partition time {{10}}
 - process {{10}}, queue {{15, 12, 20}}, tracker after remove {{12, 20}}, next 
partition time {{12}}
 - process {{15}}, queue {{12, 20}}, tracker after remove {{12, 20}}, next 
partition time {{12}}
 - process {{12}}, queue {{20}}, tracker after remove {{20}}, next partition 
time {{20}}
 - process {{20}}, queue empty, tracker after remove empty, next partition time 
{{20}} (from {{lastKnownTime}})
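Putting the operations together, the following hypothetical Java sketch polls each record, calls {{removeElement}}, and only then calls {{get()}}, mirroring the order described for {{RecordQueue}}. The {{MinTracker}} class, the {{-1}} sentinel, and the rule that {{lastKnownTime}} is set when the tracker becomes empty are assumptions made for illustration:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PartitionTimeSketch {

    // Hypothetical stand-in for a buffered record.
    static final class Stamped {
        final long timestamp;
        Stamped(long timestamp) { this.timestamp = timestamp; }
    }

    // Simplified, hypothetical min tracker (ascending deque plus last known time).
    static final class MinTracker {
        private final Deque<Stamped> ascending = new ArrayDeque<>();
        private long lastKnownTime = -1L; // assumed sentinel for "unknown"

        void addElement(Stamped elem) {
            while (!ascending.isEmpty() && ascending.peekLast().timestamp > elem.timestamp) {
                ascending.pollLast();
            }
            ascending.offerLast(elem);
        }

        void removeElement(Stamped elem) {
            if (ascending.peekFirst() == elem) {
                ascending.pollFirst();
            }
            // Assumption: remember the last removed timestamp so get() has a value when empty.
            if (ascending.isEmpty()) {
                lastKnownTime = elem.timestamp;
            }
        }

        long get() {
            Stamped head = ascending.peekFirst();
            return head == null ? lastKnownTime : head.timestamp;
        }
    }

    // poll -> removeElement -> get(): the value is the partition time for the *next* step.
    public static List<Long> nextPartitionTimes(long... timestamps) {
        Deque<Stamped> queue = new ArrayDeque<>();
        MinTracker tracker = new MinTracker();
        for (long ts : timestamps) {
            Stamped record = new Stamped(ts);
            queue.offerLast(record);
            tracker.addElement(record);
        }
        List<Long> result = new ArrayList<>();
        while (!queue.isEmpty()) {
            Stamped record = queue.pollFirst();
            tracker.removeElement(record);
            result.add(tracker.get());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(nextPartitionTimes(5, 10, 15, 12, 20)); // prints [10, 12, 12, 20, 20]
    }
}
```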



was (Author: mjsax):
This is intended behavior. {{MinTimestampTracker}} tracks the minimum timestamp 
of the not-yet-processed records in the buffer. The undocumented usage pattern is 
that {{addElement()}} and {{removeElement()}} must be called in the same order. 
Your tests don't follow this pattern. The point of the logic is to track the 
current partition time as the minimum timestamp of whatever is in the buffer, 
including out-of-order records (cf. KAFKA-3514).

Example: we receive a batch of records with timestamps {{5, 10, 15, 12, 20}} and 
add them one after another to the timestamp tracker. When we then process those 
records one by one, we get the following steps (note: I am not sure whether the 
"partition time" in this example is correct; cf. below):

 - process {{5}}, queue {{10, 15, 12, 20}}, partition time {{5}}
 - process {{10}}, queue {{15, 12, 20}}, partition time {{10}}
 - process {{15}}, queue {{12, 20}}, partition time {{12}} (!! current minimum 
is {{12}}, not {{15}} !!)
 - process {{12}}, queue {{20}}, partition time {{12}}
 - process {{20}}, queue empty, partition time {{20}}

The tracker will have the following states when adding the records one by one 
(this happens before processing begins):
 - add {{5}}: {{5}}
 - add {{10}}: {{5, 10}}
 - add {{15}}: {{5, 10, 15}}
 - add {{12}}: {{5, 10, 12}} (!! this is the behavior that is not a bug !!)
 - add {{20}}: {{5, 10, 12, 20}}

During processing, we {{poll}} the head record from the queue and afterwards 
call {{removeElement}} on the tracker. Thus, we get the following (note that we 
start with tracker state {{5, 10, 12, 20}}):
 - poll from queue {{5}}, tracker after remove {{10, 12, 20}}
 - poll from queue {{10}}, tracker after remove {{12, 20}}
 - poll from queue {{15}}, tracker after remove {{12, 20}} (!! as we call 
{{removeElement(15)}}, we keep {{12}}; this allows us to use {{12}} for two 
records !!)
 - poll from queue {{12}}, tracker after remove {{20}}
 - poll from queue {{20}}, tracker after remove empty

However, I am not sure whether we actually advance "partition time" as 
intended. We might want to call {{timeTracker.get()}} ({{RecordQueue}} L124) 
before {{timeTracker.removeElement(elem)}} ({{RecordQueue}} L120). It seems that 
the "partition time" in my example above is not what Streams currently computes 
(even though the example seems to follow the intended "partition time"). If I 
am not mistaken, Streams would give:

 - process {{5}}, queue {{10, 15, 12, 20}}, tracker after remove {{10, 12, 
20}}, partition time {{10}}
 - process {{10}}, queue {{15, 12, 20}}, tracker after remove {{12, 20}}, 
partition time {{12}}
 - process {{15}}, queue {{12, 20}}, tracker after remove {{12, 20}}, partition 
time {{12}}
 - process {{12}}, queue {{20}}, tracker after remove {{20}}, partition time 
{{20}}
 - process {{20}}, queue empty, tracker after remove empty, partition time 
{{20}} (from {{lastKnownTime}})
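The two candidate orderings discussed here can be compared with a hypothetical Java sketch: under the assumptions below, calling {{get()}} before {{removeElement}} reproduces the partition times of the first example ({{5, 10, 12, 12, 20}}), while calling it afterwards yields the list just above ({{10, 12, 12, 20, 20}}). The tracker class and the {{lastKnownTime}} handling are assumptions, not the actual Streams code:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class GetOrderSketch {

    // Hypothetical stand-in for a buffered record.
    static final class Stamped {
        final long timestamp;
        Stamped(long timestamp) { this.timestamp = timestamp; }
    }

    // Simplified, hypothetical min tracker (ascending deque plus last known time).
    static final class MinTracker {
        private final Deque<Stamped> ascending = new ArrayDeque<>();
        private long lastKnownTime = -1L; // assumed sentinel for "unknown"

        void addElement(Stamped elem) {
            while (!ascending.isEmpty() && ascending.peekLast().timestamp > elem.timestamp) {
                ascending.pollLast();
            }
            ascending.offerLast(elem);
        }

        void removeElement(Stamped elem) {
            if (ascending.peekFirst() == elem) {
                ascending.pollFirst();
            }
            if (ascending.isEmpty()) {
                lastKnownTime = elem.timestamp;
            }
        }

        long get() {
            Stamped head = ascending.peekFirst();
            return head == null ? lastKnownTime : head.timestamp;
        }
    }

    // Partition time observed per record, read either before or after removeElement.
    public static List<Long> partitionTimes(boolean getBeforeRemove, long... timestamps) {
        Deque<Stamped> queue = new ArrayDeque<>();
        MinTracker tracker = new MinTracker();
        for (long ts : timestamps) {
            Stamped record = new Stamped(ts);
            queue.offerLast(record);
            tracker.addElement(record);
        }
        List<Long> times = new ArrayList<>();
        while (!queue.isEmpty()) {
            Stamped record = queue.pollFirst();
            if (getBeforeRemove) {
                times.add(tracker.get());
                tracker.removeElement(record);
            } else {
                tracker.removeElement(record);
                times.add(tracker.get());
            }
        }
        return times;
    }

    public static void main(String[] args) {
        System.out.println(partitionTimes(true, 5, 10, 15, 12, 20));  // prints [5, 10, 12, 12, 20]
        System.out.println(partitionTimes(false, 5, 10, 15, 12, 20)); // prints [10, 12, 12, 20, 20]
    }
}
```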


> MinTimestampTracker does not correctly add timestamps lower than the current 
> max
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-5144
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5144
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: Michal Borowiecki
>            Assignee: Michal Borowiecki
>
> When adding elements MinTimestampTracker removes all existing elements 
> greater than the added element.
> Perhaps I've missed something and this is intended behaviour but I can't find 
> any evidence for that in comments or tests.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
