@Ted

Yes, I will update the KIP to mention this as a separate consideration.


@Thomas

The idle period may happen during processing as well. Consider joining
two streams with very different throughput: in an extreme case, one stream
receives 100K messages/sec while the other receives 1 message/sec. From
time to time we may have reached the tail of the low-traffic stream and
not yet received any new data from it, while the other stream still has
unprocessed buffered data. Currently we always go ahead and process the
other stream's buffered data. But bear in mind that when we eventually
receive data from the low-traffic stream, we may find that its timestamp
is smaller than that of records we have already processed, which means we
have accidentally introduced out-of-order data.
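To make this concrete, here is a small, hypothetical Java simulation. It is not Kafka Streams code, and `TimestampChoosingSketch`, `chooseNext`, `drain` and `hasOutOfOrder` are made-up names. It assumes a task keeps one buffer of record timestamps per input partition and, as described above, always processes the buffered head with the smallest timestamp while simply skipping partitions whose buffer is empty:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation only; this is not how Kafka Streams is implemented internally.
public class TimestampChoosingSketch {

    // Returns the index of the non-empty buffer whose head has the smallest timestamp, or -1.
    static int chooseNext(List<Deque<Long>> buffers) {
        int best = -1;
        for (int i = 0; i < buffers.size(); i++) {
            Deque<Long> buf = buffers.get(i);
            if (buf.isEmpty()) {
                continue; // an empty partition is skipped rather than waited for
            }
            if (best == -1 || buf.peekFirst() < buffers.get(best).peekFirst()) {
                best = i;
            }
        }
        return best;
    }

    // Processes buffered records in chosen order until every buffer is empty.
    static List<Long> drain(List<Deque<Long>> buffers) {
        List<Long> processed = new ArrayList<>();
        int next;
        while ((next = chooseNext(buffers)) != -1) {
            processed.add(buffers.get(next).pollFirst());
        }
        return processed;
    }

    // True if the processed timestamps ever go backwards.
    static boolean hasOutOfOrder(List<Long> processed) {
        for (int i = 1; i < processed.size(); i++) {
            if (processed.get(i) < processed.get(i - 1)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Deque<Long> highTraffic = new ArrayDeque<>(List.of(100L, 101L, 102L));
        Deque<Long> lowTraffic = new ArrayDeque<>(); // nothing fetched yet
        List<Deque<Long>> buffers = List.of(highTraffic, lowTraffic);

        // Low-traffic buffer is empty, so only the high-traffic records get processed.
        List<Long> processed = new ArrayList<>(drain(buffers));

        // The low-traffic record finally arrives, but it carries an older timestamp.
        lowTraffic.addLast(50L);
        processed.addAll(drain(buffers));

        System.out.println(processed + " out-of-order=" + hasOutOfOrder(processed));
        // prints "[100, 101, 102, 50] out-of-order=true"
    }
}
```

The late record from the low-traffic partition ends up processed after newer records. In this sketch, letting the task wait (idle) for a bounded time when a partition's buffer is empty, rather than skipping it, would avoid that at the cost of extra latency.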

What you described for stream-table joins is also a common case (e.g.
https://issues.apache.org/jira/browse/KAFKA-4113). Personally I think it
belongs to a more general theme of "message choosing", where Kafka Streams
today only supports choosing messages based on timestamp synchronization.
Other mechanisms have been requested as well, e.g. topic-priority-based
message choosing, or type-based choosing (e.g. always prefer KTable over
KStream). And this is not specific to Streams; it applies to the Consumer
in general as well, e.g. Nick has recently proposed KIP-349
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics).
I think this topic deserves its own KIP, possibly covering both the Streams
and Consumer clients, so I intentionally avoid overlapping with it and stay
with a static message-choosing mechanism in my KIP.
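A rough illustration of how these message-choosing strategies differ, written as a Java sketch. Everything here (`MessageChooser`, `Head`, `BY_TIMESTAMP`, `BY_PRIORITY`, `PREFER_TABLE`, and the topic names) is hypothetical and not an existing Kafka or Kafka Streams API; it only shows how timestamp-based, topic-priority-based and type-based choosing would each pick a different buffered head record:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch only: none of these types exist in Kafka or Kafka Streams.
public class MessageChooserSketch {

    // The buffered head record of one input topic (hypothetical type).
    record Head(String topic, long timestamp, int priority, boolean isTableSource) {}

    // Strategy: given the heads of all non-empty inputs, pick the one to process next.
    interface MessageChooser {
        Optional<Head> choose(List<Head> availableHeads);
    }

    // Timestamp synchronization: the smallest timestamp wins.
    static final MessageChooser BY_TIMESTAMP = heads ->
            heads.stream().min(Comparator.comparingLong(Head::timestamp));

    // Topic-priority based: the highest priority wins, ties broken by timestamp.
    static final MessageChooser BY_PRIORITY = heads ->
            heads.stream().min(Comparator.comparingInt(Head::priority).reversed()
                    .thenComparingLong(Head::timestamp));

    // Type based: always prefer a table source, then fall back to timestamp.
    static final MessageChooser PREFER_TABLE = heads ->
            heads.stream().min(Comparator.comparing(Head::isTableSource).reversed()
                    .thenComparingLong(Head::timestamp));

    public static void main(String[] args) {
        List<Head> heads = List.of(
                new Head("events", 100L, 1, false),
                new Head("clicks", 150L, 5, false),
                new Head("table-changelog", 200L, 0, true));

        System.out.println(BY_TIMESTAMP.choose(heads).map(Head::topic).orElse("none")); // events
        System.out.println(BY_PRIORITY.choose(heads).map(Head::topic).orElse("none"));  // clicks
        System.out.println(PREFER_TABLE.choose(heads).map(Head::topic).orElse("none")); // table-changelog
    }
}
```

Making the chooser a pluggable strategy, rather than hard-coding one rule, is one way such a mechanism could serve both the stream-table case and topic priorities; whether it belongs in Streams, the Consumer, or both is exactly the open question for a separate KIP.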



Guozhang


On Tue, Aug 7, 2018 at 4:55 AM, Thomas Becker <thomas.bec...@tivo.com>
wrote:

> This looks like a big step in the right direction IMO. So am I correct in
> assuming this idle period would only come into play after startup when
> waiting for initial records to be fetched? In other words, once we have
> seen records from all topics and have established the stream time,
> processing will not go idle again, right?
>
> I still feel that timestamp semantics are not wanted in all cases.
> Consider a simple stream-table join to augment incoming events where the
> table data has been updated recently (and hence has a later timestamp than
> some incoming events). Currently such events will not be joined at all
> (assuming older table records have been compacted) until the timestamps on
> events start passing the table updates. For use cases like this I'd like to
> be able to say: always prefer processing the table's backing topic if it
> has data available, regardless of timestamp.
>
> On Fri, 2018-08-03 at 14:00 -0700, Guozhang Wang wrote:
>
> Hello all,
>
>
> I would like to kick off a discussion on the following KIP, to allow users
> to control when a task can be processed based on its buffered records, and
> how the stream time of a task is advanced.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
>
>
> This is related to one of the root causes of out-of-order data in Kafka
> Streams. Any thoughts or comments on this topic are more than welcome.
>
>
>
> Thanks,
>
> -- Guozhang
>
>



