Thanks Guozhang. So in the scenario you describe, where one topic has vastly 
lower throughput, you're saying that when the lower throughput topic is fully 
caught up (no messages in the buffer), the task will idle rather than using the 
timestamp of the last message it saw from that topic? Initially I was under the 
impression that this would only happen when the task had not yet seen any 
messages from one of the partitions.

Regarding choosing, you are exactly right. This mechanism is pluggable in 
Samza, and I'd like to see something similar done in Kafka Streams. The 
timestamp-based choosing policy is great and makes sense in a lot of scenarios, 
but having something like a priority-based policy would be very nice for some 
of our use cases.

-Tommy

On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:

@Ted


Yes, I will update the KIP mentioning this as a separate consideration.



@Thomas


The idle period may happen during processing as well. Think: if you are
joining two streams with very different throughput, say, as an extreme
case, one stream comes in at 100K messages / sec and another comes in at
1 message / sec. Then it could happen from time to time that we have
reached the tail of the low-traffic stream and have not yet received any
data from that stream, while the other stream still has unprocessed
buffered data. Currently we will always go ahead and just process the
other stream's buffered data, but bear in mind that when we eventually
receive the data from the low-traffic stream we may realize that its
timestamp is even smaller than what we have already processed, and hence
we have accidentally introduced out-of-order data.
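
To make the scenario above concrete, here is a minimal sketch in Python. It is a toy model, not Kafka Streams code: the function names (`choose_eagerly`, `choose_synchronized`, `simulate`), the partition names and the timestamps are all made up for illustration, and buffers hold bare timestamps instead of records. The eager chooser mimics "just process the other stream's buffered data"; the synchronized chooser idles whenever any buffer is empty. The sketch does not model any upper bound on how long a task may idle.

```python
from collections import deque


def choose_synchronized(buffers):
    """Pick the partition whose head timestamp is smallest, but only when
    every partition has buffered data; otherwise return None (idle)."""
    if any(len(buf) == 0 for buf in buffers.values()):
        return None
    return min(buffers, key=lambda name: buffers[name][0])


def choose_eagerly(buffers):
    """Pick the smallest head timestamp among non-empty partitions only."""
    non_empty = [name for name, buf in buffers.items() if buf]
    if not non_empty:
        return None
    return min(non_empty, key=lambda name: buffers[name][0])


def simulate(partitions, rounds, chooser):
    """Feed each round of (partition, timestamp) arrivals into the buffers,
    then process records until the chooser declines to pick one."""
    buffers = {name: deque() for name in partitions}
    processed = []
    for arrivals in rounds:
        for name, timestamp in arrivals:
            buffers[name].append(timestamp)
        name = chooser(buffers)
        while name is not None:
            processed.append(buffers[name].popleft())
            name = chooser(buffers)
    return processed


# Hypothetical arrivals: the high-traffic stream is ahead, and the
# low-traffic record with the older timestamp 5 shows up one round later.
rounds = [
    [("high", 10), ("high", 11), ("high", 12)],
    [("low", 5), ("high", 13)],
]
print(simulate(["high", "low"], rounds, choose_eagerly))
print(simulate(["high", "low"], rounds, choose_synchronized))
```

With the eager chooser the record with timestamp 5 is processed after 10, 11 and 12, which is the out-of-order case described above. With the synchronized chooser only 5 is processed and the high-traffic records stay buffered until the low-traffic stream delivers again, which is the cost of idling.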


What you described in stream-table joins is also a common case (e.g.
https://issues.apache.org/jira/browse/KAFKA-4113). Personally I think it is
more related to a general theme of "message choosing", in which Kafka
Streams today only allows timestamp-synchronization-based message choosing.
Other mechanisms have been requested as well, e.g. topic-priority-based
message choosing, or type-based (e.g. always prefer KTable over KStream).
And this is not only for Streams, but also for the Consumer itself in
general: e.g. Nick has recently proposed this
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics).
I think this topic itself should be discussed as a separate KIP, maybe for
both Streams and Consumer clients, and hence I intentionally avoid
overlapping with it and stay with a static message choosing mechanism in
my KIP.




Guozhang



On Tue, Aug 7, 2018 at 4:55 AM, Thomas Becker <thomas.bec...@tivo.com> wrote:


This looks like a big step in the right direction IMO. So am I correct in
assuming this idle period would only come into play after startup when
waiting for initial records to be fetched? In other words, once we have
seen records from all topics and have established the stream time,
processing will not go idle again, right?


I still feel that timestamp semantics are not wanted in all cases.
Consider a simple stream-table join to augment incoming events where the
table data has been updated recently (and hence has a later timestamp than
some incoming events). Currently this will not be joined at all (assuming
older table records have been compacted) until the timestamps on events
start passing the table updates. For use cases like this I'd like to be
able to say always prefer processing the table backing topic if it has data
available, regardless of timestamp.
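
A rough sketch of the "always prefer the table topic" policy described above, again as a toy Python model and not an existing Kafka Streams API. The names `choose_by_priority` and `drain`, the topic names and the priority map are hypothetical. A lower priority number wins; topics without an entry share a default priority and fall back to plain timestamp ordering among themselves.

```python
from collections import deque

DEFAULT_PRIORITY = 100  # assumed default for topics with no explicit entry


def choose_by_priority(buffers, priority):
    """Pick the non-empty topic with the best (lowest) priority value,
    breaking ties by the smallest head timestamp."""
    candidates = [name for name, buf in buffers.items() if buf]
    if not candidates:
        return None
    return min(
        candidates,
        key=lambda name: (priority.get(name, DEFAULT_PRIORITY), buffers[name][0]),
    )


def drain(buffers, priority):
    """Process everything that is buffered and return the order as
    (topic, timestamp) pairs."""
    order = []
    name = choose_by_priority(buffers, priority)
    while name is not None:
        order.append((name, buffers[name].popleft()))
        name = choose_by_priority(buffers, priority)
    return order


# The table update (timestamp 100) is newer than the buffered events.
buffers = {"events": deque([50, 60]), "table": deque([100])}
print(drain(buffers, {"table": 0, "events": 1}))
```

With the priority map above the table update is applied first, so the events at 50 and 60 see it when joined. Passing an empty priority map reduces the chooser to timestamp order, where both events are processed before the table update.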


On Fri, 2018-08-03 at 14:00 -0700, Guozhang Wang wrote:


Hello all,



I would like to kick off a discussion on the following KIP, to allow users
to control when a task can be processed based on its buffered records, and
how the stream time of a task is advanced.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

This is related to one of the root causes of out-of-order data in Kafka
Streams. Any thoughts / comments on this topic are more than welcome.




Thanks,


-- Guozhang



________________________________


This email and any attachments may contain confidential and privileged

material for the sole use of the intended recipient. Any review, copying,

or distribution of this email (or any attachments) by others is prohibited.

If you are not the intended recipient, please contact the sender

immediately and permanently delete this email and any attachments. No

employee or agent of TiVo Inc. is authorized to conclude any binding

agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo

Inc. may only be made by a signed written agreement.





