@Guozhang, I think you can start the VOTE for this KIP? I don't have any further comments.
One more nit: we should explicitly state that the new config is based on wall-clock time.

-Matthias

On 8/7/18 12:59 PM, Matthias J. Sax wrote:
> Correct. It's not about reordering. Records will still be processed in
> offset order per partition.
>
> For multi-partition tasks (like joins), we use the timestamp of the
> "head" record of each partition to determine which record to process
> first (to process records across partitions in timestamp order if
> possible). However, if one partition does not have a record, we cannot
> decide which record to pick next. Thus, the task blocks, but we don't
> want it to block forever. If we unblock on missing data, we might get
> out-of-order processing with regard to timestamps between two
> partitions.
>
>
> -Matthias
>
> On 8/7/18 12:03 PM, Thomas Becker wrote:
>> In typing up a scenario to illustrate my question, I think I found the
>> answer ;) We are not assuming timestamps will be strictly increasing
>> within a topic and trying to make processing order deterministic even
>> in the face of that. Thanks for making me think about it (or please
>> correct me if I'm wrong).
>>
>>
>> On Tue, 2018-08-07 at 10:48 -0700, Matthias J. Sax wrote:
>>
>> @Thomas, just to rephrase (from my understanding):
>>
>>> So in the scenario you describe, where one topic has vastly lower
>>> throughput, you're saying that when the lower throughput topic is
>>> fully caught up (no messages in the buffer), the task will idle
>>> rather than using the timestamp of the last message it saw from that
>>> topic?
>>
>> The idea of the KIP is to configure a max blocking time for tasks. This
>> allows us to provide a 'grace period' such that new data for the empty
>> partition can be received (if an upstream producer still writes data,
>> but with low throughput).
>> After the blocking timeout expires (in case throughput is too low or
>> the upstream producer died), the task will process data even if there
>> are empty input topic partitions (because we cannot block forever in a
>> stream processing scenario).
>>
>> Not sure what you mean by "using the timestamp of the last message".
>> Using it for what? The KIP is about processing records of partitions
>> that do have data if some other partitions are empty (and allowing
>> users to configure a max "blocking time" until processing is
>> "forced").
>>
>>
>> -Matthias
>>
>>
>> On 8/7/18 10:02 AM, Guozhang Wang wrote:
>>
>> @Tommy
>>
>> Yes, that's the intent. Again, note that the current behavior is indeed
>> "just using the timestamp of the last message I saw" and continuing to
>> process what's in the buffer from other streams, but this may introduce
>> out-of-order data.
>>
>>
>> Guozhang
>>
>>
>> On Tue, Aug 7, 2018 at 9:59 AM, Thomas Becker <thomas.bec...@tivo.com>
>> wrote:
>>
>> Thanks Guozhang. So in the scenario you describe, where one topic has
>> vastly lower throughput, you're saying that when the lower throughput
>> topic is fully caught up (no messages in the buffer), the task will
>> idle rather than using the timestamp of the last message it saw from
>> that topic? Initially I was under the impression that this would only
>> happen when the task had not yet seen any messages from one of the
>> partitions.
>>
>> Regarding choosing, you are exactly right. This mechanism is pluggable
>> in Samza, and I'd like to see something similar done in Kafka Streams.
>> The timestamp-based choosing policy is great and makes sense in a lot
>> of scenarios, but having something like a priority-based policy would
>> be very nice for some of our use cases.
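The timestamp synchronization Matthias describes above (pick the partition whose head record has the smallest timestamp, block while any partition is empty, but only up to a configurable grace period) can be illustrated with a minimal Java sketch. It is not the Kafka Streams implementation: the class TimestampSyncSketch, its methods, and the maxIdleMs parameter are hypothetical names chosen for illustration. The clock is injected so that, as the thread asks, the grace period is measured in wall-clock time rather than stream time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of per-task timestamp synchronization with a
// wall-clock idle timeout; NOT the actual Kafka Streams implementation.
class TimestampSyncSketch {

    // A buffered record; only its timestamp matters for choosing.
    static final class Rec {
        final long timestamp;
        final String value;
        Rec(long timestamp, String value) { this.timestamp = timestamp; this.value = value; }
    }

    private final List<Deque<Rec>> partitions = new ArrayList<>();
    private final long maxIdleMs;          // hypothetical name for the max blocking time
    private final LongSupplier wallClock;  // wall-clock millis, not stream time
    private long idleSinceMs = -1;         // -1 means "not currently waiting"

    TimestampSyncSketch(int numPartitions, long maxIdleMs, LongSupplier wallClock) {
        for (int i = 0; i < numPartitions; i++) partitions.add(new ArrayDeque<>());
        this.maxIdleMs = maxIdleMs;
        this.wallClock = wallClock;
    }

    // Records arrive per partition in offset order and are kept FIFO.
    void buffer(int partition, Rec r) { partitions.get(partition).addLast(r); }

    // Returns the next record to process, or null if the task should block.
    Rec next() {
        boolean anyEmpty = false, anyData = false;
        for (Deque<Rec> q : partitions) {
            if (q.isEmpty()) anyEmpty = true; else anyData = true;
        }
        if (!anyData) { idleSinceMs = -1; return null; }  // nothing buffered at all
        if (anyEmpty) {
            long now = wallClock.getAsLong();
            if (idleSinceMs < 0) idleSinceMs = now;          // start the grace period
            if (now - idleSinceMs < maxIdleMs) return null;  // keep waiting for data
            // Grace period expired: "force" processing of what is buffered.
        } else {
            idleSinceMs = -1;  // every partition has a head record: no need to wait
        }
        // Choose the partition whose head record has the smallest timestamp.
        Deque<Rec> best = null;
        for (Deque<Rec> q : partitions) {
            if (!q.isEmpty() && (best == null || q.peekFirst().timestamp < best.peekFirst().timestamp)) {
                best = q;
            }
        }
        return best.pollFirst();
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};  // fake wall clock so the demo is deterministic
        TimestampSyncSketch task = new TimestampSyncSketch(2, 100L, () -> fakeNow[0]);
        task.buffer(0, new Rec(5, "a5"));
        System.out.println(task.next());        // prints null: partition 1 empty, still waiting
        fakeNow[0] = 100;
        System.out.println(task.next().value);  // prints a5: grace period expired
    }
}
```

Once the grace period expires, this sketch keeps processing the non-empty partitions until every partition has data again, at which point the idle timer resets; that reset policy is an assumption of the sketch, not something the thread specifies.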
>>
>> -Tommy
>>
>>
>> On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:
>>
>> @Ted
>>
>> Yes, I will update the KIP, mentioning this as a separate consideration.
>>
>>
>> @Thomas
>>
>> The idle period may happen during processing as well. Think: if you are
>> joining two streams with very different throughput, say for an extreme
>> case, one stream comes in at 100K messages / sec and the other at 1
>> message / sec. Then it could happen from time to time that we have
>> reached the tail of the low-traffic stream and have not yet received
>> any new data from it, while the other stream still has unprocessed
>> buffered data. Currently we will always go ahead and just process the
>> other stream's buffered data, but bear in mind that when we eventually
>> receive the data from the low-traffic stream, we realize that its
>> timestamp is even smaller than what we have already processed, and
>> hence we have accidentally introduced out-of-order data.
>>
>> What you described in stream-table joins is also a common case (e.g.
>> https://issues.apache.org/jira/browse/KAFKA-4113). Personally I think
>> it is more related to a general theme of "message choosing", in which
>> Kafka Streams today only allows timestamp-synchronization-based message
>> choosing. Other mechanisms are requested as well, e.g. topic-priority
>> based message choosing, or type-based (e.g. always prefer KTable over
>> KStream). And this is not only for Streams, but also for the Consumer
>> in general: e.g. Nick has recently proposed this
>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics).
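Thomas's request for a pluggable choosing policy, and the topic-priority alternative Guozhang mentions, can be contrasted with timestamp-based choosing in a small Java sketch. The RecordChooser interface, the Partition class, and its priority field are hypothetical and are not part of Kafka Streams or the Consumer API; the topic names in main are placeholders. The example mirrors Thomas's stream-table case: the table record is newer than the pending event, so timestamp-based choosing picks the event first, while priority-based choosing picks the table update.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a pluggable "message chooser"; not a Kafka Streams API.
class ChooserSketch {

    // One input partition: its topic, a user-assigned priority,
    // and buffered record timestamps in offset order.
    static final class Partition {
        final String topic;
        final int priority;
        final Deque<Long> timestamps = new ArrayDeque<>();
        Partition(String topic, int priority) { this.topic = topic; this.priority = priority; }
    }

    // Returns the index of the partition whose head record should be processed next, or -1.
    interface RecordChooser {
        int choose(List<Partition> partitions);
    }

    // Timestamp-based choosing: the smallest head timestamp wins.
    static final RecordChooser BY_TIMESTAMP = parts -> {
        int best = -1;
        for (int i = 0; i < parts.size(); i++) {
            Long head = parts.get(i).timestamps.peekFirst();
            if (head != null && (best < 0 || head < parts.get(best).timestamps.peekFirst())) best = i;
        }
        return best;
    };

    // Priority-based choosing: the highest-priority non-empty partition wins;
    // ties fall back to the smaller head timestamp.
    static final RecordChooser BY_PRIORITY = parts -> {
        int best = -1;
        for (int i = 0; i < parts.size(); i++) {
            Partition p = parts.get(i);
            Long head = p.timestamps.peekFirst();
            if (head == null) continue;
            if (best < 0) { best = i; continue; }
            Partition b = parts.get(best);
            if (p.priority > b.priority
                    || (p.priority == b.priority && head < b.timestamps.peekFirst())) best = i;
        }
        return best;
    };

    public static void main(String[] args) {
        Partition table = new Partition("table-topic", 1);   // placeholder topic names
        Partition events = new Partition("event-topic", 0);
        table.timestamps.add(100L);  // recently updated table record
        events.timestamps.add(10L);  // older event waiting to be enriched
        List<Partition> parts = List.of(table, events);
        System.out.println(parts.get(BY_TIMESTAMP.choose(parts)).topic);  // prints event-topic
        System.out.println(parts.get(BY_PRIORITY.choose(parts)).topic);   // prints table-topic
    }
}
```

Keeping the chooser behind a single-method interface is what would make such a policy pluggable; a type-based policy (always prefer KTable input over KStream input) could be expressed the same way by deriving the priority from the input type.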
>>
>> I think this topic itself should be discussed in a separate KIP, maybe
>> for both the Streams and Consumer clients, and hence I intentionally
>> avoid overlapping with it and stay with a static message choosing
>> mechanism in my KIP.
>>
>>
>> Guozhang
>>
>>
>> On Tue, Aug 7, 2018 at 4:55 AM, Thomas Becker <thomas.bec...@tivo.com>
>> wrote:
>>
>> This looks like a big step in the right direction IMO. So am I correct
>> in assuming this idle period would only come into play after startup,
>> when waiting for initial records to be fetched? In other words, once we
>> have seen records from all topics and have established the stream
>> time, processing will not go idle again, right?
>>
>> I still feel that timestamp semantics are not wanted in all cases.
>> Consider a simple stream-table join to augment incoming events where
>> the table data has been updated recently (and hence has a later
>> timestamp than some incoming events). Currently these events will not
>> be joined at all (assuming older table records have been compacted)
>> until the timestamps on events start passing the table updates. For use
>> cases like this I'd like to be able to say "always prefer processing
>> the table's backing topic if it has data available, regardless of
>> timestamp".
>>
>>
>> On Fri, 2018-08-03 at 14:00 -0700, Guozhang Wang wrote:
>>
>> Hello all,
>>
>> I would like to kick off a discussion on the following KIP, to allow
>> users to control when a task can be processed based on its buffered
>> records, and how the stream time of a task is advanced.
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
>>
>> This is related to one of the root causes of out-of-order data in
>> Kafka Streams. Any thoughts / comments on this topic are more than
>> welcome.
>>
>>
>> Thanks,
>>
>> -- Guozhang
>>
>>
>> ________________________________
>>
>> This email and any attachments may contain confidential and privileged
>> material for the sole use of the intended recipient. Any review,
>> copying, or distribution of this email (or any attachments) by others
>> is prohibited. If you are not the intended recipient, please contact
>> the sender immediately and permanently delete this email and any
>> attachments. No employee or agent of TiVo Inc. is authorized to
>> conclude any binding agreement on behalf of TiVo Inc. by email. Binding
>> agreements with TiVo Inc. may only be made by a signed written
>> agreement.
>