Hello,

Just to make some updates: I changed the name of the metric so that it is more in line with the usual Kafka naming conventions for metrics / sensors. Below is the updated description of the metric:

dropped-idempotent-updates : (Level 2 - Per Task) DEBUG (rate | total)

Description: This metric will record the number of updates that have been dropped because they essentially re-perform an earlier operation.

Note:
- The rate option indicates the ratio of records dropped to the actual volume of records passing through the task.
- The total option will just give a raw count of the number of records dropped.

I hope that this is more on point.

Best,
Richard

On Fri, Feb 21, 2020 at 2:20 PM Richard Yu <yohan.richard...@gmail.com> wrote:

> Hi all,
>
> Thanks for the clarification. I was just confused a little on what was
> going on.
>
> So I guess then that for the actual proposal, we have the following:
>
> 1. We check for binary equality, and perform no extra lookups.
> 2. Emphasize that this applies only to materialized tables.
> 3. We drop aggregation updates if the key, value, and timestamp are the same.
>
> Then that settles the behavior changes. So it looks like the metric
> is the only thing that is left. In this case, I think the metric would be
> named the following: IdempotentUpdateMetric. This is mostly off the top of
> my head, so if you think that we should change it, feel free to say so.
> The metric will report the number of dropped operations inherently.
>
> It will probably be added as a Sensor, similar to the dropped records
> sensor we already have.
>
> If there isn't anything else, I will probably start the voting process
> next week!
>
> Cheers,
> Richard
>
>
> On Fri, Feb 21, 2020 at 11:23 AM John Roesler <vvcep...@apache.org> wrote:
>
>> Hi Bruno,
>>
>> Thanks for the clarification. Indeed, I was thinking two things:
>> 1. For the initial implementation, we can just avoid adding any extra
>> lookups, but only do the comparison when we already happen to have
>> the prior value.
>> 2. I think, as a result of the timestamp semantics, we actually _do_ look
>> up the prior value approximately all the time, so the idempotence check
>> should be quite effective.
>>
>> I think that second point is the same thing you're referring to as
>> potentially being unnecessary. It does mean that we do fetch the whole
>> value in a lot of cases where we really only need the timestamp, so it
>> could certainly be optimized in the future. In that future, we would need
>> to weigh that optimization against losing the idempotence check. But,
>> that's a problem for tomorrow :)
>>
>> I'm 100% on board with scrutinizing the performance as we implement
>> this feature.
>>
>> Thanks again,
>> -John
>>
>> On Thu, Feb 20, 2020, at 03:25, Bruno Cadonna wrote:
>> > Hi John,
>> >
>> > I am glad to help you with your imagination. With overhead, I mainly
>> > meant the additional lookup into the state store to get the current
>> > value, but I see now in the code that we do that lookup anyways
>> > (although I think we could avoid that in the cases where we do not
>> > need the old value). With or without config, we need to evaluate the
>> > performance benefits of this change, in any case.
>> >
>> > Best,
>> > Bruno
>> >
>> > On Wed, Feb 19, 2020 at 7:48 PM John Roesler <vvcep...@apache.org> wrote:
>> > >
>> > > Thanks for your remarks, Bruno!
>> > >
>> > > I'm in favor of standardizing on terminology like "not forwarding
>> > > idempotent updates" or "dropping idempotent updates". Maybe we
>> > > should make a pass on the KIP and just convert everything to this
>> > > phrasing. In retrospect, even the term "emit-on-change" has too much
>> > > semantic baggage, since it implies the semantics from the SECRET
>> > > paper, which we don't really want to imply here.
>> > >
>> > > I'm also in favor of the metric as you propose.
>> > > >> > > Likewise with stream aggregations, I was also under the impression >> > > that we agreed on dropping idempotent updates to the aggregation >> > > result, any time we find that our "new" (key, value, timestamp) result >> > > is identical to the prior one. >> > > >> > > Also, I'm +1 on all your recommendations for updating the KIP document >> > > for clarity. >> > > >> > > Regarding the opt-out config. Perhaps I'm suffering from a failure of >> > > imagination, but I don't see how the current proposal could really >> have >> > > a measurable impact on latency. If all we do is make a single extra >> pass >> > > to compare two byte arrays for equality, only in the cases where we >> already >> > > have the byte arrays available, it seems unlikely to measurably >> affect the >> > > processing of non-idempotent updates. It seems guaranteed to >> _decrease_ >> > > the latency of processing idempotent updates, since we get to skip a >> > > store#put, at least one producer#send, and also all downstream >> processing, >> > > including all the disk and network operations associated with >> downstream >> > > operations. >> > > >> > > It seems like if we're pretty sure this change would only _help_, we >> shouldn't >> > > introduce the operational burden of an extra configuration. If we >> want to >> > > be more aggressive about dropping idempotent operations in the future, >> > > such as depending on equals() or adding a ChangeDetector interface, >> then >> > > we should consider adding a configuration as part of that future >> work. In >> > > fact, if we add a simple "opt-in/opt-out" switch right now, we might >> find >> > > that it's actually insufficient for whatever future feature we might >> propose, >> > > then we have a mess of deprecating the opt-out config and replacing >> it. >> > > >> > > What do you think? >> > > -John >> > > >> > > On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna wrote: >> > > > Hi all, >> > > > >> > > > Sorry for the late reply! 
>> > > > >> > > > I am also in favour of baby steps. >> > > > >> > > > I am undecided whether the KIP should contain a opt-out config or >> not. >> > > > The overhead of emit-on-change might affect latency. For >> applications >> > > > where low latency is crucial and there are not too many idempotent >> > > > updates, it would be better to fall back to emit-on-update. However, >> > > > we do not know how much emit-on-change impacts latency. We would >> first >> > > > need to benchmark that before we can decide about the >> opt-out-config. >> > > > >> > > > A metric of dropped idempotent updates seems useful to me to be >> > > > informed about potential upstream applications or upstream operators >> > > > that produce too many idempotent updates. The KIP should state the >> > > > name of the metric, its group, its tags, and its recording level >> (see >> > > > KIP-444 or KIP-471 for examples). I propose DEBUG as reporting >> level. >> > > > >> > > > Richard, what competing proposals for emit-on-change for >> aggregations >> > > > do you mean? I have the feeling that we agreed to get rid of >> > > > idempotent updates if the aggregate is updated with the same key, >> > > > value, AND timestamp. I am also fine if we do not include this into >> > > > this KIP (remember: baby steps). >> > > > >> > > > You write that "emit-on-change is more correct". Since we agreed >> that >> > > > this is an optimization, IMO you cannot argue this way. >> > > > >> > > > Please put "Alternative Approaches" under "Rejected Alternatives", >> so >> > > > that it becomes clear that we are not going to implement them. In >> > > > general, I think the KIP needs a bit of clean-up (probably, you >> > > > already planned for it). "Design Reasoning" is a bit of behavior >> > > > changes, rejected alternatives and duplicates a bit the content in >> > > > those sections. >> > > > >> > > > I do not like the name "no-op operations" or "no-ops", because they >> > > > are rather generic. 
I like more "idempotent updates". >> > > > >> > > > Best, >> > > > Bruno >> > > > >> > > > >> > > > On Tue, Feb 18, 2020 at 7:25 PM Richard Yu < >> yohan.richard...@gmail.com> wrote: >> > > > > >> > > > > Hi all, >> > > > > >> > > > > We are definitely making progress! >> > > > > >> > > > > @John should I emphasize in the proposed behavior changes that we >> are only >> > > > > doing binary equality checks for stateful operators? >> > > > > It looks like we have come close to finalizing this part of the >> KIP. (I >> > > > > will note in the KIP that this proposal is intended for >> optimization, not >> > > > > semantics correctness) >> > > > > >> > > > > I do think maybe we still have one other detail we need to >> discuss. So far, >> > > > > there has been quite a bit of back and forth about what the >> behavior of >> > > > > aggregations should look like in emit on change. I have seen >> > > > > multiple competing proposals, so I am not completely certain >> which one we >> > > > > should go with, or how we will be able to compromise in between >> them. >> > > > > >> > > > > Let me know what your thoughts are on this matter, since we are >> probably >> > > > > close to wrapping up most other stuff. >> > > > > @Matthias J. Sax <matth...@confluent.io> and @Bruno, see what >> you think >> > > > > about this. >> > > > > >> > > > > Best, >> > > > > Richard >> > > > > >> > > > > >> > > > > >> > > > > On Tue, Feb 18, 2020 at 9:06 AM John Roesler <vvcep...@apache.org> >> wrote: >> > > > > >> > > > > > Thanks, Matthias! >> > > > > > >> > > > > > Regarding numbers, it would be hard to know how many >> applications >> > > > > > would benefit, since we don't know how many applications there >> are, >> > > > > > or anything about their data sets or topologies. We could do a >> survey, >> > > > > > but it seems overkill if we take the conservative approach. 
>> > > > > > >> > > > > > I have my own practical stream processing experience that tells >> me this >> > > > > > is absolutely critical for any moderate-to-large relational >> stream >> > > > > > processing use cases. I'll leave it to you to decide if you >> find that >> > > > > > convincing, but it's definitely not an _assumption_. I've also >> heard from >> > > > > > a few Streams users who have already had to implement their own >> > > > > > noop-suppression transformers in order to get to production >> scale. >> > > > > > >> > > > > > Regardless, it sounds like we can agree on taking an >> opportunistic approach >> > > > > > and targeting the optimization just to use a binary-equality >> check at >> > > > > > stateful operators. (I'd also suggest in sink nodes, when we >> are about to >> > > > > > send old and new values, since they are also already present >> and serialized >> > > > > > at that point.) We could make the KIP even more vague, and just >> say that >> > > > > > we'll drop no-op updates "when possible". >> > > > > > >> > > > > > I'm curious what Bruno and the others think about this. If it >> seems like >> > > > > > a good starting point, perhaps we could move to a vote soon and >> get to >> > > > > > work on the implementation! >> > > > > > >> > > > > > Thanks, >> > > > > > -John >> > > > > > >> > > > > > On Mon, Feb 17, 2020, at 20:54, Matthias J. Sax wrote: >> > > > > > > Talking about optimizations and reducing downstream load: >> > > > > > > >> > > > > > > Do we actually have any numbers? I have the impression that >> this KIP is >> > > > > > > more or less build on the _assumption_ that there is a >> problem. Yes, >> > > > > > > there are some use cases that would benefit from this; But >> how many >> > > > > > > applications would actually benefit? And how much load >> reduction would >> > > > > > > they get? 
>> > > > > > > >> > > > > > > The simplest approach (following John idea to make baby >> steps) would be >> > > > > > > to apply the emit-on-change pattern only if there is a store. >> For this >> > > > > > > case we need to serialize old and new result anyway and thus >> a simple >> > > > > > > byte-array comparison is no overhead. >> > > > > > > >> > > > > > > Sending `oldValues` by default would become expensive because >> we would >> > > > > > > need to serialize the recomputed old result, as well as the >> new result, >> > > > > > > to make the comparison (and we now the serialization is not >> cheap). We >> > > > > > > are facing a trade-off between CPU overhead and downstream >> load and I am >> > > > > > > not sure if we should hard code this. My original argument >> for sending >> > > > > > > `oldValues` was about semantics; but for an optimization, I >> am not sure >> > > > > > > if this would be the right choice. >> > > > > > > >> > > > > > > For now, users who want to opt-in can force a >> materialization. A >> > > > > > > materialization may be expensive and if we see future demand, >> we could >> > > > > > > still add an option to send `oldValues` instead of >> materialization (this >> > > > > > > would at least save the store overhead). As we consider the >> KIP an >> > > > > > > optimization, a "config" seems to make sense. >> > > > > > > >> > > > > > > >> > > > > > > -Matthias >> > > > > > > >> > > > > > > >> > > > > > > On 2/17/20 5:21 PM, Richard Yu wrote: >> > > > > > > > Hi John! >> > > > > > > > >> > > > > > > > Thanks for the reply. >> > > > > > > > >> > > > > > > > About the changes we have discussed so far. I think upon >> further >> > > > > > > > consideration, we have been mostly talking about this from >> the >> > > > > > perspective >> > > > > > > > that no stop-gap effort is acceptable. 
However, in recent >> discussion, >> > > > > > > if we >> > > > > > > > consider optimization, then it appears that the perspective >> I >> > > > > > mentioned no >> > > > > > > > longer applies. After all, we are no longer concerned so >> much about >> > > > > > > > semantics correctness, then reducing traffic as much as >> possible >> > > > > > without >> > > > > > > > performance tradeoffs. >> > > > > > > > >> > > > > > > > In this case, I think a cache would be a good idea for >> stateless >> > > > > > > > operations. This cache will not be backed by a store >> obviously. We can >> > > > > > > > probably use Kafka's ThreadCache. We should be able to >> catch a large >> > > > > > > > portion of the no-ops if we at least store some results in >> the cache. >> > > > > > Not >> > > > > > > > all will be caught, but I think the impact will be >> significant. >> > > > > > > > >> > > > > > > > On another note, I think that we should implement competing >> proposals >> > > > > > i.e. >> > > > > > > > one where we forward both old and new values with a >> reasonable >> > > > > > proportion >> > > > > > > > of artificial no-ops (we do not necessarily have to rely on >> equals so >> > > > > > much >> > > > > > > > as comparing the serialized binary data after the >> operation), and in >> > > > > > > > another scenario, the cache for stateless ops. It would be >> > > > > > unreasonable if >> > > > > > > > we completely disregard either approach, since they both >> have merit. >> > > > > > The >> > > > > > > > reason for implementing both is to perform benchmark tests >> on them, and >> > > > > > > > compare them with the original. This way, we can more >> clearly see what >> > > > > > is >> > > > > > > > the drawbacks and the gains. So far, we have been >> discussing only >> > > > > > > > hypotheticals, and if we continue to do so, I think it is >> likely no >> > > > > > ground >> > > > > > > > will be gained. 
>> > > > > > > > >> > > > > > > > After all, what we seek is optimization, and performance >> benchmarks >> > > > > > > will be >> > > > > > > > mandatory for a KIP of this nature. >> > > > > > > > >> > > > > > > > Hope this helps, >> > > > > > > > Richard >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > On Mon, Feb 17, 2020 at 2:12 PM John Roesler < >> vvcep...@apache.org> >> > > > > > wrote: >> > > > > > > > >> > > > > > > >> Hi again, all, >> > > > > > > >> >> > > > > > > >> Sorry on my part for my silence. >> > > > > > > >> >> > > > > > > >> I've just taken another look over the recent history of >> this >> > > > > > > discussion. It >> > > > > > > >> seems like the #1 point to clarify (because it affect >> everything >> > > > > > else) is >> > > > > > > >> that, >> > > > > > > >> yes, I was 100% envisioning this as an _optimization_. >> > > > > > > >> >> > > > > > > >> As a consequence, I don't think it's critical to make any >> hard >> > > > > > guarantees >> > > > > > > >> about >> > > > > > > >> what results get forwarded and what (no-op updates) get >> dropped. I'd >> > > > > > > >> initially >> > > > > > > >> just been thinking about doing this opportunistically in >> cases where >> > > > > > we >> > > > > > > >> already >> > > > > > > >> had the "old" and "new" result in memory, thanks to a >> request to >> > > > > > > "emit old >> > > > > > > >> values", or to the implementation of timestamp semantics. >> > > > > > > >> >> > > > > > > >> However, whether or not it's semantically critical, I do >> think that >> > > > > > > >> Matthias's >> > > > > > > >> idea to use the change-forwarding mechanism to check for >> no-ops even >> > > > > > on >> > > > > > > >> stateless operations is pretty interesting. 
Specifically, >> this would >> > > > > > > >> _really_ >> > > > > > > >> let you pare down useless updates by using mapValues to >> strip down >> > > > > > > records >> > > > > > > >> only >> > > > > > > >> to what you really need. However, the dependence on the >> > > > > > implementation of >> > > > > > > >> equals() is troubling. >> > > > > > > >> >> > > > > > > >> It might make sense to table this idea, as well as my >> complicated >> > > > > > no-op >> > > > > > > >> detection algorithm, and initially propose just a >> nonconfigurable >> > > > > > feature >> > > > > > > >> to >> > > > > > > >> check "old" and "new" results for binary equality before >> forwarding. >> > > > > > > I.e., >> > > > > > > >> if >> > > > > > > >> any operation determines that the old and new results are >> > > > > > > >> binary-identical, we >> > > > > > > >> would not forward. >> > > > > > > >> >> > > > > > > >> I'll admit that this doesn't serve Tommy's use case very >> well, but it >> > > > > > > >> might be >> > > > > > > >> better to take baby steps with an optimization like this >> and not risk >> > > > > > > >> over-reaching in a way that actually harms performance or >> > > > > > correctness. We >> > > > > > > >> could >> > > > > > > >> always expand the feature to use equals() or some kind of >> > > > > > ChangeDetector >> > > > > > > >> later >> > > > > > > >> on, in a more focused discussion. >> > > > > > > >> >> > > > > > > >> Regarding metrics or debug logs, I guess I don't feel >> strongly, but it >> > > > > > > >> feels >> > > > > > > >> like two things will happen that make it nicer to add them: >> > > > > > > >> >> > > > > > > >> 1. This feature is going to surprise/annoy _somebody_, and >> it would be >> > > > > > > >> nice to >> > > > > > > >> be able to definitively say the reason that updates are >> dropped is >> > > > > > that >> > > > > > > >> they >> > > > > > > >> were no-ops. 
The easiest smoking gun is if there are >> debug-logs that >> > > > > > > can be >> > > > > > > >> enabled. This person might just be looking at the >> dashboards, >> > > > > > > wondering why >> > > > > > > >> there are 100K updates per second going into their app, >> but only 1K >> > > > > > > >> results per >> > > > > > > >> second coming out. Having the metric there makes the >> accounting >> > > > > > easier. >> > > > > > > >> >> > > > > > > >> 2. Somebody is going to struggle with high-volume updates, >> and it >> > > > > > > would be >> > > > > > > >> nice >> > > > > > > >> for them to know that this feature is saving them >> X-thousand updates >> > > > > > per >> > > > > > > >> second, >> > > > > > > >> etc. >> > > > > > > >> >> > > > > > > >> What does everyone think about this? Note, as I read it, >> what I've >> > > > > > said >> > > > > > > >> above is >> > > > > > > >> already reflected in the text of the KIP. >> > > > > > > >> >> > > > > > > >> Thanks, >> > > > > > > >> -John >> > > > > > > >> >> > > > > > > >> >> > > > > > > >> On Tue, Feb 11, 2020, at 18:27, Richard Yu wrote: >> > > > > > > >>> Hi all, >> > > > > > > >>> >> > > > > > > >>> Bumping this. If you feel that this KIP is not too >> urgent. Then let >> > > > > > me >> > > > > > > >>> know. :) >> > > > > > > >>> >> > > > > > > >>> Cheers, >> > > > > > > >>> Richard >> > > > > > > >>> >> > > > > > > >>> On Thu, Feb 6, 2020 at 4:55 PM Richard Yu < >> > > > > > yohan.richard...@gmail.com> >> > > > > > > >>> wrote: >> > > > > > > >>> >> > > > > > > >>>> Hi all, >> > > > > > > >>>> >> > > > > > > >>>> I've had just a few thoughts regarding the forwarding of >> <key, >> > > > > > > >>>> change<old_value, new_value>>. As Matthias already >> mentioned, there >> > > > > > > >> are two >> > > > > > > >>>> separate priorities by which we can judge this KIP: >> > > > > > > >>>> >> > > > > > > >>>> 1. 
A optimization perspective: In this case, the user >> would prefer >> > > > > > the >> > > > > > > >>>> impact of this KIP to be as minimal as possible. By such >> logic, if >> > > > > > > >>>> stateless operations are performed twice, that could >> prove >> > > > > > > >> unacceptable for >> > > > > > > >>>> them. (since operations can prove expensive) >> > > > > > > >>>> >> > > > > > > >>>> 2. Semantics correctness perspective: Unlike the >> optimization >> > > > > > > >> approach, we >> > > > > > > >>>> are more concerned with all KTable operations obeying >> the same >> > > > > > emission >> > > > > > > >>>> policy. i.e. emit on change. In this case, a discrepancy >> would not >> > > > > > be >> > > > > > > >>>> tolerated, even though an extra performance cost will be >> incurred. >> > > > > > > >>>> Therefore, we will follow Matthias's approach, and then >> perform the >> > > > > > > >>>> operation once on the old value, and once on the new. >> > > > > > > >>>> >> > > > > > > >>>> The issue here I think is more black and white than in >> between. The >> > > > > > > >> second >> > > > > > > >>>> option in particular would be favorable for users with >> inexpensive >> > > > > > > >>>> stateless operations, while for the former option, we >> are probably >> > > > > > > >> dealing >> > > > > > > >>>> with more expensive ones. So the simplest solution is >> probably to >> > > > > > > >> allow the >> > > > > > > >>>> user to choose one of the behaviors, and have a config >> which can >> > > > > > > >> switch in >> > > > > > > >>>> between them. >> > > > > > > >>>> >> > > > > > > >>>> Its the simplest compromise I can come up with at the >> moment, but if >> > > > > > > >> you >> > > > > > > >>>> think you have a better plan which could better balance >> tradeoffs. >> > > > > > Then >> > > > > > > >>>> please let us know. 
:) >> > > > > > > >>>> >> > > > > > > >>>> Best, >> > > > > > > >>>> Richard >> > > > > > > >>>> >> > > > > > > >>>> On Wed, Feb 5, 2020 at 5:12 PM John Roesler < >> vvcep...@apache.org> >> > > > > > > >> wrote: >> > > > > > > >>>> >> > > > > > > >>>>> Hi all, >> > > > > > > >>>>> >> > > > > > > >>>>> Thanks for the thoughtful comments! >> > > > > > > >>>>> >> > > > > > > >>>>> I need more time to reflect on your thoughts, but just >> wanted to >> > > > > > offer >> > > > > > > >>>>> a quick clarification about equals(). >> > > > > > > >>>>> >> > > > > > > >>>>> I only meant that we can't be sure if a class's equals() >> > > > > > > >> implementation >> > > > > > > >>>>> returns true for two semantically identical instances. >> I.e., if a >> > > > > > > >> class >> > > > > > > >>>>> doesn't >> > > > > > > >>>>> override the default equals() implementation, then we >> would see >> > > > > > > >> behavior >> > > > > > > >>>>> like: >> > > > > > > >>>>> >> > > > > > > >>>>> new MyPair("A", 1).equals(new MyPair("A", 1)) returns >> false >> > > > > > > >>>>> >> > > > > > > >>>>> In that case, I would still like to catch no-op updates >> by >> > > > > > comparing >> > > > > > > >> the >> > > > > > > >>>>> serialized form of the records when we happen to have >> it serialized >> > > > > > > >> anyway >> > > > > > > >>>>> (such as when the operation is stateful, or when we're >> sending to a >> > > > > > > >>>>> repartition topic and we have both the "new" and "old" >> value from >> > > > > > > >>>>> upstream). >> > > > > > > >>>>> >> > > > > > > >>>>> I didn't mean to suggest we'd try to use reflection to >> detect >> > > > > > whether >> > > > > > > >>>>> equals >> > > > > > > >>>>> is implemented, although that is a neat trick. 
I was thinking more of a belt-and-suspenders algorithm where we do the check for no-ops based on equals() and then _also_ check the serialized bytes for equality.
>> > > > > > > >>>>>
>> > > > > > > >>>>> Thanks,
>> > > > > > > >>>>> -John
>> > > > > > > >>>>>
>> > > > > > > >>>>> On Wed, Feb 5, 2020, at 15:31, Ted Yu wrote:
>> > > > > > > >>>>>> Thanks for the comments, Matthias.
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> w.r.t. the requirement of an `equals()` implementation, each template type
>> > > > > > > >>>>>> would have an equals() method. We can use the following code to know
>> > > > > > > >>>>>> whether it is provided by the JVM or by the user.
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> boolean customEquals = false;
>> > > > > > > >>>>>> try {
>> > > > > > > >>>>>>     // Find the class that declares the equals(Object) this value uses.
>> > > > > > > >>>>>>     Class<?> cls = value.getClass().getMethod("equals", Object.class).getDeclaringClass();
>> > > > > > > >>>>>>     if (!Object.class.equals(cls)) {
>> > > > > > > >>>>>>         customEquals = true;
>> > > > > > > >>>>>>     }
>> > > > > > > >>>>>> } catch (NoSuchMethodException nsme) {
>> > > > > > > >>>>>>     // equals is always defined, this wouldn't hit
>> > > > > > > >>>>>> }
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> The next question is: what if the user doesn't provide an equals() method?
>> > > > > > > >>>>>> Would we automatically fall back to emit-on-update?
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> Cheers
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax <mj...@apache.org> wrote:
>> > > > > > > >>>>>>
>> > > > > > > > First a high level comment:
>> > > > > > > >
>> > > > > > > > Overall, I would like to take one step back, and make sure we are discussing this on the same level.
Originally, I understood this >> KIP >> > > > > > > >>> as a >> > > > > > > > proposed change of _semantics_, however, given the latest >> > > > > > > >>> discussion >> > > > > > > > it seems it's actually not -- it's more an _optimization_ >> > > > > > > >>> proposal. >> > > > > > > > Hence, we only need to make sure that this optimization >> does not >> > > > > > > >>> break >> > > > > > > > existing semantics. It this the right way to think about it? >> > > > > > > > >> > > > > > > > If yes, than it might actually be ok to have different >> behavior >> > > > > > > > depending if there is a materialized KTable or not. So far, >> we >> > > > > > > >>> never >> > > > > > > > defined a public contract about our emit strategy and it >> seems >> > > > > > > >>> this >> > > > > > > > KIP does not define one either. >> > > > > > > > >> > > > > > > > Hence, I don't have as strong of an opinion about sending >> > > > > > > >>> oldValues >> > > > > > > > for example any longer. I guess the question is really, >> what can >> > > > > > > >>> we >> > > > > > > > implement in a reasonable way. >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > Other comments: >> > > > > > > > >> > > > > > > > >> > > > > > > > @Richard: >> > > > > > > > >> > > > > > > > Can you please add the KIP to the KIP overview table: It's >> missing >> > > > > > > > ( >> > > > > > > >>>>>> >> > > > > > > >>> >> > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Pro >> > > > > > > > posals). >> > > > > > > > >> > > > > > > > >> > > > > > > > @Bruno: >> > > > > > > > >> > > > > > > > You mentioned caching. I think it's irrelevant (orthogonal) >> and >> > > > > > > >>> we can >> > > > > > > > discuss this KIP without considering it. 
>> > > > > > > > >> > > > > > > > >> > > > > > > > @John: >> > > > > > > > >> > > > > > > >>>>>>>>> Even in the source table, we forward the updated >> record with >> > > > > > the >> > > > > > > >>>>>>>>> higher of the two timestamps. So the example is >> more like: >> > > > > > > > >> > > > > > > > That is not correct. Currently, we forward with the smaller >> > > > > > > > out-of-order timestamp (changing the timestamp would >> corrupt the >> > > > > > > >>> data >> > > > > > > > -- we don't know, because we don't check, if the value is >> the >> > > > > > > >>> same >> > > > > > > >>>>>> or >> > > > > > > > a different one, hence, we must emit the out-of-order record >> > > > > > > >>> as-is). >> > > > > > > > >> > > > > > > > If we start to do emit-on-change, we also need to emit a new >> > > > > > > >>> record if >> > > > > > > > the timestamp changes due to out-of-order data, hence, we >> would >> > > > > > > >>> still >> > > > > > > > need to emit <K,V,T1> because that give us correct >> semantics: >> > > > > > > >>> assume >> > > > > > > > you have a filter() and afterward use the filter KTable in a >> > > > > > > > stream-table join -- the lower T1 timestamp must be >> propagated to >> > > > > > > >>> the >> > > > > > > > filtered KTable to ensure that that the stream-table join >> compute >> > > > > > > >>> the >> > > > > > > > correct result. >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > Your point about requiring an `equals()` implementation is >> > > > > > > >>> actually a >> > > > > > > > quite interesting one and boils down to my statement from >> above >> > > > > > > >>> about >> > > > > > > > "what can we actually implement". What I don't understand >> is: >> > > > > > > > >> > > > > > > >>>>>>>>> This way, we still don't have to rely on the >> existence of an >> > > > > > > >>>>>>>>> equals() method, but if it is there, we can benefit >> from it. 
>> > > > > > > > >> > > > > > > > Your bullet point (2) says it uses `equals()` -- hence, it >> seems >> > > > > > > >>> we >> > > > > > > > actually to rely on it? Also, how can we detect if there is >> an >> > > > > > > > `equals()` method to do the comparison? Would be fail if we >> don't >> > > > > > > >>> have >> > > > > > > > `equals()` nor corresponding serializes to do the >> comparison? >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > >>>>>>>>> Wow, really good catch! Yes, we absolutely need >> metrics and >> > > > > > > >>> logs if >> > > > > > > >>>>>>>>> we're going to drop any records. And, yes, we >> should propose >> > > > > > > >>>>>>>>> metrics and logs that are similar to the existing >> ones when we >> > > > > > > >>> drop >> > > > > > > >>>>>>>>> records for other reasons. >> > > > > > > > >> > > > > > > > I am not sure about this point. In fact, we have already >> some >> > > > > > > >>> no-ops >> > > > > > > > in Kafka Streams in our join-operators and don't report any >> of >> > > > > > > >>> those >> > > > > > > > either. Emit-on-change is operator semantics and I don't >> see why >> > > > > > > >>> we >> > > > > > > > would need to have a metric for it? It seems to be quite >> different >> > > > > > > > compared to dropping late or malformed records. >> > > > > > > > >> > > > > > > > >> > > > > > > > -Matthias >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > On 2/4/20 7:13 AM, Thomas Becker wrote: >> > > > > > > >>>>>>>>> Thanks John for your thoughtful reply. Some >> comments inline. >> > > > > > > >>>>>>>>> >> > > > > > > >>>>>>>>> >> > > > > > > >>>>>>>>> On Mon, 2020-02-03 at 11:51 -0600, John Roesler >> wrote: >> > > > > > > >>>>>>>>>> [EXTERNAL EMAIL] Attention: This email was sent >> from outside >> > > > > > > >>>>>>>>>> TiVo. DO NOT CLICK any links or attachments unless >> you >> > > > > > expected >> > > > > > > >>>>>>>>>> them. 
>> > > > > > > >>>>>>>>>> ________________________________
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Hi Tommy,
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Thanks for the context. I can see the attraction of considering
>> > > > > > > >>>>>>>>>> these use cases together.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> To answer your question, if a part of the record is not relevant
>> > > > > > > >>>>>>>>>> to downstream consumers, I was thinking you could just use a
>> > > > > > > >>>>>>>>>> mapValues to remove it.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> E.g., suppose you wanted to do a join between two tables.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>   employeeInfo.join(
>> > > > > > > >>>>>>>>>>     employeePayroll,
>> > > > > > > >>>>>>>>>>     (info, payroll) -> new Result(info.name(), payroll.salary())
>> > > > > > > >>>>>>>>>>   )
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> We only care about one attribute from the Info table (name), and
>> > > > > > > >>>>>>>>>> one from the Payroll table (salary), and these attributes change
>> > > > > > > >>>>>>>>>> rarely. On the other hand, there might be many other attributes
>> > > > > > > >>>>>>>>>> of these tables that change frequently. We can avoid triggering
>> > > > > > > >>>>>>>>>> the join unnecessarily by mapping the input tables to drop the
>> > > > > > > >>>>>>>>>> unnecessary information before the join:
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>   names = employeeInfo.mapValues(info -> info.name())
>> > > > > > > >>>>>>>>>>   salaries = employeePayroll.mapValues(payroll -> payroll.salary())
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>   names.join(
>> > > > > > > >>>>>>>>>>     salaries,
>> > > > > > > >>>>>>>>>>     (name, salary) -> new Result(name, salary)
>> > > > > > > >>>>>>>>>>   )
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> Ahh yes I see.
>> > > > > > > >>>>>>>>> This works, but in the case where you're using schemas as we are
>> > > > > > > >>>>>>>>> (e.g. Avro), it seems like this approach could lead to a
>> > > > > > > >>>>>>>>> proliferation of "skinny" record types that just drop various
>> > > > > > > >>>>>>>>> fields.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Especially if we take Matthias's idea to drop non-changes even
>> > > > > > > >>>>>>>>>> for stateless operations, this would be quite efficient and is
>> > > > > > > >>>>>>>>>> also a very straightforward optimization to understand once you
>> > > > > > > >>>>>>>>>> know that Streams provides emit-on-change.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> From the context that you provided, it seems like a slightly
>> > > > > > > >>>>>>>>>> different situation, though. Reading between the lines a little,
>> > > > > > > >>>>>>>>>> it sounds like: in contrast to the example above, in which we are
>> > > > > > > >>>>>>>>>> filtering out extra _data_, you have some extra _metadata_ that
>> > > > > > > >>>>>>>>>> you still wish to pass down with the data when there is a "real"
>> > > > > > > >>>>>>>>>> update, but you don't want the metadata itself to cause an
>> > > > > > > >>>>>>>>>> update.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> Despite my lack of clarity, yes you've got it right ;) This
>> > > > > > > >>>>>>>>> particular processor is the first stop for this data after coming
>> > > > > > > >>>>>>>>> in from external users, who often simply post the same content each
>> > > > > > > >>>>>>>>> time and we're trying to shield downstream consumers from
>> > > > > > > >>>>>>>>> unnecessary churn.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> It does seem handy to be able to plug in a custom ChangeDetector
>> > > > > > > >>>>>>>>>> for this purpose, but I worry about the API complexity. Maybe you
>> > > > > > > >>>>>>>>>> can help think through how to provide the same benefit while
>> > > > > > > >>>>>>>>>> limiting user-facing complexity.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Here's some extra context to consider:
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> We currently don't make any extra requirements about the nature
>> > > > > > > >>>>>>>>>> of data that you can use in Streams. For example, you don't have
>> > > > > > > >>>>>>>>>> to implement hashCode and equals, or compareTo, etc. With the
>> > > > > > > >>>>>>>>>> current proposal, we can do an airtight comparison based only on
>> > > > > > > >>>>>>>>>> the serialized form of the values, and we actually don't have to
>> > > > > > > >>>>>>>>>> deserialize the "prior" value at all for a large number of
>> > > > > > > >>>>>>>>>> operations. Admittedly, if we extend the proposal to include no-op
>> > > > > > > >>>>>>>>>> detection for stateless operations, we'd probably need to rely on
>> > > > > > > >>>>>>>>>> equals() for no-op checking, otherwise we'd wind up requiring
>> > > > > > > >>>>>>>>>> serdes for stateless operations as well. Actually, I'd probably
>> > > > > > > >>>>>>>>>> argue for doing exactly that:
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> 1. In stateful operations, drop if the serialized byte[]s are the
>> > > > > > > >>>>>>>>>>    same. After deserializing, also drop if the objects are equal
>> > > > > > > >>>>>>>>>>    according to Object#equals().
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> 2. In stateless operations, compare the "new" and "old" values
>> > > > > > > >>>>>>>>>>    (if "old" is available) based on Object#equals().
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> 3. As a final optimization, after serializing and before sending
>> > > > > > > >>>>>>>>>>    repartition records, compare the serialized data and drop
>> > > > > > > >>>>>>>>>>    no-ops.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> This way, we still don't have to rely on the existence of an
>> > > > > > > >>>>>>>>>> equals() method, but if it is there, we can benefit from it.
>> > > > > > > >>>>>>>>>> Also, we don't require a serde in any new situations, but we can
>> > > > > > > >>>>>>>>>> still leverage it when it is available.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> For clarity, in my example above, even if the employeeInfo and
>> > > > > > > >>>>>>>>>> employeePayroll and Result records all have serdes, we need the
>> > > > > > > >>>>>>>>>> "name" field (presumably String) and the "salary" field
>> > > > > > > >>>>>>>>>> (presumably a Double) to have serdes as well in the naive
>> > > > > > > >>>>>>>>>> implementation. But if we can leverage equals(), then the "right
>> > > > > > > >>>>>>>>>> thing" happens automatically.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> I still don't totally follow why the individual components (name,
>> > > > > > > >>>>>>>>> salary) would have to have serdes here. If Result has one, we
>> > > > > > > >>>>>>>>> compare bytes, and if Result additionally has an equals() method
>> > > > > > > >>>>>>>>> (which presumably includes equals comparisons on the constituent
>> > > > > > > >>>>>>>>> fields), have we not covered our bases?
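The three-step rule quoted above (compare serialized bytes where they are at hand, fall back to Object#equals() otherwise) can be sketched in plain Java. This is only an illustration under stated assumptions: NoOpCheck and its methods are hypothetical names and not part of Kafka Streams, and a missing prior value is assumed to mean "cannot tell, so forward the update".

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not a Kafka Streams API.
final class NoOpCheck {
    private NoOpCheck() {}

    // Steps 1 and 3: compare serialized forms. If either side is missing
    // there is nothing to compare against, so the update is not a no-op.
    static boolean sameBytes(byte[] prior, byte[] update) {
        return prior != null && update != null && Arrays.equals(prior, update);
    }

    // Step 2: compare deserialized objects. Types that do not override
    // equals() fall back to reference identity, so they are only detected
    // as no-ops when the very same instance is seen again.
    static boolean sameObject(Object prior, Object update) {
        return prior != null && prior.equals(update);
    }

    // Stateful case (step 1): drop if either check reports "unchanged".
    static boolean isNoOp(byte[] priorBytes, byte[] updateBytes,
                          Object prior, Object update) {
        return sameBytes(priorBytes, updateBytes) || sameObject(prior, update);
    }
}
```

In this sketch the equals() check is purely additive: a value type without a meaningful equals() is never dropped by it, which is the sense in which the proposal does not depend on equals() existing. For a stateful operation whose result type has a serde, the byte comparison alone already catches identical results, which appears to be the point of the question just above.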
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> This dovetails in with my primary UX concern; where would the
>> > > > > > > >>>>>>>>>> ChangeDetector actually be registered? None of the operators in
>> > > > > > > >>>>>>>>>> my example have names or topics or any other identifiable
>> > > > > > > >>>>>>>>>> characteristic that could be passed to a ChangeDetector class
>> > > > > > > >>>>>>>>>> registered via config. You could say that we make ChangeDetector
>> > > > > > > >>>>>>>>>> an optional parameter to every operation in Streams, but this
>> > > > > > > >>>>>>>>>> seems to carry quite a bit of mental burden with it. People will
>> > > > > > > >>>>>>>>>> wonder what it's for and whether or not they should be using it.
>> > > > > > > >>>>>>>>>> There would almost certainly be a misconception that it's
>> > > > > > > >>>>>>>>>> preferable to implement it always, which would be unfortunate.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Plus, to actually implement metadata flowing through the topology
>> > > > > > > >>>>>>>>>> as in your use case, you'd have to do two things:
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> 1. make sure that all operations actually preserve the metadata
>> > > > > > > >>>>>>>>>>    alongside the data (e.g., don't accidentally add a mapValues
>> > > > > > > >>>>>>>>>>    like I did, or you drop the metadata).
>> > > > > > > >>>>>>>>>> 2. implement a ChangeDetector for every single operation in the
>> > > > > > > >>>>>>>>>>    topology, or you don't get the benefit of dropping non-changes
>> > > > > > > >>>>>>>>>>    internally
>> > > > > > > >>>>>>>>>> 2b. Alternatively, you could just add the ChangeDetector to one
>> > > > > > > >>>>>>>>>>    operation toward the end of the topology.
>> > > > > > > >>>>>>>>>>    This would not drop redundant computation internally, but only
>> > > > > > > >>>>>>>>>>    drop redundant _outputs_. But this is just about the same as
>> > > > > > > >>>>>>>>>>    your current solution.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> I definitely see your point regarding configuration. I was
>> > > > > > > >>>>>>>>> originally thinking about this when the deduplication was going to
>> > > > > > > >>>>>>>>> be opt-in, and it seemed very natural to say something like:
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>   employeeInfo.join(employeePayroll, (info, payroll) ->
>> > > > > > > >>>>>>>>>       new Result(info.name(), payroll.salary()))
>> > > > > > > >>>>>>>>>     .suppress(duplicatesAccordingTo(someChangeDetector))
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> Alternatively you can imagine a similar method being on
>> > > > > > > >>>>>>>>> Materialized, though obviously this makes less sense if we don't
>> > > > > > > >>>>>>>>> want to require materialization. If we're now talking about
>> > > > > > > >>>>>>>>> changing the default behavior and not having any configuration
>> > > > > > > >>>>>>>>> options, it's harder to find a place for this.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>> A final thought; if it really is a metadata question, can we just
>> > > > > > > >>>>>>>>>> plan to finish up the support for headers in Streams? I.e., give
>> > > > > > > >>>>>>>>>> you a way to control the way that headers flow through the
>> > > > > > > >>>>>>>>>> topology? Then, we could treat headers the same way we treat
>> > > > > > > >>>>>>>>>> timestamps in the no-op checking... We completely ignore them
>> > > > > > > >>>>>>>>>> for the sake of comparison.
>> > > > > > > >>>>>>>>>> Thus, neither the timestamp nor the headers would get updated in
>> > > > > > > >>>>>>>>>> internal state or in downstream views as long as the value itself
>> > > > > > > >>>>>>>>>> doesn't change. This seems to give us a way to support your use
>> > > > > > > >>>>>>>>>> case without adding to the mental overhead of using Streams for
>> > > > > > > >>>>>>>>>> simple things.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> Agree headers could be a decent fit for this particular case
>> > > > > > > >>>>>>>>> because it's mostly metadata, though to be honest we haven't looked
>> > > > > > > >>>>>>>>> at headers much (mostly because, and to your point, support seems
>> > > > > > > >>>>>>>>> to be lacking). I feel like there would be other cases where this
>> > > > > > > >>>>>>>>> feature could be valuable, but I admit I can't come up with
>> > > > > > > >>>>>>>>> anything right this second. Perhaps yuzhihong had an example in
>> > > > > > > >>>>>>>>> mind?
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> I.e., simple things should be easy, and complex things should be
>> > > > > > > >>>>>>>>>> possible.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> What are your thoughts? Thanks, -John
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On Mon, Feb 3, 2020, at 07:19, Thomas Becker wrote:
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Hi John, Can you describe how you'd use filtering/mapping to
>> > > > > > > >>>>>>>>>> deduplicate records?
>> > > > > > > >>>>>>>>>> To give some background on my suggestion, we currently have a
>> > > > > > > >>>>>>>>>> small stream processor that exists solely to deduplicate, which
>> > > > > > > >>>>>>>>>> we do using a process that I assume would be similar to what
>> > > > > > > >>>>>>>>>> would be done here (with a store of keys and hash values). But
>> > > > > > > >>>>>>>>>> the records we are deduplicating have some metadata fields (such
>> > > > > > > >>>>>>>>>> as timestamps of when the record was posted) that we don't
>> > > > > > > >>>>>>>>>> consider semantically meaningful for downstream consumers, and
>> > > > > > > >>>>>>>>>> therefore we also suppress updates that only touch those fields.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> -Tommy
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On Fri, 2020-01-31 at 19:30 -0600, John Roesler wrote:
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Hi Thomas and yuzhihong,
>> > > > > > > >>>>>>>>>> That’s an interesting idea. Can you help think of a use case that
>> > > > > > > >>>>>>>>>> isn’t also served by filtering or mapping beforehand?
>> > > > > > > >>>>>>>>>> Thanks for helping to design this feature!
>> > > > > > > >>>>>>>>>> -John
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:
>> > > > > > > >>>>>>>>>> I think this is a good idea.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On Jan 31, 2020, at 4:49 PM, Thomas Becker
>> > > > > > > >>>>>>>>>> <thomas.bec...@tivo.com> wrote:
>> > > > > > > >>>>>>>>>> How do folks feel about allowing the mechanism by which no-ops
>> > > > > > > >>>>>>>>>> are detected to be pluggable? Meaning use something like a hash
>> > > > > > > >>>>>>>>>> by default, but you could optionally provide an implementation of
>> > > > > > > >>>>>>>>>> something to use instead, like a ChangeDetector. This could be
>> > > > > > > >>>>>>>>>> useful for example to ignore changes to certain fields, which may
>> > > > > > > >>>>>>>>>> not be relevant to the operation being performed.
>> > > > > > > >>>>>>>>>> ________________________________
>> > > > > > > >>>>>>>>>> From: John Roesler <vvcep...@apache.org>
>> > > > > > > >>>>>>>>>> Sent: Friday, January 31, 2020 4:51 PM
>> > > > > > > >>>>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>> > > > > > > >>>>>>>>>> Subject: Re: [KAFKA-557] Add emit on change support for Kafka
>> > > > > > > >>>>>>>>>> Streams
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Hello all,
>> > > > > > > >>>>>>>>>> Sorry for my silence. It seems like we are getting close to
>> > > > > > > >>>>>>>>>> consensus. Hopefully, we could move to a vote soon! All of the
>> > > > > > > >>>>>>>>>> reasoning from Matthias and Bruno around timestamp is compelling.
I would be strongly in favor of >> stating a few >> > > > > > > >>> things >> > > > > > > >>>>>>>>>> very clearly in the KIP: 1. Streams will drop >> no-op updates >> > > > > > > >>> only >> > > > > > > >>>>>>>>>> for KTable operations. That is, we won't make any >> changes to >> > > > > > > >>>>>>>>>> KStream aggregations at the moment. It does seem >> like we can >> > > > > > > >>>>>>>>>> potentially revisit the time semantics of that >> operation in >> > > > > > the >> > > > > > > >>>>>>>>>> future, but we don't need to do it now. On the >> other hand, the >> > > > > > > >>>>>>>>>> proposed semantics for KTable timestamps (marking >> the >> > > > > > beginning >> > > > > > > >>>>>>>>>> of the validity of that record) makes sense to me. >> 2. Streams >> > > > > > > >>>>>>>>>> will only drop no-op updates for _stateful_ KTable >> operations. >> > > > > > > >>> We >> > > > > > > >>>>>>>>>> don't want to add a hard guarantee that Streams >> will _never_ >> > > > > > > >>>>>>>>>> emit a no-op table update because it would require >> adding >> > > > > > state >> > > > > > > >>>>>>>>>> to otherwise stateless operations. If someone is >> really >> > > > > > > >>> concerned >> > > > > > > >>>>>>>>>> about a particular stateless operation producing a >> lot of >> > > > > > no-op >> > > > > > > >>>>>>>>>> results, all they have to do is materialize it, >> and Streams >> > > > > > > >>> would >> > > > > > > >>>>>>>>>> automatically drop the no-ops. Additionally, I'm >> +1 on not >> > > > > > > >>> adding >> > > > > > > >>>>>>>>>> an opt-out at this time. Regarding the KIP itself, >> I would >> > > > > > > >>> clean >> > > > > > > >>>>>>>>>> it up a bit before calling for a vote. There is a >> lot of >> > > > > > > >>>>>>>>>> "discussion"-type language there, which is very >> natural to >> > > > > > > >>> read, >> > > > > > > >>>>>>>>>> but makes it a bit hard to see what _exactly_ the >> kip is >> > > > > > > >>>>>>>>>> proposing. 
Richard, would you mind just making the >> "proposed >> > > > > > > >>>>>>>>>> behavior change" a simple and succinct list of >> bullet points? >> > > > > > > >>>>>>>>>> I.e., please drop glue phrases like "there has >> been some >> > > > > > > >>>>>>>>>> discussion" or "possibly we could do X". For the >> final version >> > > > > > > >>> of >> > > > > > > >>>>>>>>>> the KIP, it should just say, "Streams will do X, >> Streams will >> > > > > > > >>> do >> > > > > > > >>>>>>>>>> Y". Feel free to add an elaboration section to >> explain more >> > > > > > > >>> about >> > > > > > > >>>>>>>>>> what X and Y mean, but we don't need to talk about >> > > > > > > >>> possibilities >> > > > > > > >>>>>>>>>> or alternatives except in the "rejected >> alternatives" section. >> > > > > > > >>>>>>>>>> Accordingly, can you also move the options you >> presented in >> > > > > > the >> > > > > > > >>>>>>>>>> intro to the "rejected alternatives" section and >> only mention >> > > > > > > >>> the >> > > > > > > >>>>>>>>>> final proposal itself? This just really helps >> reviewers to >> > > > > > know >> > > > > > > >>>>>>>>>> what they are voting for, and it helps everyone >> after the fact >> > > > > > > >>>>>>>>>> when they are trying to get clarity on what >> exactly the >> > > > > > > >>> proposal >> > > > > > > >>>>>>>>>> is, versus all the things it could have been. >> Thanks, -John >> > > > > > > >>>>>>>>>> >> > > > > > > >>>>>>>>>> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote: >> Hello to >> > > > > > all, >> > > > > > > >>>>>>>>>> I've finished making some initial modifications to >> the KIP. I >> > > > > > > >>>>>>>>>> have decided to keep the implementation section in >> the KIP for >> > > > > > > >>>>>>>>>> record-keeping purposes. For now, we should focus >> on only the >> > > > > > > >>>>>>>>>> proposed behavior changes instead. See if you have >> any >> > > > > > > >>> comments! 
>> > > > > > > >>>>>>>>>> Cheers, Richard
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On Sat, Jan 25, 2020 at 11:12 AM Richard Yu
>> > > > > > > >>>>>>>>>> <yohan.richard...@gmail.com> wrote:
>> > > > > > > >>>>>>>>>> Hi all,
>> > > > > > > >>>>>>>>>> Thanks for all the discussion!
>> > > > > > > >>>>>>>>>> @John and @Bruno I will survey other possible systems and see
>> > > > > > > >>>>>>>>>> what I can do. Just a question: by systems, I suppose you mean
>> > > > > > > >>>>>>>>>> the pros and cons of different reporting strategies? I'm not
>> > > > > > > >>>>>>>>>> completely certain on this point, so it would be great if you
>> > > > > > > >>>>>>>>>> could clarify that.
>> > > > > > > >>>>>>>>>> So here's what I got from all the discussion so far:
>> > > > > > > >>>>>>>>>> - Since both Matthias and John seem to have come to a consensus
>> > > > > > > >>>>>>>>>>   on this, we will go for an all-round behavioral change for
>> > > > > > > >>>>>>>>>>   KTables. After some thought, I decided that for now, an opt-out
>> > > > > > > >>>>>>>>>>   config will not be added. As John has pointed out, no-op
>> > > > > > > >>>>>>>>>>   changes tend to explode further down the topology as they are
>> > > > > > > >>>>>>>>>>   forwarded to more and more processor nodes downstream.
>> > > > > > > >>>>>>>>>> - About using hash codes, after some explanation from John, it
>> > > > > > > >>>>>>>>>>   looks like hash codes might not be as ideal (for
>> > > > > > > >>>>>>>>>>   implementation). For now, we will omit that detail, and save it
>> > > > > > > >>>>>>>>>>   for the PR.
>> > > > > > > >>>>>>>>>> - @Bruno You do have valid concerns. Though, I am not completely
>> > > > > > > >>>>>>>>>>   certain if we want to do emit-on-change only for materialized
>> > > > > > > >>>>>>>>>>   KTables.
I >> will put it >> > > > > > > >>> down >> > > > > > > >>>>>>>>>> in the KIP regardless. I will do my best to >> address all points >> > > > > > > >>>>>>>>>> raised so far on the discussion. Hope we could >> keep this >> > > > > > going! >> > > > > > > >>>>>>>>>> Best, Richard On Fri, Jan 24, 2020 at 6:07 PM >> Bruno Cadonna >> > > > > > > >>>>>>>>>> <br...@confluent.io <mailto:br...@confluent.io>> >> wrote: Thank >> > > > > > > >>> you >> > > > > > > >>>>>>>>>> Matthias for the use cases! Looking at both use >> cases, I think >> > > > > > > >>>>>>>>>> you need to elaborate on them in the KIP, Richard. >> Emit from >> > > > > > > >>>>>>>>>> plain KTable: I agree with Matthias that the lower >> timestamp >> > > > > > > >>>>>>>>>> makes sense because it marks the start of the >> validity of the >> > > > > > > >>>>>>>>>> record. Idempotent records with a higher timestamp >> can be >> > > > > > > >>> safely >> > > > > > > >>>>>>>>>> ignored. A corner case that I discussed with >> Matthias offline >> > > > > > > >>> is >> > > > > > > >>>>>>>>>> when we do not materialize a KTable due to >> optimization. Then >> > > > > > > >>> we >> > > > > > > >>>>>>>>>> cannot avoid the idempotent records because we do >> not keep the >> > > > > > > >>>>>>>>>> first record with the lower timestamp to compare >> to. Emit from >> > > > > > > >>>>>>>>>> KTable with aggregations: If we specify that an >> aggregation >> > > > > > > >>>>>>>>>> result should have the highest timestamp of the >> records that >> > > > > > > >>>>>>>>>> participated in the aggregation, we cannot ignore >> any >> > > > > > > >>> idempotent >> > > > > > > >>>>>>>>>> records. Admittedly, the result of an aggregation >> usually >> > > > > > > >>>>>>>>>> changes, but there are aggregations where the >> result may not >> > > > > > > >>>>>>>>>> change like min and max, or sum when the incoming >> records have >> > > > > > > >>> a >> > > > > > > >>>>>>>>>> value of zero. 
>> > > > > > > >>>>>>>>>> In those cases, we could benefit from emit-on-change, but only if
>> > > > > > > >>>>>>>>>> we define the semantics of the aggregations to not use the
>> > > > > > > >>>>>>>>>> highest timestamp of the participating records for the result. In
>> > > > > > > >>>>>>>>>> Kafka Streams, we do not have min, max, and sum as explicit
>> > > > > > > >>>>>>>>>> aggregations, but we need to provide an API to define what
>> > > > > > > >>>>>>>>>> timestamp should be used for the result of an aggregation if we
>> > > > > > > >>>>>>>>>> want to go down this path.
>> > > > > > > >>>>>>>>>> All of this does not block this KIP and I just wanted to put
>> > > > > > > >>>>>>>>>> these aspects up for discussion. The KIP can limit itself to emit
>> > > > > > > >>>>>>>>>> from materialized KTables. However, the limits should be
>> > > > > > > >>>>>>>>>> explicitly stated in the KIP.
>> > > > > > > >>>>>>>>>> Best, Bruno
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax
>> > > > > > > >>>>>>>>>> <matth...@confluent.io> wrote:
>> > > > > > > >>>>>>>>>> IMHO, the question about semantics depends on the use case, in
>> > > > > > > >>>>>>>>>> particular on the origin of a KTable. If there is a changelog
>> > > > > > > >>>>>>>>>> topic that one reads directly into a KTable, emit-on-change does
>> > > > > > > >>>>>>>>>> actually make sense, because the timestamp indicates _when_ the
>> > > > > > > >>>>>>>>>> update was _effective_.
>> > > > > > > >>>>>>>>>> For this case, it is semantically sound to _not_ update the
>> > > > > > > >>>>>>>>>> timestamp in the store, because the second update is actually
>> > > > > > > >>>>>>>>>> idempotent and advancing the timestamp is not ideal (one could
>> > > > > > > >>>>>>>>>> even consider it to be wrong to advance the timestamp) because
>> > > > > > > >>>>>>>>>> the "valid time" of the record pair did not change. This
>> > > > > > > >>>>>>>>>> reasoning also applies to KTable-KTable joins.
>> > > > > > > >>>>>>>>>> However, if the KTable is the result of an aggregation, I think
>> > > > > > > >>>>>>>>>> emit-on-update is more natural, because the timestamp reflects
>> > > > > > > >>>>>>>>>> the _last_ time (ie, highest timestamp) of all input records that
>> > > > > > > >>>>>>>>>> contributed to the result. Hence, updating the timestamp and
>> > > > > > > >>>>>>>>>> emitting a new record actually sounds correct to me. This applies
>> > > > > > > >>>>>>>>>> to windowed and non-windowed aggregations IMHO.
>> > > > > > > >>>>>>>>>> However, considering the argument that the timestamp should not
>> > > > > > > >>>>>>>>>> be updated in the first case in the store to begin with, both
>> > > > > > > >>>>>>>>>> cases are actually the same, and both can be modeled as
>> > > > > > > >>>>>>>>>> emit-on-change: if a `table()` operator does not update the
>> > > > > > > >>>>>>>>>> timestamp if the value does not change, there is _no_ change and
>> > > > > > > >>>>>>>>>> thus nothing is emitted. At the same time, if an aggregation
>> > > > > > > >>>>>>>>>> operator does update the timestamp (even if the value does not
>> > > > > > > >>>>>>>>>> change) there _is_ a change and we emit.
>> > > > > > > >>>>>>>>>> Note that handling out-of-order data for aggregations would also
>> > > > > > > >>>>>>>>>> work seamlessly with this approach -- for out-of-order records,
>> > > > > > > >>>>>>>>>> the timestamp never changes, and thus, we only emit if the result
>> > > > > > > >>>>>>>>>> itself changes.
>> > > > > > > >>>>>>>>>> Therefore, I would argue that we might not even need any config,
>> > > > > > > >>>>>>>>>> because the emit-on-change behavior is just correct and reduces
>> > > > > > > >>>>>>>>>> the downstream load, while our current behavior is not ideal
>> > > > > > > >>>>>>>>>> (even if it's also correct).
>> > > > > > > >>>>>>>>>> Thoughts?
>> > > > > > > >>>>>>>>>> -Matthias
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On 1/24/20 9:37 AM, John Roesler wrote:
>> > > > > > > >>>>>>>>>> Hi Bruno,
>> > > > > > > >>>>>>>>>> Thanks for that idea. I hadn't considered that option before, and
>> > > > > > > >>>>>>>>>> it does seem like that would be the right place to put it if we
>> > > > > > > >>>>>>>>>> think it might be semantically important to control on a
>> > > > > > > >>>>>>>>>> table-by-table basis.
>> > > > > > > >>>>>>>>>> I had been thinking of it less semantically and more practically.
>> > > > > > > >>>>>>>>>> In the context of a large topology, or more generally, a large
>> > > > > > > >>>>>>>>>> software system that contains many topologies and other
>> > > > > > > >>>>>>>>>> event-driven systems, each no-op result becomes an input that is
>> > > > > > > >>>>>>>>>> destined to itself become a no-op result, and so on, all the way
>> > > > > > > >>>>>>>>>> through the system. Thus, a single pointless processing result
>> > > > > > > >>>>>>>>>> becomes amplified into a large number of pointless computations,
>> > > > > > > >>>>>>>>>> cache perturbations, and network and disk I/O operations.
If you >> > > > > > also >> > > > > > > >>>>>>>>>> consider operations with fan-out implications, >> like branching >> > > > > > > >>> or >> > > > > > > >>>>>>>>>> foreign-key joins, the wasted resources are >> amplified not just >> > > > > > > >>> in >> > > > > > > >>>>>>>>>> proportion to the size of the system, but the size >> of the >> > > > > > > >>> system >> > > > > > > >>>>>>>>>> times the average fan-out (to the power of the >> number of >> > > > > > > >>> fan-out >> > > > > > > >>>>>>>>>> operations on the path(s) through the system). In >> my time >> > > > > > > >>>>>>>>>> operating such systems, I've observed these >> effects to be very >> > > > > > > >>>>>>>>>> real, and actually, the system and use case >> doesn't have to be >> > > > > > > >>>>>>>>>> very large before the amplification poses an >> existential >> > > > > > threat >> > > > > > > >>>>>>>>>> to the system as a whole. This is the basis of my >> advocating >> > > > > > > >>> for >> > > > > > > >>>>>>>>>> a simple behavior change, rather than an opt-in >> config of any >> > > > > > > >>>>>>>>>> kind. It seems like Streams should "do the right >> thing" for >> > > > > > the >> > > > > > > >>>>>>>>>> majority use case. My theory (which may be wrong) >> is that the >> > > > > > > >>>>>>>>>> majority use case is more like "relational >> queries" than "CEP >> > > > > > > >>>>>>>>>> queries". Even if you were doing some >> event-sensitive >> > > > > > > >>>>>>>>>> computation, wouldn't you do them as Stream >> operations (where >> > > > > > > >>>>>>>>>> this feature is inapplicable anyway)? In keeping >> with the >> > > > > > > >>>>>>>>>> "practical" perspective, I suggested the opt-out >> config only >> > > > > > in >> > > > > > > >>>>>>>>>> the (I think unlikely) event that filtering out >> pointless >> > > > > > > >>> updates >> > > > > > > >>>>>>>>>> actually harms performance. I'd also be perfectly >> fine without >> > > > > > > >>>>>>>>>> the opt-out config. 
I really think that (because of the timestamp semantics work already underway) we're already pre-fetching the prior result most of the time, so there would actually be very little extra I/O involved in implementing emit-on-change.

However, we should consider whether my experience is likely to be general. Do you have some use case in mind for which you'd actually want some KTable results to be emit-on-update for semantic reasons?

Thanks,
-John

On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:

Hi Richard,

Thank you for the KIP. I agree with John that we should focus on the interface and behavior change in a KIP. We can discuss the implementation later. I am also +1 for the survey.

I had a thought about this. Couldn't we consider emit-on-change to be one config of suppress (like `untilWindowCloses`)? What you basically propose is to suppress updates if they do not change the result. Considering emit on change as a flavour of suppress would be more flexible because it would specify the behavior locally for a KTable instead of globally for all KTables. Additionally, specifying the behavior in one place instead of multiple places feels more intuitive and consistent to me.

Best,
Bruno

On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:

Hi Richard,

Thanks for picking this up! I know of at least one large community member for which this feature is absolutely essential.

If I understand your two options, it seems like the proposal is to implement it as a behavior change regardless, and the question is whether to provide an opt-out config or not. Given that any implementation of this feature would have some performance impact under some workloads, and also that we don't know if anyone really depends on emit-on-update time semantics, it seems like we should propose to add an opt-out config. Can you update the KIP to mention the exact config key and value(s) you'd propose? Just to move the discussion forward, maybe something like:

emit.on := change|update

with the new default being "change".

Thanks for pointing out the timestamp issue in particular.
I agree that if we discard the latter update as a no-op, then we also have to discard its timestamp (obviously, we don't forward the timestamp update, as that's the whole point, but we also can't update the timestamp in the store, as the store must remain consistent with what has been emitted).

I have to confess that I disagree with your implementation proposal, but it's also not necessary to discuss implementation in the KIP. Maybe it would be less controversial if you just drop that section for now, so that the KIP discussion can focus on the behavior change and config.

Just for reference, there is some research into this domain.
For example, see the "Report" section (3.2.3) of the SECRET paper:

https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2Fpeople.csail.mit.edu%2Ftatbul%2Fpublications%2Fmaxstream_vldb10.pdf&data=02%7C01%7CThomas.Becker%40tivo.com%7Ce0235483b1eb4f259c5c08d7a8d1c16b%7Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637163491160859282&sdata=4dSGIS8jNPAPP7B48r9e%2BUgFh3WdmzVyXhyT63eP8dI%3D&reserved=0

It might help to round out the proposal if you take a brief survey of the behaviors of other systems, along with pros and cons if any are reported.

Thanks,
-John

On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:

Hi everybody!

I'd like to propose a change that we probably should have added a long time ago. The key benefit of this KIP would be reduced traffic in Kafka Streams since a lot of no-op results would no longer be sent downstream.

Here is the KIP for reference.

https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-557%253A%2BAdd%2Bemit%2Bon%2Bchange%2Bsupport%2Bfor%2BKafka%2BStreams&data=02%7C01%7CThomas.Becker%40tivo.com%7Ce0235483b1eb4f259c5c08d7a8d1c16b%7Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637163491160869277&sdata=zYpCSFOsyN4%2B4rKRZBQ%2FZvcGQ4EINR9Qm6PLsB7EKrc%3D&reserved=0

Currently, I seek to formalize our approach for this KIP first before we determine concrete API additions / configurations. Some configs might warrant adding, while others are not necessary since adding them would only increase the complexity of Kafka Streams.

Cheers,
Richard

________________________________

This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments.
No employee or agent of TiVo is authorized to conclude any binding agreement on behalf of TiVo by email. Binding agreements with TiVo may only be made by a signed written agreement.

--
*Tommy Becker*
*Principal Engineer*
*Personalized Content Discovery*
*O* +1 919.460.4747
*tivo.com* <http://www.tivo.com/>
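
As an illustration of the emit-on-change behavior discussed in this thread, here is a minimal sketch in plain Java. It is not Kafka Streams code and does not use any Kafka API: the class `EmitOnChangeSketch`, its `Entry` type, and all method names are hypothetical, and the in-memory map merely stands in for a materialized store. It assumes the comparison is done on serialized bytes against a prior value that is already at hand, and that a dropped update leaves the stored timestamp untouched so the store stays consistent with what was last emitted. The counter loosely mirrors the dropped-idempotent-updates metric described at the top of the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not part of Kafka Streams. */
final class EmitOnChangeSketch {

    /** A stored value plus the timestamp of the record that produced it. */
    static final class Entry {
        final byte[] value;
        final long timestamp;

        Entry(byte[] value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for a materialized table's state store (assumption).
    private final Map<String, Entry> store = new HashMap<>();
    // Stand-in for records forwarded downstream, recorded as "key@timestamp".
    private final List<String> emitted = new ArrayList<>();
    // Loosely mirrors the "total" flavour of dropped-idempotent-updates.
    private long droppedIdempotentUpdates = 0;

    /** Returns true if the update was forwarded, false if it was dropped as a no-op. */
    boolean update(String key, byte[] newValue, long timestamp) {
        Entry prior = store.get(key);
        if (prior != null && Arrays.equals(prior.value, newValue)) {
            // Binary-equal to the prior result: do not forward, and do not
            // touch the store, so the stored timestamp still matches the
            // last record that was actually emitted.
            droppedIdempotentUpdates++;
            return false;
        }
        store.put(key, new Entry(newValue.clone(), timestamp));
        emitted.add(key + "@" + timestamp);
        return true;
    }

    long storedTimestamp(String key) {
        return store.get(key).timestamp;
    }

    long dropped() {
        return droppedIdempotentUpdates;
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        EmitOnChangeSketch table = new EmitOnChangeSketch();
        System.out.println(table.update("a", new byte[] {1}, 10L));
        System.out.println(table.update("a", new byte[] {1}, 20L));
        System.out.println(table.update("a", new byte[] {2}, 30L));
        System.out.println(table.emitted() + " dropped=" + table.dropped());
    }
}
```

In this sketch the second update carries the same bytes as the first, so it is counted as dropped and the stored timestamp is left alone; only a change in the value produces a new downstream record.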