While typing up a scenario to illustrate my question, I think I found the
answer ;) We are not assuming timestamps will be strictly increasing within
a topic, and we are trying to make processing order deterministic even in
the face of that. Thanks for making me think about it (or please correct me
if I'm wrong).


On Tue, 2018-08-07 at 10:48 -0700, Matthias J. Sax wrote:

@Thomas, just to rephrase (from my understanding):


So in the scenario you describe, where one topic has
vastly lower throughput, you're saying that when the lower throughput topic
is fully caught up (no messages in the buffer), the task will idle rather
than using the timestamp of the last message it saw from that topic?


The idea of the KIP is to configure a max blocking time for tasks. This
allows us to provide a "grace period" so that new data for the empty
partition can be received (if an upstream producer still writes data,
but with low throughput). After the blocking timeout expires (in case
throughput is too low or the upstream producer died), the task will
process data even if there are empty input topic partitions (because we
cannot block forever in a stream processing scenario).


Not sure what you mean by "using the timestamp of the last message"?
Using it for what? The KIP is about processing records of partitions that
do have data when some other partitions are empty (and about allowing users
to configure a max "blocking time" until processing is "forced").
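
To make the grace-period idea concrete, here is a minimal sketch in plain
Java. It is not the KIP's implementation and does not use any Kafka Streams
API: the class IdleTaskSketch, the method mayProcess, and the maxIdleMs
parameter are names assumed purely for illustration. It models a task that
waits while some input partition is empty and forces processing once the
configured blocking time has elapsed.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical model of one task reading several input partitions.
// Each buffer holds the timestamps of records already fetched for a partition.
class IdleTaskSketch {
    private final List<Deque<Long>> buffers;
    private final long maxIdleMs;      // assumed name for the "max blocking time"
    private long idleSinceMs = -1;     // wall-clock time the wait started, -1 if not waiting

    IdleTaskSketch(List<Deque<Long>> buffers, long maxIdleMs) {
        this.buffers = buffers;
        this.maxIdleMs = maxIdleMs;
    }

    /** Returns true if the task may process a record at wall-clock time nowMs. */
    boolean mayProcess(long nowMs) {
        boolean anyData = false;
        boolean allHaveData = true;
        for (Deque<Long> b : buffers) {
            if (b.isEmpty()) {
                allHaveData = false;
            } else {
                anyData = true;
            }
        }
        if (!anyData) {                // nothing buffered at all: nothing to process
            idleSinceMs = -1;
            return false;
        }
        if (allHaveData) {             // every partition has a head record: no need to wait
            idleSinceMs = -1;
            return true;
        }
        if (idleSinceMs < 0) {
            idleSinceMs = nowMs;       // some partition is empty: start the grace period
        }
        // Once the grace period has expired, processing is "forced".
        return nowMs - idleSinceMs >= maxIdleMs;
    }

    public static void main(String[] args) {
        Deque<Long> busy = new ArrayDeque<>(Arrays.asList(10L, 11L));
        Deque<Long> quiet = new ArrayDeque<>();
        IdleTaskSketch task = new IdleTaskSketch(Arrays.asList(busy, quiet), 100L);
        System.out.println(task.mayProcess(0L));    // still inside the grace period
        System.out.println(task.mayProcess(100L));  // grace period has expired
    }
}
```

With maxIdleMs set to 0 this sketch degenerates to the current behavior
(never wait for an empty partition); a larger value trades latency for a
better chance of seeing the slow partition's records in timestamp order.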



-Matthias



On 8/7/18 10:02 AM, Guozhang Wang wrote:

@Tommy


Yes, that's the intent. Again, note that the current behavior is indeed "just
using the timestamp of the last message I saw" and continuing to process
what's in the buffer from other streams, but this may introduce
out-of-order data.



Guozhang



On Tue, Aug 7, 2018 at 9:59 AM, Thomas Becker <thomas.bec...@tivo.com> wrote:


Thanks Guozhang. So in the scenario you describe, where one topic has
vastly lower throughput, you're saying that when the lower throughput topic
is fully caught up (no messages in the buffer), the task will idle rather
than using the timestamp of the last message it saw from that topic?
Initially I was under the impression that this would only happen when the
task had not yet seen any messages from one of the partitions.


Regarding choosing, you are exactly right. This mechanism is pluggable in
Samza, and I'd like to see something similar done in Kafka Streams. The
timestamp-based choosing policy is great and makes sense in a lot of
scenarios, but having something like a priority-based policy would be very
nice for some of our use cases.


-Tommy


On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:


@Ted



Yes, I will update the KIP mentioning this as a separate consideration.




@Thomas



The idle period may happen during processing as well. Think: if you are
joining two streams with very different throughput, say, as an extreme
case, one stream comes in at 100K messages / sec and the other comes in at
1 message / sec. Then it could happen from time to time that we have
reached the tail of the low-traffic stream and have not yet received any
more data from that stream, while the other stream still has unprocessed
buffered data. Currently we will always go ahead and just process the other
stream's buffered data, but bear in mind that when we eventually receive
the data from the low-traffic stream, we may find that its timestamp is
even smaller than what we have already processed, and hence we have
accidentally introduced out-of-order data.
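
The scenario above can be replayed with a small, self-contained Java
sketch. It is a toy model, not Kafka Streams code: OutOfOrderSketch and
chooseByTimestamp are hypothetical names, and the timestamps are made up.
The chooser picks the smallest head timestamp among the buffers that
currently have data, which is how an empty low-traffic buffer lets the
high-traffic stream run ahead.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class OutOfOrderSketch {
    /** Picks the non-empty buffer with the smallest head timestamp, or null if all are empty. */
    static Deque<Long> chooseByTimestamp(List<Deque<Long>> buffers) {
        Deque<Long> best = null;
        for (Deque<Long> b : buffers) {
            if (!b.isEmpty() && (best == null || b.peekFirst() < best.peekFirst())) {
                best = b;
            }
        }
        return best;
    }

    /** Replays the scenario and returns the timestamps in processing order. */
    static List<Long> run() {
        Deque<Long> fast = new ArrayDeque<>(Arrays.asList(100L, 101L, 102L));
        Deque<Long> slow = new ArrayDeque<>();   // caught up: nothing buffered yet
        List<Deque<Long>> buffers = Arrays.asList(fast, slow);
        List<Long> processed = new ArrayList<>();

        // The slow buffer is empty, so the task keeps draining the fast one.
        processed.add(chooseByTimestamp(buffers).pollFirst());
        processed.add(chooseByTimestamp(buffers).pollFirst());

        // A record from the slow stream is finally fetched, with an older timestamp.
        slow.addLast(99L);

        Deque<Long> next;
        while ((next = chooseByTimestamp(buffers)) != null) {
            processed.add(next.pollFirst());
        }
        return processed;
    }

    public static void main(String[] args) {
        // The record with timestamp 99 is processed after 100 and 101.
        System.out.println(run());
    }
}
```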



What you described for stream-table joins is also a common case (e.g.
https://issues.apache.org/jira/browse/KAFKA-4113). Personally I think it is
more related to a general theme of "message choosing", in which Kafka
Streams today only allows timestamp-synchronization-based message choosing.
Other mechanisms have been requested as well, e.g. topic-priority-based
message choosing, or type-based choosing (e.g. always prefer KTable over
KStream). And this is not only for Streams, but also for the Consumer
itself in general: e.g. Nick has recently proposed this
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics).
I think this topic itself should be discussed as a separate KIP, maybe for
both Streams and Consumer clients, and hence I intentionally avoid
overlapping with it and stay with a static message choosing mechanism in
my KIP.
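
As a rough illustration of what a pluggable "message choosing" policy could
look like, here is a self-contained Java sketch. Nothing in it is an
existing Kafka Streams or Consumer API: the RecordChooser interface, the
Head class, and the numeric priority field are all assumptions made for
the example. It contrasts the timestamp-based policy with a priority-based
one for the stream-table case, where the table update carries a later
timestamp than the pending event.

```java
import java.util.Arrays;
import java.util.List;

class ChooserSketch {
    /** The next buffered record of one input: its topic, timestamp, and a static priority. */
    static final class Head {
        final String topic;
        final long timestamp;
        final int priority;   // hypothetical: larger means "prefer this input"

        Head(String topic, long timestamp, int priority) {
            this.topic = topic;
            this.timestamp = timestamp;
            this.priority = priority;
        }
    }

    /** Hypothetical plug-in point: pick one head from a non-empty list of candidates. */
    interface RecordChooser {
        Head choose(List<Head> heads);
    }

    /** Timestamp-based policy: the earliest timestamp wins. */
    static final RecordChooser BY_TIMESTAMP = heads -> {
        Head best = heads.get(0);
        for (Head h : heads) {
            if (h.timestamp < best.timestamp) {
                best = h;
            }
        }
        return best;
    };

    /** Priority-based policy: the highest priority wins, ties fall back to timestamp. */
    static final RecordChooser BY_PRIORITY = heads -> {
        Head best = heads.get(0);
        for (Head h : heads) {
            boolean higher = h.priority > best.priority;
            boolean tieButEarlier = h.priority == best.priority && h.timestamp < best.timestamp;
            if (higher || tieButEarlier) {
                best = h;
            }
        }
        return best;
    };

    public static void main(String[] args) {
        List<Head> heads = Arrays.asList(
                new Head("events", 50L, 0),   // incoming event with an older timestamp
                new Head("table", 80L, 1));   // recent table update, given higher priority
        System.out.println(BY_TIMESTAMP.choose(heads).topic);
        System.out.println(BY_PRIORITY.choose(heads).topic);
    }
}
```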





Guozhang




On Tue, Aug 7, 2018 at 4:55 AM, Thomas Becker <thomas.bec...@tivo.com> wrote:



This looks like a big step in the right direction IMO. So am I correct in
assuming this idle period would only come into play after startup, when
waiting for initial records to be fetched? In other words, once we have
seen records from all topics and have established the stream time,
processing will not go idle again, right?



I still feel that timestamp semantics are not wanted in all cases.
Consider a simple stream-table join to augment incoming events, where the
table data has been updated recently (and hence has a later timestamp than
some incoming events). Currently these events will not be joined at all
(assuming older table records have been compacted) until the timestamps on
events start passing the table updates. For use cases like this, I'd like to
be able to say "always prefer processing the table's backing topic if it has
data available, regardless of timestamp".



On Fri, 2018-08-03 at 14:00 -0700, Guozhang Wang wrote:



Hello all,




I would like to kick off a discussion on the following KIP, to allow users
to control when a task can be processed based on its buffered records, and
how the stream time of a task is advanced.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

This is related to one of the root causes of out-of-order data in Kafka
Streams. Any thoughts / comments on this topic are more than welcome.





Thanks,



-- Guozhang




________________________________



This email and any attachments may contain confidential and privileged
material for the sole use of the intended recipient. Any review, copying,
or distribution of this email (or any attachments) by others is prohibited.
If you are not the intended recipient, please contact the sender
immediately and permanently delete this email and any attachments. No
employee or agent of TiVo Inc. is authorized to conclude any binding
agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
Inc. may only be made by a signed written agreement.














