@Guozhang, I've read the KIP and I don't have any further comments in
addition to what's already been discussed.

Thanks,
Bill

On Thu, Aug 9, 2018 at 2:26 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> @Guozhang, I think you can start the VOTE for this KIP? I don't have any
> further comments.
>
> One more nit: we should explicitly state that the new config is
> wall-clock time based.
>
>
> -Matthias
>
>
> On 8/7/18 12:59 PM, Matthias J. Sax wrote:
> > Correct. It's not about reordering. Records will still be processed in
> > offset-order per partition.
> >
> > For multi-partition tasks (like joins), we use the timestamp of the
> > "head" record of each partition to determine which record to process
> > first (to process records across partitions in timestamp order if
> > possible) -- however, if one partition does not have a record, we cannot
> > make the decision which record to pick next. Thus, the task blocks and
> > we don't want to block forever. If we unblock on missing data, we might
> > get out-of-order processing with regard to timestamps between two
> > partitions.
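
To make the head-record rule above concrete, here is a minimal sketch in
Python. It is a toy model, not Kafka Streams code: the `Record` shape, the
`pick_next` helper, and the partition names are all assumptions made up for
illustration. It only shows that comparing the head of each buffer keeps
offset order per partition, and that no choice can be made once one buffer
is empty.

```python
from collections import deque
from typing import Deque, Dict, Optional, Tuple

Record = Tuple[int, str]  # (timestamp, value); a hypothetical record shape


def pick_next(buffers: Dict[str, Deque[Record]]) -> Optional[Tuple[str, Record]]:
    """Return (partition, record) with the smallest head timestamp.

    Returns None when any partition buffer is empty, because the next
    record cannot be chosen safely without knowing that partition's head.
    """
    if any(len(buf) == 0 for buf in buffers.values()):
        return None  # the task would block here
    # Only the head of each buffer is compared, so offset order within a
    # partition is always preserved.
    partition = min(buffers, key=lambda p: buffers[p][0][0])
    return partition, buffers[partition].popleft()


buffers = {
    "left": deque([(10, "a"), (30, "c")]),
    "right": deque([(20, "b")]),
}
first = pick_next(buffers)   # head timestamps are 10 vs 20
second = pick_next(buffers)  # head timestamps are 30 vs 20
third = pick_next(buffers)   # "right" is now empty, so no decision is possible
```

In this sketch `third` is None even though "left" still holds a record,
which is exactly the blocking situation described above.
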
> >
> >
> > -Matthias
> >
> > On 8/7/18 12:03 PM, Thomas Becker wrote:
> >> In typing up a scenario to illustrate my question, I think I found the
> >> answer ;) We are not assuming timestamps will be strictly increasing within
> >> a topic, and we are trying to make processing order deterministic even in
> >> the face of that. Thanks for making me think about it (or please correct me
> >> if I'm wrong).
> >>
> >>
> >> On Tue, 2018-08-07 at 10:48 -0700, Matthias J. Sax wrote:
> >>
> >> @Thomas, just to rephrase (from my understanding):
> >>
> >>
> >> So in the scenario you describe, where one topic has
> >> vastly lower throughput, you're saying that when the lower throughput topic
> >> is fully caught up (no messages in the buffer), the task will idle rather
> >> than using the timestamp of the last message it saw from that topic?
> >>
> >>
> >> The idea of the KIP is to configure a max blocking time for tasks. This
> >> allows us to provide a "grace period" such that new data for the empty
> >> partition can be received (if an upstream producer still writes data,
> >> but with low throughput). After the blocking timeout expires (in case
> >> throughput is too small or the upstream producer died), the task will
> >> process data even if there are empty input topic partitions (because we
> >> cannot block forever in a stream processing scenario).
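
A rough sketch of the bounded-blocking idea described above, again as a toy
Python model rather than the actual Streams implementation. The class name
`IdlingTask`, the parameter `max_idle_ms`, and the injectable `clock_ms`
function are hypothetical names chosen for this example. The timeout is
measured against a wall-clock source, not against record timestamps.

```python
from collections import deque
from typing import Callable, Deque, Dict, Optional, Tuple

Record = Tuple[int, str]  # (timestamp, value); a hypothetical record shape


class IdlingTask:
    """Toy task that waits a bounded wall-clock time for empty partitions."""

    def __init__(self, max_idle_ms: int, clock_ms: Callable[[], int]):
        self.max_idle_ms = max_idle_ms  # hypothetical config name
        self.clock_ms = clock_ms        # wall-clock source, injectable for tests
        self.idle_since: Optional[int] = None

    def poll(self, buffers: Dict[str, Deque[Record]]) -> Optional[Tuple[str, Record]]:
        non_empty = {p: b for p, b in buffers.items() if b}
        if not non_empty:
            return None  # nothing buffered at all
        if len(non_empty) < len(buffers):
            now = self.clock_ms()
            if self.idle_since is None:
                self.idle_since = now  # the grace period starts here
            if now - self.idle_since < self.max_idle_ms:
                return None  # still waiting for the empty partition
            # Grace period expired: processing is forced on buffered data.
        else:
            self.idle_since = None  # every partition has data again
        partition = min(non_empty, key=lambda p: non_empty[p][0][0])
        return partition, non_empty[partition].popleft()


now = [0]  # fake wall clock in milliseconds
task = IdlingTask(max_idle_ms=100, clock_ms=lambda: now[0])
buffers = {"fast": deque([(5, "x"), (6, "y")]), "slow": deque()}

waiting = task.poll(buffers)        # t=0: grace period starts
now[0] = 99
still_waiting = task.poll(buffers)  # t=99: timeout not yet reached
now[0] = 100
forced = task.poll(buffers)         # t=100: "fast" is processed anyway
```

One design choice in this sketch: once the timeout has expired, the task
keeps processing without waiting again until the empty partition receives
data. Whether the real implementation behaves that way is not something this
example claims.
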
> >>
> >>
> >> Not sure what you mean by "using the timestamp of the last message"?
> >> Using it for what? The KIP is about processing records of partitions that
> >> do have data if some other partitions are empty (and to allow users to
> >> configure a max "blocking time" until processing is "forced").
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 8/7/18 10:02 AM, Guozhang Wang wrote:
> >>
> >> @Tommy
> >>
> >>
> >> Yes, that's the intent. Again, note that the current behavior is indeed "just
> >> using the timestamp of the last message I saw" and continuing to process
> >> what's in the buffer from other streams, but this may introduce
> >> out-of-order data.
> >>
> >>
> >>
> >> Guozhang
> >>
> >>
> >>
> >> On Tue, Aug 7, 2018 at 9:59 AM, Thomas Becker <thomas.bec...@tivo.com>
> >> wrote:
> >>
> >>
> >> Thanks Guozhang. So in the scenario you describe, where one topic has
> >> vastly lower throughput, you're saying that when the lower throughput topic
> >> is fully caught up (no messages in the buffer), the task will idle rather
> >> than using the timestamp of the last message it saw from that topic?
> >> Initially I was under the impression that this would only happen when the
> >> task had not yet seen any messages from one of the partitions.
> >>
> >>
> >> Regarding choosing, you are exactly right. This mechanism is pluggable in
> >> Samza, and I'd like to see something similar done in Kafka Streams. The
> >> timestamp-based choosing policy is great and makes sense in a lot of
> >> scenarios, but having something like a priority-based policy would be very
> >> nice for some of our use cases.
> >>
> >>
> >> -Tommy
> >>
> >>
> >> On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:
> >>
> >>
> >> @Ted
> >>
> >>
> >>
> >> Yes, I will update the KIP mentioning this as a separate consideration.
> >>
> >>
> >>
> >>
> >> @Thomas
> >>
> >>
> >>
> >> The idle period may happen during processing as well. Think: if
> >> you are joining two streams with very different throughput, say for
> >> an extreme case, one stream comes in at 100K messages / sec and another
> >> comes in at 1 message / sec. Then it could happen from time to time that we
> >> have reached the tail of the low-traffic stream and have not yet received
> >> any new data from that stream, while the other stream still has
> >> unprocessed buffered data. Currently we will always go ahead and just
> >> process the other stream's buffered data, but bear in mind that when we
> >> eventually receive the data from the low-traffic stream, we may find
> >> that its timestamp is even smaller than what we have already processed,
> >> and hence we have accidentally introduced out-of-order data.
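
The effect described above can be reproduced with a small simulation. This
is only an illustrative Python sketch: the `Event` tuple, the stream names,
and both helper functions are assumptions invented for the example, and the
numbers are made up.

```python
from typing import List, Tuple

# (arrival_time, stream, record_timestamp); a hypothetical event shape
Event = Tuple[int, str, int]


def process_without_waiting(events: List[Event]) -> List[Tuple[str, int]]:
    """Process each record as soon as it arrives, in arrival order."""
    return [(stream, ts) for _, stream, ts in sorted(events)]


def count_out_of_order(processed: List[Tuple[str, int]]) -> int:
    """Count records whose timestamp is below the highest one seen so far."""
    stream_time, late = -1, 0
    for _, ts in processed:
        if ts < stream_time:
            late += 1
        stream_time = max(stream_time, ts)
    return late


events = [
    (0, "fast", 100),
    (1, "fast", 101),
    (2, "fast", 102),
    (3, "slow", 100),  # arrives late but carries an older timestamp
    (4, "fast", 103),
]
processed = process_without_waiting(events)
late = count_out_of_order(processed)
```

Here the single "slow" record is processed after stream time has already
advanced to 102, so it counts as out of order. Waiting for the slow stream
before processing the fast one would avoid that, at the cost of idling.
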
> >>
> >>
> >>
> >> What you described in stream-table joins is also a common case (e.g.
> >> https://issues.apache.org/jira/browse/KAFKA-4113). Personally I think it is
> >> more related to a general theme of "message choosing", in which Kafka
> >> Streams today only allows timestamp-synchronization-based message choosing.
> >> Other mechanisms are requested as well, e.g. topic-priority-based message
> >> choosing, or type-based (e.g. always prefer KTable over KStream). And this
> >> is not only for Streams, but also for the Consumer itself in general: e.g.
> >> Nick has recently proposed this (
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics).
> >>
> >> I think this topic itself should be discussed as a separate KIP, maybe for
> >> both Streams and Consumer clients, and hence I intentionally avoid
> >> overlapping with it and stay with a static message choosing mechanism in
> >> my KIP.
> >>
> >>
> >>
> >>
> >>
> >> Guozhang
> >>
> >>
> >>
> >>
> >> On Tue, Aug 7, 2018 at 4:55 AM, Thomas Becker <thomas.bec...@tivo.com>
> >> wrote:
> >>
> >>
> >>
> >> This looks like a big step in the right direction IMO. So am I correct in
> >> assuming this idle period would only come into play after startup when
> >> waiting for initial records to be fetched? In other words, once we have
> >> seen records from all topics and have established the stream time,
> >> processing will not go idle again, right?
> >>
> >>
> >>
> >> I still feel that timestamp semantics are not wanted in all cases.
> >> Consider a simple stream-table join to augment incoming events where the
> >> table data has been updated recently (and hence has a later timestamp than
> >> some incoming events). Currently this will not be joined at all (assuming
> >> older table records have been compacted) until the timestamps on events
> >> start passing the table updates. For use cases like this I'd like to be
> >> able to say always prefer processing the table backing topic if it has data
> >> available, regardless of timestamp.
> >>
> >>
> >>
> >> On Fri, 2018-08-03 at 14:00 -0700, Guozhang Wang wrote:
> >>
> >>
> >>
> >> Hello all,
> >>
> >>
> >>
> >>
> >> I would like to kick off a discussion on the following KIP, to allow users
> >> to control when a task can be processed based on its buffered records, and
> >> how the stream time of a task is advanced.
> >>
> >>
> >>
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
> >>
> >>
> >>
> >>
> >> This is related to one of the root causes of out-of-order data in Kafka
> >> Streams. Any thoughts / comments on this topic are more than welcome.
> >>
> >>
> >>
> >>
> >>
> >> Thanks,
> >>
> >>
> >>
> >> -- Guozhang
> >>
> >>
> >>
> >>
> >> ________________________________
> >>
> >>
> >>
> >> This email and any attachments may contain confidential and privileged
> >>
> >>
> >> material for the sole use of the intended recipient. Any review,
> copying,
> >>
> >>
> >> or distribution of this email (or any attachments) by others is
> prohibited.
> >>
> >>
> >> If you are not the intended recipient, please contact the sender
> >>
> >>
> >> immediately and permanently delete this email and any attachments. No
> >>
> >>
> >> employee or agent of TiVo Inc. is authorized to conclude any binding
> >>
> >>
> >> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> >>
> >>
> >> Inc. may only be made by a signed written agreement.
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >
>
>
