Thanks Matthias, will update the KIP accordingly.

On Thu, Aug 9, 2018 at 11:26 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> @Guozhang, I think you can start the VOTE for this KIP? I don't have any
> further comments.
>
> One more nit: we should explicitly state that the new config is
> wall-clock time based.
>
> -Matthias
>
> On 8/7/18 12:59 PM, Matthias J. Sax wrote:
> > Correct. It's not about reordering. Records will still be processed in
> > offset-order per partition.
> >
> > For multi-partition tasks (like joins), we use the timestamp of the
> > "head" record of each partition to determine which record to process
> > first (to process records across partitions in timestamp order if
> > possible) -- however, if one partition does not have a record, we cannot
> > make the decision which record to pick next. Thus, the task blocks and
> > we don't want to block forever. If we unblock on missing data, we might
> > get out-of-order processing with regard to timestamps between two
> > partitions.
> >
> > -Matthias
> >
> > On 8/7/18 12:03 PM, Thomas Becker wrote:
> >> In typing up a scenario to illustrate my question, I think I found the
> >> answer ;) We are not assuming timestamps will be strictly increasing
> >> within a topic and are trying to make processing order deterministic
> >> even in the face of that. Thanks for making me think about it (or
> >> please correct me if I'm wrong).
> >>
> >> On Tue, 2018-08-07 at 10:48 -0700, Matthias J. Sax wrote:
> >>
> >> @Thomas, just to rephrase (from my understanding):
> >>
> >> So in the scenario you describe, where one topic has vastly lower
> >> throughput, you're saying that when the lower throughput topic is fully
> >> caught up (no messages in the buffer), the task will idle rather than
> >> using the timestamp of the last message it saw from that topic?
> >>
> >> The idea of the KIP is to configure a max blocking time for tasks.
> >> This allows us to provide a "grace period" such that new data for the
> >> empty partition can be received (if an upstream producer still writes
> >> data, but with low throughput). After the blocking timeout expires (in
> >> case throughput is too small or the upstream producer died), the task
> >> will process data even if there are empty input topic partitions
> >> (because we cannot block forever in a stream processing scenario).
> >>
> >> Not sure what you mean by "using the timestamp of the last message"?
> >> Using for what? The KIP is about processing records of partitions that
> >> do have data if some other partitions are empty (and to allow users to
> >> configure a max "blocking time" until processing is "forced").
> >>
> >> -Matthias
> >>
> >> On 8/7/18 10:02 AM, Guozhang Wang wrote:
> >>
> >> @Tommy
> >>
> >> Yes, that's the intent. Again note that the current behavior is indeed
> >> "just using the timestamp of the last message I saw", and continuing to
> >> process what's in the buffer from other streams, but this may introduce
> >> out-of-ordering.
> >>
> >> Guozhang
> >>
> >> On Tue, Aug 7, 2018 at 9:59 AM, Thomas Becker <thomas.bec...@tivo.com>
> >> wrote:
> >>
> >> Thanks Guozhang. So in the scenario you describe, where one topic has
> >> vastly lower throughput, you're saying that when the lower throughput
> >> topic is fully caught up (no messages in the buffer), the task will idle
> >> rather than using the timestamp of the last message it saw from that
> >> topic? Initially I was under the impression that this would only happen
> >> when the task had not yet seen any messages from one of the partitions.
> >>
> >> Regarding choosing, you are exactly right.
> >> This mechanism is pluggable in Samza, and I'd like to see something
> >> similar done in Kafka Streams. The timestamp based choosing policy is
> >> great and makes sense in a lot of scenarios, but having something like a
> >> priority based policy would be very nice for some of our use cases.
> >>
> >> -Tommy
> >>
> >> On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:
> >>
> >> @Ted
> >>
> >> Yes, I will update the KIP mentioning this as a separate consideration.
> >>
> >> @Thomas
> >>
> >> The idle period may happen during processing as well. Think: if you are
> >> joining two streams with very different throughput, say for an extreme
> >> case, one stream comes in at 100K messages / sec, another comes in at
> >> 1 message / sec. Then it could happen from time to time that we have
> >> reached the tail of the low-traffic stream and do not have any data
> >> received yet from that stream, while the other stream still has
> >> unprocessed buffered data. Currently we will always go ahead and just
> >> process the other stream's buffered data, but bear in mind that when we
> >> eventually receive the data from the low-traffic stream, we may realize
> >> that its timestamp is even smaller than what we have already processed,
> >> and hence we have accidentally introduced out-of-order data.
> >>
> >> What you described in stream-table joins is also a common case (e.g.
> >> https://issues.apache.org/jira/browse/KAFKA-4113). Personally I think it
> >> is more related to a general theme of "message choosing", in which Kafka
> >> Streams today only allows timestamp-synchronization-based message
> >> choosing.
> >>
> >> Other mechanisms are requested as well, e.g.
> >> topic-priority based message choosing, or type based (e.g. always
> >> prefer KTable over KStream). And this is not only for Streams, but also
> >> for the Consumer itself in general: e.g. Nick has recently proposed this
> >> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics).
> >>
> >> I think this topic itself should be discussed as a separate KIP, maybe
> >> for both Streams and Consumer clients, and hence I intentionally avoid
> >> overlapping with it and stay with a static message choosing mechanism
> >> in my KIP.
> >>
> >> Guozhang
> >>
> >> On Tue, Aug 7, 2018 at 4:55 AM, Thomas Becker <thomas.bec...@tivo.com>
> >> wrote:
> >>
> >> This looks like a big step in the right direction IMO. So am I correct
> >> in assuming this idle period would only come into play after startup
> >> when waiting for initial records to be fetched? In other words, once we
> >> have seen records from all topics and have established the stream time,
> >> processing will not go idle again, right?
> >>
> >> I still feel that timestamp semantics are not wanted in all cases.
> >> Consider a simple stream-table join to augment incoming events where the
> >> table data has been updated recently (and hence has a later timestamp
> >> than some incoming events). Currently this will not be joined at all
> >> (assuming older table records have been compacted) until the timestamps
> >> on events start passing the table updates.
> >> For use cases like this I'd like to be able to say: always prefer
> >> processing the table backing topic if it has data available, regardless
> >> of timestamp.
> >>
> >> On Fri, 2018-08-03 at 14:00 -0700, Guozhang Wang wrote:
> >>
> >> Hello all,
> >>
> >> I would like to kick off a discussion on the following KIP, to allow
> >> users to control when a task can be processed based on its buffered
> >> records, and how the stream time of a task is advanced.
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
> >>
> >> This is related to one of the root causes of out-of-order data in Kafka
> >> Streams. Any thoughts / comments on this topic are more than welcome.
> >>
> >> Thanks,
> >>
> >> -- Guozhang
> >>
> >> ________________________________
> >>
> >> This email and any attachments may contain confidential and privileged
> >> material for the sole use of the intended recipient. Any review,
> >> copying, or distribution of this email (or any attachments) by others
> >> is prohibited. If you are not the intended recipient, please contact
> >> the sender immediately and permanently delete this email and any
> >> attachments. No employee or agent of TiVo Inc. is authorized to conclude
> >> any binding agreement on behalf of TiVo Inc. by email. Binding
> >> agreements with TiVo Inc. may only be made by a signed written
> >> agreement.

--
-- Guozhang
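
For readers following the thread, the blocking behavior discussed above can be sketched as a toy model: pick the head record with the smallest timestamp across partitions, idle while some partition is empty, and force processing once a wall-clock timeout expires. This is only an illustration under stated assumptions. The Task class, its add/poll methods, and the max_idle_ms parameter are hypothetical names invented for this sketch; they are not Kafka Streams APIs and not necessarily the name of the config the KIP adds. Wall-clock time is passed in explicitly so the example stays deterministic.

```python
from collections import deque


class Task:
    """Toy model of a multi-partition task (hypothetical, not Kafka Streams code)."""

    def __init__(self, partitions, max_idle_ms):
        # One FIFO buffer per input partition; records stay in offset order.
        self.buffers = {p: deque() for p in partitions}
        self.max_idle_ms = max_idle_ms
        self.idle_since = None  # wall-clock time at which the task began waiting

    def add(self, partition, timestamp, value):
        """Buffer a fetched record for the given partition."""
        self.buffers[partition].append((timestamp, value))

    def poll(self, now_ms):
        """Return the next (partition, timestamp, value), or None while idling."""
        non_empty = [p for p, buf in self.buffers.items() if buf]
        if not non_empty:
            self.idle_since = None
            return None  # nothing buffered at all
        if len(non_empty) < len(self.buffers):
            # Some partition has no head record, so timestamp order across
            # partitions cannot be decided yet: start (or continue) waiting.
            if self.idle_since is None:
                self.idle_since = now_ms
            if now_ms - self.idle_since < self.max_idle_ms:
                return None
            # Timeout expired: processing is forced and may turn out to be
            # out of timestamp order once the empty partition gets data.
        else:
            self.idle_since = None
        # Choose the partition whose head record has the smallest timestamp.
        chosen = min(non_empty, key=lambda p: self.buffers[p][0][0])
        timestamp, value = self.buffers[chosen].popleft()
        return (chosen, timestamp, value)
```

With a "fast" and a "slow" partition and max_idle_ms=100, a task that has only "fast" records buffered returns None from poll until either a "slow" record arrives (after which the smaller head timestamp wins) or 100 ms of wall-clock time pass, at which point the "fast" records are processed anyway. If a "slow" record with an older timestamp shows up after that, it is processed late, which is the out-of-order case described in the thread.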