@Thomas, just to rephrase (from my understanding):

> So in the scenario you describe, where one topic has vastly lower
> throughput, you're saying that when the lower throughput topic is fully
> caught up (no messages in the buffer), the task will idle rather than
> using the timestamp of the last message it saw from that topic?

The idea of the KIP is to configure a max blocking time for tasks. This
provides a "grace period" during which new data for the empty partition
can be received (if an upstream producer still writes data, but with low
throughput). After the blocking timeout expires (because throughput is
too low or the upstream producer died), the task will process data even
if there are empty input topic partitions (because we cannot block
forever in a stream processing scenario).

Not sure what you mean by "using the timestamp of the last message".
Using it for what? The KIP is about processing records of partitions that
do have data while some other partitions are empty (and about allowing
users to configure a max "blocking time" until processing is "forced").
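
To make the "max blocking time" idea concrete, here is a rough sketch in
plain Java. It is not Kafka Streams code and not the KIP's implementation:
the class `TaskIdleSketch`, its methods, and the `maxBlockingMs` parameter
are all hypothetical names made up for illustration. It assumes a task
picks the buffered record with the smallest timestamp, idles while any
input partition is empty, and is "forced" to process once the blocking
time has elapsed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: none of these names exist in Kafka Streams.
class TaskIdleSketch {
    // partition name -> timestamps of buffered records, in arrival order
    private final Map<String, Deque<Long>> buffers = new HashMap<>();
    private final long maxBlockingMs;
    // wall-clock time at which the task first noticed an empty partition; -1 = not idling
    private long idleSinceMs = -1;

    TaskIdleSketch(long maxBlockingMs, String... partitions) {
        this.maxBlockingMs = maxBlockingMs;
        for (String p : partitions) {
            buffers.put(p, new ArrayDeque<>());
        }
    }

    void add(String partition, long timestamp) {
        buffers.get(partition).addLast(timestamp);
    }

    // Returns the timestamp of the record that gets processed, or null if the
    // task idles (grace period still running) or nothing is buffered at all.
    Long poll(long nowMs) {
        boolean anyEmpty = false;
        String best = null;
        for (Map.Entry<String, Deque<Long>> e : buffers.entrySet()) {
            Deque<Long> q = e.getValue();
            if (q.isEmpty()) {
                anyEmpty = true;
                continue;
            }
            // timestamp-based choosing: prefer the smallest head timestamp
            if (best == null || q.peekFirst() < buffers.get(best).peekFirst()) {
                best = e.getKey();
            }
        }
        if (best == null) {
            return null; // no data in any partition
        }
        if (anyEmpty) {
            if (idleSinceMs < 0) {
                idleSinceMs = nowMs; // start of the grace period
            }
            if (nowMs - idleSinceMs < maxBlockingMs) {
                return null; // keep waiting for the empty partition
            }
            // timeout expired: forced processing, possibly out of order
        } else {
            idleSinceMs = -1; // all partitions have data again, reset
        }
        return buffers.get(best).pollFirst();
    }

    public static void main(String[] args) {
        TaskIdleSketch task = new TaskIdleSketch(100, "fast", "slow");
        task.add("fast", 10L);
        task.add("fast", 20L);
        System.out.println(task.poll(0));   // null: "slow" is empty, grace period starts
        task.add("slow", 5L);
        System.out.println(task.poll(60));  // 5: all partitions have data, smallest timestamp wins
        System.out.println(task.poll(70));  // null: "slow" is empty again, new grace period
        System.out.println(task.poll(170)); // 10: blocking time expired, processing is forced
    }
}
```

One design choice in this sketch is an assumption on my side: after
processing has been forced, the task keeps processing without waiting again
until every partition has data once more.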


-Matthias


On 8/7/18 10:02 AM, Guozhang Wang wrote:
> @Tommy
> 
> Yes that's the intent. Again note that the current behavior is indeed "just
> using the timestamp of the last message I saw" and continuing to process
> what's in the buffer from other streams, but this may introduce
> out-of-order data.
> 
> 
> Guozhang
> 
> 
> On Tue, Aug 7, 2018 at 9:59 AM, Thomas Becker <thomas.bec...@tivo.com>
> wrote:
> 
>> Thanks Guozhang. So in the scenario you describe, where one topic has
>> vastly lower throughput, you're saying that when the lower throughput topic
>> is fully caught up (no messages in the buffer), the task will idle rather
>> than using the timestamp of the last message it saw from that topic?
>> Initially I was under the impression that this would only happen when the
>> task had not yet seen any messages from one of the partitions.
>>
>> Regarding choosing, you are exactly right. This mechanism is pluggable in
>> Samza, and I'd like to see something similar done in Kafka Streams. The
>> timestamp based choosing policy is great and makes sense in a lot of
>> scenarios, but having something like a priority based policy would be very
>> nice for some of our use cases.
>>
>> -Tommy
>>
>> On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:
>>
>> @Ted
>>
>> Yes, I will update the KIP mentioning this as a separate consideration.
>>
>> @Thomas
>>
>> The idle period may happen during processing as well. Think: if you are
>> joining two streams with very different throughput, say, in an extreme
>> case, one stream comes in at 100K messages / sec and another comes in at
>> 1 message / sec. Then it could happen from time to time that we have
>> reached the tail of the low-traffic stream and have not yet received any
>> new data from that stream, while the other stream still has unprocessed
>> buffered data. Currently we will always go ahead and just process the
>> other stream's buffered data, but bear in mind that when we eventually
>> receive the data from the low-traffic stream, we may realize that its
>> timestamp is even smaller than what we have already processed, and hence
>> we have accidentally introduced out-of-order data.
>>
>> What you described in stream-table joins is also a common case (e.g.
>> https://issues.apache.org/jira/browse/KAFKA-4113). Personally I think it
>> is more related to a general theme of "message choosing", in which Kafka
>> Streams today only allows timestamp-synchronization-based message
>> choosing. Other mechanisms are requested as well, e.g. topic-priority
>> based message choosing, or type based (e.g. always prefer KTable over
>> KStream). And this is not only for Streams, but also for the Consumer
>> itself in general: e.g. Nick has recently proposed this (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics).
>>
>> I think this topic itself should be discussed as a separate KIP, maybe for
>> both Streams and Consumer clients, and hence I intentionally avoid
>> overlapping with it and stay with a static message choosing mechanism in
>> my KIP.
>>
>> Guozhang
>>
>> On Tue, Aug 7, 2018 at 4:55 AM, Thomas Becker <thomas.bec...@tivo.com>
>> wrote:
>>
>> This looks like a big step in the right direction IMO. So am I correct in
>> assuming this idle period would only come into play after startup when
>> waiting for initial records to be fetched? In other words, once we have
>> seen records from all topics and have established the stream time,
>> processing will not go idle again, right?
>>
>> I still feel that timestamp semantics are not wanted in all cases.
>> Consider a simple stream-table join to augment incoming events where the
>> table data has been updated recently (and hence has a later timestamp than
>> some incoming events). Currently this will not be joined at all (assuming
>> older table records have been compacted) until the timestamps on events
>> start passing the table updates. For use cases like this I'd like to be
>> able to say: always prefer processing the table backing topic if it has
>> data available, regardless of timestamp.
>>
>> On Fri, 2018-08-03 at 14:00 -0700, Guozhang Wang wrote:
>>
>> Hello all,
>>
>> I would like to kick off a discussion on the following KIP, to allow users
>> to control when a task can be processed based on its buffered records, and
>> how the stream time of a task is advanced.
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
>>
>> This is related to one of the root causes of out-of-order data in Kafka
>> Streams. Any thoughts / comments on this topic are more than welcome.
>>
>> Thanks,
>>
>> -- Guozhang
>>
>> ________________________________
>>
>> This email and any attachments may contain confidential and privileged
>> material for the sole use of the intended recipient. Any review, copying,
>> or distribution of this email (or any attachments) by others is prohibited.
>> If you are not the intended recipient, please contact the sender
>> immediately and permanently delete this email and any attachments. No
>> employee or agent of TiVo Inc. is authorized to conclude any binding
>> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
>> Inc. may only be made by a signed written agreement.
>>
>>
> 
> 
> 
