@Tommy Yes, that's the intent. Again, note that the current behavior is indeed to "just use the timestamp of the last message I saw" and to continue processing what's buffered from the other streams, but this may introduce out-of-order data.
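To make the trade-off concrete, here is a minimal, hypothetical Python simulation of one task that reads a high-traffic and a low-traffic partition and always processes the buffered record with the smallest timestamp. It is not Kafka Streams code: `TaskSketch`, `max_idle`, `step`, `drain` and `run` are invented names. With `max_idle=0` it proceeds with whatever is buffered, like the current behavior described above. With a positive `max_idle` it waits up to that long for an empty partition before proceeding, in the spirit of the idle period discussed in this thread.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, Dict, List, Optional, Tuple

Record = Tuple[int, str]  # (timestamp, value)


@dataclass
class TaskSketch:
    """Hypothetical single-task model: one FIFO buffer per input partition."""
    partitions: List[str]
    max_idle: int  # how long to wait for an empty partition; 0 = never wait
    buffers: Dict[str, Deque[Record]] = field(default_factory=dict)
    stream_time: int = -1
    idle_since: Optional[int] = None
    processed: List[Tuple[str, int]] = field(default_factory=list)
    out_of_order: List[Tuple[str, int]] = field(default_factory=list)

    def __post_init__(self) -> None:
        for p in self.partitions:
            self.buffers.setdefault(p, deque())

    def add(self, partition: str, timestamp: int, value: str) -> None:
        self.buffers[partition].append((timestamp, value))

    def step(self, now: int) -> bool:
        """Try to process one record at wall-clock time `now`; False means nothing was processed."""
        non_empty = [p for p in self.partitions if self.buffers[p]]
        if not non_empty:
            return False
        if len(non_empty) < len(self.partitions):
            # Some partition has nothing buffered, so its next timestamp is unknown.
            if self.idle_since is None:
                self.idle_since = now
            if now - self.idle_since < self.max_idle:
                return False  # keep waiting for the empty partition(s)
        else:
            self.idle_since = None
        # Timestamp synchronization: take the buffered head record with the smallest timestamp.
        p = min(non_empty, key=lambda q: self.buffers[q][0][0])
        ts, _value = self.buffers[p].popleft()
        if ts < self.stream_time:
            self.out_of_order.append((p, ts))
        self.stream_time = max(self.stream_time, ts)
        self.processed.append((p, ts))
        return True

    def drain(self, now: int) -> None:
        while self.step(now):
            pass


def run(max_idle: int) -> TaskSketch:
    task = TaskSketch(partitions=["fast", "slow"], max_idle=max_idle)
    for ts in (10, 20, 30):
        task.add("fast", ts, f"f{ts}")
    task.drain(now=0)          # the slow partition has nothing buffered yet
    task.add("slow", 5, "s5")  # its older record is fetched a bit later
    task.drain(now=50)
    task.drain(now=200)        # long after any idle timeout has expired
    return task


proceed, idle = run(max_idle=0), run(max_idle=100)
print([ts for _, ts in proceed.processed], proceed.out_of_order)  # -> [10, 20, 30, 5] [('slow', 5)]
print([ts for _, ts in idle.processed], idle.out_of_order)        # -> [5, 10, 20, 30] []
```

Without idling, the record with timestamp 5 is processed after stream time has already reached 30, i.e. out of order; the idling task processes it first. Idling only bounds the problem: a low-traffic record that arrives after the idle timeout has expired can still be out of order.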
Guozhang

On Tue, Aug 7, 2018 at 9:59 AM, Thomas Becker <thomas.bec...@tivo.com> wrote:

> Thanks Guozhang. So in the scenario you describe, where one topic has
> vastly lower throughput, you're saying that when the lower-throughput topic
> is fully caught up (no messages in the buffer), the task will idle rather
> than use the timestamp of the last message it saw from that topic?
> Initially I was under the impression that this would only happen when the
> task had not yet seen any messages from one of the partitions.
>
> Regarding choosing, you are exactly right. This mechanism is pluggable in
> Samza, and I'd like to see something similar done in Kafka Streams. The
> timestamp-based choosing policy is great and makes sense in a lot of
> scenarios, but having something like a priority-based policy would be very
> nice for some of our use cases.
>
> -Tommy
>
> On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:
>
> > @Ted
> >
> > Yes, I will update the KIP to mention this as a separate consideration.
> >
> > @Thomas
> >
> > The idle period may happen during processing as well. Think: if you
> > are joining two streams with very different traffic, say, in an extreme
> > case, one stream comes in at 100K messages/sec and the other at
> > 1 message/sec. Then it could happen from time to time that we have
> > reached the tail of the low-traffic stream and have not yet received any
> > new data from it, while the other stream still has unprocessed buffered
> > data. Currently we always go ahead and process the other stream's
> > buffered data, but bear in mind that when we eventually receive data
> > from the low-traffic stream, we realize that its timestamp is even
> > smaller than what we have already processed, and hence we have
> > accidentally introduced out-of-order data.
> >
> > What you described in stream-table joins is also a common case (e.g.
> > https://issues.apache.org/jira/browse/KAFKA-4113).
> > Personally, I think it is more related to a general theme of "message
> > choosing", in which Kafka Streams today only allows
> > timestamp-synchronization-based message choosing. Other mechanisms have
> > been requested as well, e.g. topic-priority-based message choosing, or
> > type-based (e.g. always prefer KTable over KStream). And this is not only
> > relevant to Streams but also to the Consumer in general: e.g. Nick has
> > recently proposed this (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics).
> > I think this topic itself should be discussed in a separate KIP, maybe for
> > both the Streams and Consumer clients, and hence I intentionally avoid
> > overlapping with it and stay with a static message choosing mechanism in
> > my KIP.
> >
> > Guozhang
> >
> > On Tue, Aug 7, 2018 at 4:55 AM, Thomas Becker <thomas.bec...@tivo.com> wrote:
> >
> > > This looks like a big step in the right direction IMO. So am I correct in
> > > assuming this idle period would only come into play after startup, when
> > > waiting for initial records to be fetched? In other words, once we have
> > > seen records from all topics and have established the stream time,
> > > processing will not go idle again, right?
> > >
> > > I still feel that timestamp semantics are not wanted in all cases.
> > > Consider a simple stream-table join to augment incoming events where the
> > > table data has been updated recently (and hence has a later timestamp than
> > > some incoming events). Currently these events will not be joined at all
> > > (assuming older table records have been compacted) until the timestamps
> > > on events start passing the table updates. For use cases like this I'd
> > > like to be able to say: always prefer processing the table's backing topic
> > > if it has data available, regardless of timestamp.
> > >
> > > On Fri, 2018-08-03 at 14:00 -0700, Guozhang Wang wrote:
> > >
> > > > Hello all,
> > > >
> > > > I would like to kick off a discussion on the following KIP, to allow
> > > > users to control when a task can be processed based on its buffered
> > > > records, and how the stream time of a task is advanced.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
> > > >
> > > > This is related to one of the root causes of out-of-order data in Kafka
> > > > Streams. Any thoughts / comments on this topic are more than welcome.
> > > >
> > > > Thanks,
> > > > -- Guozhang
> > >
> > > ________________________________
> > > This email and any attachments may contain confidential and privileged
> > > material for the sole use of the intended recipient. Any review, copying,
> > > or distribution of this email (or any attachments) by others is prohibited.
> > > If you are not the intended recipient, please contact the sender
> > > immediately and permanently delete this email and any attachments. No
> > > employee or agent of TiVo Inc. is authorized to conclude any binding
> > > agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> > > Inc. may only be made by a signed written agreement.

-- Guozhang
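Thomas's stream-table join case and the "message choosing" theme Guozhang describes can be illustrated with a small, hypothetical Python sketch of a pluggable chooser. `RecordChooser`, `TimestampChooser`, `PriorityChooser`, `stream_table_join` and `fresh_buffers` are invented for illustration; they are not Kafka Streams, Consumer, or Samza APIs. The timestamp policy mirrors today's timestamp-synchronization-based choosing; the priority policy always prefers the table topic when it has buffered data, which is what Thomas asks for.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Deque, Dict, List, Optional, Sequence, Tuple

Record = Tuple[int, str]  # (timestamp, payload)


class RecordChooser(ABC):
    """Hypothetical pluggable policy: pick which topic's head record to process next."""

    @abstractmethod
    def choose(self, heads: Dict[str, Record]) -> str:
        """`heads` maps each topic with buffered data to its oldest buffered record."""


class TimestampChooser(RecordChooser):
    """Timestamp synchronization: always take the smallest head timestamp."""

    def choose(self, heads: Dict[str, Record]) -> str:
        return min(heads, key=lambda topic: heads[topic][0])


class PriorityChooser(RecordChooser):
    """Prefer topics listed earlier in `priority` whenever they have data; ties go to the older record."""

    def __init__(self, priority: Sequence[str]) -> None:
        self.rank = {topic: i for i, topic in enumerate(priority)}

    def choose(self, heads: Dict[str, Record]) -> str:
        return min(heads, key=lambda t: (self.rank.get(t, len(self.rank)), heads[t][0]))


def stream_table_join(
    buffers: Dict[str, Deque[Record]], chooser: RecordChooser
) -> List[Tuple[str, Optional[str]]]:
    """Join 'events' records ("click:<user>") against a table built from 'table' records ("<user>=<tier>")."""
    table: Dict[str, str] = {}
    joined: List[Tuple[str, Optional[str]]] = []
    while any(buffers.values()):
        heads = {t: q[0] for t, q in buffers.items() if q}
        topic = chooser.choose(heads)
        _ts, payload = buffers[topic].popleft()
        if topic == "table":
            key, value = payload.split("=", 1)
            table[key] = value
        else:
            user = payload.split(":", 1)[1]
            joined.append((user, table.get(user)))
    return joined


def fresh_buffers() -> Dict[str, Deque[Record]]:
    # The table update (t=100) is newer than the events; older table records were compacted away.
    return {
        "events": deque([(50, "click:alice"), (60, "click:bob")]),
        "table": deque([(100, "alice=gold")]),
    }


print(stream_table_join(fresh_buffers(), TimestampChooser()))
# -> [('alice', None), ('bob', None)]
print(stream_table_join(fresh_buffers(), PriorityChooser(["table", "events"])))
# -> [('alice', 'gold'), ('bob', None)]
```

With timestamp choosing, both events are processed before the newer table update and find nothing to join with; with the table topic prioritized, alice's event sees the update. The price is the one raised in the thread: a priority policy deliberately gives up timestamp ordering across topics.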