@Guozhang, I think you can start the VOTE for this KIP? I don't have any
further comments.

One more nit: we should explicitly state that the new config is based on
wall-clock time.
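
To make the blocking behavior discussed in the quoted thread below concrete,
here is a minimal sketch in Java. It is not the Kafka Streams implementation:
the class name IdleAwareChooser, the parameter maxIdleMs (standing in for the
new config), and the use of bare timestamps in place of records are all
assumptions made for illustration. It picks the partition whose head record has
the smallest timestamp, idles while any partition is empty, and forces
processing once the wall-clock timeout has expired.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.OptionalInt;

/** Hypothetical sketch only; names and structure are assumptions. */
final class IdleAwareChooser {
    // One buffer of record timestamps per partition, kept in offset order.
    private final List<Deque<Long>> buffers = new ArrayList<>();
    // Assumed stand-in for the new config: max wall-clock time to block.
    private final long maxIdleMs;
    // Wall-clock time at which the task started idling, or -1 if not idling.
    private long idleSinceMs = -1;

    IdleAwareChooser(int partitions, long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
        for (int i = 0; i < partitions; i++) {
            buffers.add(new ArrayDeque<>());
        }
    }

    void add(int partition, long recordTimestamp) {
        buffers.get(partition).addLast(recordTimestamp);
    }

    /** Returns the partition to process next, or empty if the task should idle. */
    OptionalInt choose(long wallClockMs) {
        boolean anyEmpty = false;
        int best = -1;
        for (int p = 0; p < buffers.size(); p++) {
            Long head = buffers.get(p).peekFirst();
            if (head == null) {
                anyEmpty = true;
                continue;
            }
            // Compare head records only; order within a partition never changes.
            if (best < 0 || head < buffers.get(best).peekFirst()) {
                best = p;
            }
        }
        if (best < 0) {
            return OptionalInt.empty(); // nothing buffered at all
        }
        if (anyEmpty) {
            if (idleSinceMs < 0) {
                idleSinceMs = wallClockMs; // start the wall-clock timer
            }
            if (wallClockMs - idleSinceMs < maxIdleMs) {
                return OptionalInt.empty(); // keep waiting for the empty partition
            }
            // Timeout expired: processing is forced, possibly out of timestamp order.
        } else {
            idleSinceMs = -1; // all partitions have data again
        }
        buffers.get(best).pollFirst();
        return OptionalInt.of(best);
    }
}
```

Note that the timeout is measured with the wall-clock value passed to choose(),
not with record timestamps, which is the point of the nit above. After a forced
step, a late record with a smaller timestamp can still arrive on the previously
empty partition, so out-of-order processing across partitions remains possible.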


-Matthias


On 8/7/18 12:59 PM, Matthias J. Sax wrote:
> Correct. It's not about reordering. Records will still be processed in
> offset-order per partition.
> 
> For multi-partition tasks (like joins), we use the timestamp of the
> "head" record of each partition to determine which record to process
> first (to process records across partitions in timestamp order if
> possible). However, if one partition does not have a record, we cannot
> decide which record to pick next. Thus, the task blocks, and
> we don't want to block forever. If we unblock on missing data, we might
> get out-of-order processing with regard to timestamps between two
> partitions.
> 
> 
> -Matthias
> 
> On 8/7/18 12:03 PM, Thomas Becker wrote:
>> In typing up a scenario to illustrate my question, I think I found the 
>> answer ;) We are not assuming timestamps will be strictly increasing within 
>> a topic and trying to make processing order deterministic even in the face 
>> of that. Thanks for making me think about it (or please correct me if I'm 
>> wrong).
>>
>>
>> On Tue, 2018-08-07 at 10:48 -0700, Matthias J. Sax wrote:
>>
>> @Thomas, just to rephrase (from my understanding):
>>
>>> So in the scenario you describe, where one topic has
>>> vastly lower throughput, you're saying that when the lower throughput topic
>>> is fully caught up (no messages in the buffer), the task will idle rather
>>> than using the timestamp of the last message it saw from that topic?
>>
>> The idea of the KIP is to configure a max blocking time for tasks. This
>> provides a "grace period" during which new data for the empty
>> partition can be received (if an upstream producer still writes data,
>> but with low throughput). After the blocking timeout expires (in case
>> throughput is too small or the upstream producer died), the task will
>> process data even if there are empty input topic partitions (because we
>> cannot block forever in a stream processing scenario).
>>
>> Not sure what you mean by "using the timestamp of the last message"?
>> Using for what? The KIP is about processing records of partitions that
>> do have data if some other partitions are empty (and to allow users to
>> configure a max "blocking time" until processing is "forced").
>>
>> -Matthias
>>
>> On 8/7/18 10:02 AM, Guozhang Wang wrote:
>>
>> @Tommy
>>
>> Yes, that's the intent. Again, note that the current behavior is indeed "just
>> using the timestamp of the last message I saw" and continuing to process
>> what's in the buffer from other streams, but this may introduce
>> out-of-order processing.
>>
>> Guozhang
>>
>> On Tue, Aug 7, 2018 at 9:59 AM, Thomas Becker <thomas.bec...@tivo.com> wrote:
>>
>> Thanks Guozhang. So in the scenario you describe, where one topic has
>> vastly lower throughput, you're saying that when the lower throughput topic
>> is fully caught up (no messages in the buffer), the task will idle rather
>> than using the timestamp of the last message it saw from that topic?
>> Initially I was under the impression that this would only happen when the
>> task had not yet seen any messages from one of the partitions.
>>
>> Regarding choosing, you are exactly right. This mechanism is pluggable in
>> Samza, and I'd like to see something similar done in Kafka Streams. The
>> timestamp-based choosing policy is great and makes sense in a lot of
>> scenarios, but having something like a priority-based policy would be very
>> nice for some of our use cases.
>>
>> -Tommy
>>
>> On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:
>>
>> @Ted
>>
>> Yes, I will update the KIP mentioning this as a separate consideration.
>>
>> @Thomas
>>
>> The idle period may happen during processing as well. Think: if
>> you are joining two streams with very different throughput, say, for
>> an extreme case, one stream comes in at 100K messages / sec and another comes
>> in at 1 message / sec. Then it could happen from time to time that we have
>> reached the tail of the low-traffic stream and have not yet received any
>> data from that stream, while the other stream still has
>> unprocessed buffered data. Currently we will always go ahead and just
>> process the other stream's buffered data, but bear in mind that when we
>> eventually receive the data from the low-traffic stream, we may realize
>> that its timestamp is even smaller than what we have already processed, and
>> hence we have accidentally introduced out-of-order data.
>>
>> What you described in stream-table joins is also a common case (e.g.
>> https://issues.apache.org/jira/browse/KAFKA-4113). Personally I think it is
>> more related to a general theme of "message choosing", in which Kafka
>> Streams today only allows timestamp-synchronization-based message choosing.
>> Other mechanisms are requested as well, e.g. topic-priority-based message
>> choosing, or type-based (e.g. always prefer KTable over KStream). And this
>> is not only for Streams, but also for the Consumer itself in general: e.g. Nick
>> has recently proposed this (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics).
>>
>> I think this topic itself should be discussed as a separate KIP, maybe for
>> both Streams and Consumer clients, and hence I intentionally avoid
>> overlapping with it and stay with a static message choosing mechanism in
>> my KIP.
>>
>> Guozhang
>>
>> On Tue, Aug 7, 2018 at 4:55 AM, Thomas Becker <thomas.bec...@tivo.com> wrote:
>>
>> This looks like a big step in the right direction IMO. So am I correct in
>> assuming this idle period would only come into play after startup when
>> waiting for initial records to be fetched? In other words, once we have
>> seen records from all topics and have established the stream time,
>> processing will not go idle again, right?
>>
>> I still feel that timestamp semantics are not wanted in all cases.
>> Consider a simple stream-table join to augment incoming events where the
>> table data has been updated recently (and hence has a later timestamp than
>> some incoming events). Currently this will not be joined at all (assuming
>> older table records have been compacted) until the timestamps on events
>> start passing the table updates. For use cases like this I'd like to be
>> able to say "always prefer processing the table backing topic if it has data
>> available, regardless of timestamp".
>>
>> On Fri, 2018-08-03 at 14:00 -0700, Guozhang Wang wrote:
>>
>> Hello all,
>>
>> I would like to kick off a discussion on the following KIP, to allow users
>> to control when a task can be processed based on its buffered records, and how
>> the stream time of a task is advanced.
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
>>
>> This is related to one of the root causes of out-of-order data in Kafka
>> Streams. Any thoughts / comments on this topic are more than welcome.
>>
>> Thanks,
>>
>> -- Guozhang
>>
>> ________________________________
>>
>> This email and any attachments may contain confidential and privileged
>> material for the sole use of the intended recipient. Any review, copying,
>> or distribution of this email (or any attachments) by others is prohibited.
>> If you are not the intended recipient, please contact the sender
>> immediately and permanently delete this email and any attachments. No
>> employee or agent of TiVo Inc. is authorized to conclude any binding
>> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
>> Inc. may only be made by a signed written agreement.
>>
>>
> 
