@Tommy

Yes that's the intent. Again note that the current behavior is indeed "just
using the timestamp of the last message I saw", and continue processing
what's in the buffer from other streams, but this may introduce
out-of-ordering.


Guozhang


On Tue, Aug 7, 2018 at 9:59 AM, Thomas Becker <thomas.bec...@tivo.com>
wrote:

> Thanks Guozhang. So in the scenario you describe, where one topic has
> vastly lower throughput, you're saying that when the lower throughput topic
> is fully caught up (no messages in the buffer), the task will idle rather
> than using the timestamp of the last message it saw from that topic?
> Initially I was under the impression that this would only happen when the
> task had not yet seen any messages from one of the partitions.
>
> Regarding choosing, you are exactly right. This mechanism is pluggable in
> Samza, and I'd like to see something similar done in Kafka Streams. The
> timestamp based choosing policy is great and makes sense in a lot of
> scenarios, but having something like a priority based policy would be very
> nice for some of our usecases.
>
> -Tommy
>
> On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:
>
> @Ted
>
>
> Yes, I will update the KIP mentioning this as a separate consideration.
>
>
>
> @Thomas
>
>
> The idle period may be happening during the processing as well. Think: if
>
> you are joining two streams with very different throughput traffic, say for
>
> an extreme case, one stream comes in as 100K messages / sec, another comes
>
> in as 1 message / sec. Then it could happen from time to time that we have
>
> reached the tail of the low-traffic stream and do not have any data
>
> received yet from that stream, while the other stream still have
>
> unprocessed buffered data. Currently we will always go ahead and just
>
> process the other stream's buffered data, but bare in mind that when we
>
> eventually have received the data from the low-traffic stream we realized
>
> that its timestamp is even smaller than what we already have processed, and
>
> hence accidentally introduced out-of-ordering data.
>
>
> What you described in stream-table joins is also a common case (e.g.
>
> https://issues.apache.org/jira/browse/KAFKA-4113). Personally I think it
> is
>
> more related to a general theme, of "messaging choosing", in which Kafka
>
> Streams today only allows timestamp-synchronization-based message
> choosing.
>
> Other mechanisms are requested as well, e.g. topic-priority based messaging
>
> choosing, or type based (e.g. always prefer KTable over KStream). And this
>
> is not only for Streams, but also for generally Consumer itself: e.g. Nick
>
> has recently proposed this (
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 349%3A+Priorities+for+Source+Topics).
>
> I think this topic itself should be discussed as a separate KIP, maybe for
>
> both Streams and Consumer clients, and hence I intentionally avoid
>
> overlapping with it and stays with a static messaging choosing mechanism in
>
> my KIP.
>
>
>
>
> Guozhang
>
>
>
> On Tue, Aug 7, 2018 at 4:55 AM, Thomas Becker <thomas.bec...@tivo.com<
> mailto:thomas.bec...@tivo.com>>
>
> wrote:
>
>
> This looks like a big step in the right direction IMO. So am I correct in
>
> assuming this idle period would only come into play after startup when
>
> waiting for initial records to be fetched? In other words, once we have
>
> seen records from all topics and have established the stream time
>
> processing will not go idle again right?
>
>
> I still feel that timestamp semantics are not wanted in all cases.
>
> Consider a simple stream-table join to augment incoming events where the
>
> table data has been updated recently (and hence has a later timestamp than
>
> some incoming events). Currently this will not be joined at all (assuming
>
> older table records have been compacted) until the timestamps on events
>
> start passing the table updates. For use cases like this I'd like to be
>
> able to say always prefer processing the table backing topic if it has data
>
> available, regardless of timestamp.
>
>
> On Fri, 2018-08-03 at 14:00 -0700, Guozhang Wang wrote:
>
>
> Hello all,
>
>
>
> I would like to kick off a discussion on the following KIP, to allow users
>
>
> control when a task can be processed based on its buffered records, and how
>
>
> the stream time of a task be advanced.
>
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%
>
> 3A+Improve+Kafka+Streams+Timestamp+Synchronization
>
>
>
> This is related to one of the root causes of out-of-ordering data in Kafka
>
>
> Streams. Any thoughts / comments on this topic is more than welcomed.
>
>
>
>
> Thanks,
>
>
> -- Guozhang
>
>
>
> ________________________________
>
>
> This email and any attachments may contain confidential and privileged
>
> material for the sole use of the intended recipient. Any review, copying,
>
> or distribution of this email (or any attachments) by others is prohibited.
>
> If you are not the intended recipient, please contact the sender
>
> immediately and permanently delete this email and any attachments. No
>
> employee or agent of TiVo Inc. is authorized to conclude any binding
>
> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
>
> Inc. may only be made by a signed written agreement.
>
>
>
>
>
>
> ________________________________
>
> This email and any attachments may contain confidential and privileged
> material for the sole use of the intended recipient. Any review, copying,
> or distribution of this email (or any attachments) by others is prohibited.
> If you are not the intended recipient, please contact the sender
> immediately and permanently delete this email and any attachments. No
> employee or agent of TiVo Inc. is authorized to conclude any binding
> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> Inc. may only be made by a signed written agreement.
>



-- 
-- Guozhang

Reply via email to