Hello,

Just to make some updates. I changed the name of the metric so that it was
more in line with usual Kafka naming conventions for metrics / sensors.
Below is the updated description of the metric:

dropped-idempotent-updates : (Level 2 - Per Task) DEBUG (rate | total)

Description: This metric will record the number of updates that have been
dropped since they are essentially re-performing an earlier operation.

Note:

   - The rate option indicates the ratio of records dropped to actual
   volume of records passing through the task.
   - The total option will just give a raw count of the number of records
   dropped.


I hope that this is more on point.

Best,
Richard

On Fri, Feb 21, 2020 at 2:20 PM Richard Yu <yohan.richard...@gmail.com>
wrote:

> Hi all,
>
> Thanks for the clarification. I was just confused a little on what was
> going on.
>
> So I guess then that for the actual proposal. We got the following:
>
> 1. We check for binary equality, and perform no extra look ups.
> 2. Emphasize that this applies only to materialized tables.
> 3. We drop aggregation updates if key, value and timestamp is the same.
>
> Then that settles the behavior changes. So it looks like the Metric that
> is the only thing that is left. In this case, I think the metric would be
> named the following: IdempotentUpdateMetric. This is mostly off the top of
> my head. So if you think that we change it, feel free to say so.
> The metric will report the number of dropped operations inherently.
>
> It will probably be added as a Sensor, similar to the dropped records
> sensor we already have.
>
> If there isn't anything else, I will probably start the voting process
> next week!
>
> Cheers,
> Richard
>
>
> On Fri, Feb 21, 2020 at 11:23 AM John Roesler <vvcep...@apache.org> wrote:
>
>> Hi Bruno,
>>
>> Thanks for the clarification. Indeed, I was thinking two things:
>> 1. For the initial implementation, we can just avoid adding any extra
>> lookups, but only do the comparison when we already happen to have
>> the prior value.
>> 2. I think, as a result of the timestamp semantics, we actually _do_ look
>> up the prior value approximately all the time, so the idempotence check
>> should be quite effective.
>>
>> I think that second point is the same thing you're referring to
>> potentially
>> being unnecessary. It does mean that we do fetch the whole value in a
>> lot of cases where we really only need the timestamp, so it could
>> certainly
>> be optimized in the future. In that future, we would need to weigh that
>> optimization against losing the idempotence check. But, that's a problem
>> for tomorrow :)
>>
>> I'm 100% on board with scrutinizing the performance as we implement
>> this feature.
>>
>> Thanks again,
>> -John
>>
>> On Thu, Feb 20, 2020, at 03:25, Bruno Cadonna wrote:
>> > Hi John,
>> >
>> > I am glad to help you with your imagination. With overhead, I mainly
>> > meant the additional lookup into the state store to get the current
>> > value, but I see now in the code that we do that lookup anyways
>> > (although I think we could avoid that in the cases where we do not
>> > need the old value). With or without config, we need to evaluate the
>> > performance benefits of this change, in any case.
>> >
>> > Best,
>> > Bruno
>> >
>> > On Wed, Feb 19, 2020 at 7:48 PM John Roesler <vvcep...@apache.org>
>> wrote:
>> > >
>> > > Thanks for your remarks, Bruno!
>> > >
>> > > I'm in favor of standardizing on terminology like "not forwarding
>> > > idempotent updates" or "dropping idempotent updates". Maybe we
>> > > should make a pass on the KIP and just convert everything to this
>> > > phrasing. In retrospect, even the term "emit-on-change" has too much
>> > > semantic baggage, since it implies the semantics from the SECRET
>> > > paper, which we don't really want to imply here.
>> > >
>> > > I'm also in favor of the metric as you propose.
>> > >
>> > > Likewise with stream aggregations, I was also under the impression
>> > > that we agreed on dropping idempotent updates to the aggregation
>> > > result, any time we find that our "new" (key, value, timestamp) result
>> > > is identical to the prior one.
>> > >
>> > > Also, I'm +1 on all your recommendations for updating the KIP document
>> > > for clarity.
>> > >
>> > > Regarding the opt-out config. Perhaps I'm suffering from a failure of
>> > > imagination, but I don't see how the current proposal could really
>> have
>> > > a measurable impact on latency. If all we do is make a single extra
>> pass
>> > > to compare two byte arrays for equality, only in the cases where we
>> already
>> > > have the byte arrays available, it seems unlikely to measurably
>> affect the
>> > > processing of non-idempotent updates. It seems guaranteed to
>> _decrease_
>> > > the latency of processing idempotent updates, since we get to skip a
>> > > store#put, at least one producer#send, and also all downstream
>> processing,
>> > > including all the disk and network operations associated with
>> downstream
>> > > operations.
>> > >
>> > > It seems like if we're pretty sure this change would only _help_, we
>> shouldn't
>> > > introduce the operational burden of an extra configuration. If we
>> want to
>> > > be more aggressive about dropping idempotent operations in the future,
>> > > such as depending on equals() or adding a ChangeDetector interface,
>> then
>> > > we should consider adding a configuration as part of that future
>> work. In
>> > > fact, if we add a simple "opt-in/opt-out" switch right now, we might
>> find
>> > > that it's actually insufficient for whatever future feature we might
>> propose,
>> > > then we have a mess of deprecating the opt-out config and replacing
>> it.
>> > >
>> > > What do you think?
>> > > -John
>> > >
>> > > On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna wrote:
>> > > > Hi all,
>> > > >
>> > > > Sorry for the late reply!
>> > > >
>> > > > I am also in favour of baby steps.
>> > > >
>> > > > I am undecided whether the KIP should contain a opt-out config or
>> not.
>> > > > The overhead of emit-on-change might affect latency. For
>> applications
>> > > > where low latency is crucial and there are not too many idempotent
>> > > > updates, it would be better to fall back to emit-on-update. However,
>> > > > we do not know how much emit-on-change impacts latency. We would
>> first
>> > > > need to benchmark that before we can decide about the
>> opt-out-config.
>> > > >
>> > > > A metric of dropped idempotent updates seems useful to me to be
>> > > > informed about potential upstream applications or upstream operators
>> > > > that produce too many idempotent updates. The KIP should state the
>> > > > name of the metric, its group, its tags, and its recording level
>> (see
>> > > > KIP-444 or KIP-471 for examples). I propose DEBUG as reporting
>> level.
>> > > >
>> > > > Richard, what competing proposals for emit-on-change for
>> aggregations
>> > > > do you mean? I have the feeling that we agreed to get rid of
>> > > > idempotent updates if the aggregate is updated with the same key,
>> > > > value, AND timestamp. I am also fine if we do not include this into
>> > > > this KIP (remember: baby steps).
>> > > >
>> > > > You write that "emit-on-change is more correct". Since we agreed
>> that
>> > > > this is an optimization, IMO you cannot argue this way.
>> > > >
>> > > > Please put "Alternative Approaches" under "Rejected Alternatives",
>> so
>> > > > that it becomes clear that we are not going to implement them. In
>> > > > general, I think the KIP needs a bit of clean-up (probably, you
>> > > > already planned for it). "Design Reasoning" is a bit of behavior
>> > > > changes, rejected alternatives and duplicates a bit the content in
>> > > > those sections.
>> > > >
>> > > > I do not like the name "no-op operations" or "no-ops", because they
>> > > > are rather generic. I like more "idempotent updates".
>> > > >
>> > > > Best,
>> > > > Bruno
>> > > >
>> > > >
>> > > > On Tue, Feb 18, 2020 at 7:25 PM Richard Yu <
>> yohan.richard...@gmail.com> wrote:
>> > > > >
>> > > > > Hi all,
>> > > > >
>> > > > > We are definitely making progress!
>> > > > >
>> > > > > @John should I emphasize in the proposed behavior changes that we
>> are only
>> > > > > doing binary equality checks for stateful operators?
>> > > > > It looks like we have come close to finalizing this part of the
>> KIP. (I
>> > > > > will note in the KIP that this proposal is intended for
>> optimization, not
>> > > > > semantics correctness)
>> > > > >
>> > > > > I do think maybe we still have one other detail we need to
>> discuss. So far,
>> > > > > there has been quite a bit of back and forth about what the
>> behavior of
>> > > > > aggregations should look like in emit on change. I have seen
>> > > > > multiple competing proposals, so I am not completely certain
>> which one we
>> > > > > should go with, or how we will be able to compromise in between
>> them.
>> > > > >
>> > > > > Let me know what your thoughts are on this matter, since we are
>> probably
>> > > > > close to wrapping up most other stuff.
>> > > > > @Matthias J. Sax <matth...@confluent.io>  and @Bruno, see what
>> you think
>> > > > > about this.
>> > > > >
>> > > > > Best,
>> > > > > Richard
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Tue, Feb 18, 2020 at 9:06 AM John Roesler <vvcep...@apache.org>
>> wrote:
>> > > > >
>> > > > > > Thanks, Matthias!
>> > > > > >
>> > > > > > Regarding numbers, it would be hard to know how many
>> applications
>> > > > > > would benefit, since we don't know how many applications there
>> are,
>> > > > > > or anything about their data sets or topologies. We could do a
>> survey,
>> > > > > > but it seems overkill if we take the conservative approach.
>> > > > > >
>> > > > > > I have my own practical stream processing experience that tells
>> me this
>> > > > > > is absolutely critical for any moderate-to-large relational
>> stream
>> > > > > > processing use cases. I'll leave it to you to decide if you
>> find that
>> > > > > > convincing, but it's definitely not an _assumption_. I've also
>> heard from
>> > > > > > a few Streams users who have already had to implement their own
>> > > > > > noop-suppression transformers in order to get to production
>> scale.
>> > > > > >
>> > > > > > Regardless, it sounds like we can agree on taking an
>> opportunistic approach
>> > > > > > and targeting the optimization just to use a binary-equality
>> check at
>> > > > > > stateful operators. (I'd also suggest in sink nodes, when we
>> are about to
>> > > > > > send old and new values, since they are also already present
>> and serialized
>> > > > > > at that point.) We could make the KIP even more vague, and just
>> say that
>> > > > > > we'll drop no-op updates "when possible".
>> > > > > >
>> > > > > > I'm curious what Bruno and the others think about this. If it
>> seems like
>> > > > > > a good starting point, perhaps we could move to a vote soon and
>> get to
>> > > > > > work on the implementation!
>> > > > > >
>> > > > > > Thanks,
>> > > > > > -John
>> > > > > >
>> > > > > > On Mon, Feb 17, 2020, at 20:54, Matthias J. Sax wrote:
>> > > > > > > Talking about optimizations and reducing downstream load:
>> > > > > > >
>> > > > > > > Do we actually have any numbers? I have the impression that
>> this KIP is
>> > > > > > > more or less build on the _assumption_ that there is a
>> problem. Yes,
>> > > > > > > there are some use cases that would benefit from this; But
>> how many
>> > > > > > > applications would actually benefit? And how much load
>> reduction would
>> > > > > > > they get?
>> > > > > > >
>> > > > > > > The simplest approach (following John idea to make baby
>> steps) would be
>> > > > > > > to apply the emit-on-change pattern only if there is a store.
>> For this
>> > > > > > > case we need to serialize old and new result anyway and thus
>> a simple
>> > > > > > > byte-array comparison is no overhead.
>> > > > > > >
>> > > > > > > Sending `oldValues` by default would become expensive because
>> we would
>> > > > > > > need to serialize the recomputed old result, as well as the
>> new result,
>> > > > > > > to make the comparison (and we now the serialization is not
>> cheap). We
>> > > > > > > are facing a trade-off between CPU overhead and downstream
>> load and I am
>> > > > > > > not sure if we should hard code this. My original argument
>> for sending
>> > > > > > > `oldValues` was about semantics; but for an optimization, I
>> am not sure
>> > > > > > > if this would be the right choice.
>> > > > > > >
>> > > > > > > For now, users who want to opt-in can force a
>> materialization. A
>> > > > > > > materialization may be expensive and if we see future demand,
>> we could
>> > > > > > > still add an option to send `oldValues` instead of
>> materialization (this
>> > > > > > > would at least save the store overhead). As we consider the
>> KIP an
>> > > > > > > optimization, a "config" seems to make sense.
>> > > > > > >
>> > > > > > >
>> > > > > > > -Matthias
>> > > > > > >
>> > > > > > >
>> > > > > > > On 2/17/20 5:21 PM, Richard Yu wrote:
>> > > > > > > > Hi John!
>> > > > > > > >
>> > > > > > > > Thanks for the reply.
>> > > > > > > >
>> > > > > > > > About the changes we have discussed so far. I think upon
>> further
>> > > > > > > > consideration, we have been mostly talking about this from
>> the
>> > > > > > perspective
>> > > > > > > > that no stop-gap effort is acceptable. However, in recent
>> discussion,
>> > > > > > > if we
>> > > > > > > > consider optimization, then it appears that the perspective
>> I
>> > > > > > mentioned no
>> > > > > > > > longer applies. After all, we are no longer concerned so
>> much about
>> > > > > > > > semantics correctness, then reducing traffic as much as
>> possible
>> > > > > > without
>> > > > > > > > performance tradeoffs.
>> > > > > > > >
>> > > > > > > > In this case, I think a cache would be a good idea for
>> stateless
>> > > > > > > > operations. This cache will not be backed by a store
>> obviously. We can
>> > > > > > > > probably use Kafka's ThreadCache. We should be able to
>> catch a large
>> > > > > > > > portion of the no-ops if we at least store some results in
>> the cache.
>> > > > > > Not
>> > > > > > > > all will be caught, but I think the impact will be
>> significant.
>> > > > > > > >
>> > > > > > > > On another note, I think that we should implement competing
>> proposals
>> > > > > > i.e.
>> > > > > > > > one where we forward both old and new values with a
>> reasonable
>> > > > > > proportion
>> > > > > > > > of artificial no-ops (we do not necessarily have to rely on
>> equals so
>> > > > > > much
>> > > > > > > > as comparing the serialized binary data after the
>> operation), and in
>> > > > > > > > another scenario, the cache for stateless ops. It would be
>> > > > > > unreasonable if
>> > > > > > > > we completely disregard either approach, since they both
>> have merit.
>> > > > > > The
>> > > > > > > > reason for implementing both is to perform benchmark tests
>> on them, and
>> > > > > > > > compare them with the original. This way, we can more
>> clearly see what
>> > > > > > is
>> > > > > > > > the drawbacks and the gains. So far, we have been
>> discussing only
>> > > > > > > > hypotheticals, and if we continue to do so, I think it is
>> likely no
>> > > > > > ground
>> > > > > > > > will be gained.
>> > > > > > > >
>> > > > > > > > After all, what we seek is optimization, and performance
>> benchmarks
>> > > > > > > will be
>> > > > > > > > mandatory for a KIP of this nature.
>> > > > > > > >
>> > > > > > > > Hope this helps,
>> > > > > > > > Richard
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Mon, Feb 17, 2020 at 2:12 PM John Roesler <
>> vvcep...@apache.org>
>> > > > > > wrote:
>> > > > > > > >
>> > > > > > > >> Hi again, all,
>> > > > > > > >>
>> > > > > > > >> Sorry on my part for my silence.
>> > > > > > > >>
>> > > > > > > >> I've just taken another look over the recent history of
>> this
>> > > > > > > discussion. It
>> > > > > > > >> seems like the #1 point to clarify (because it affect
>> everything
>> > > > > > else) is
>> > > > > > > >> that,
>> > > > > > > >> yes, I was 100% envisioning this as an _optimization_.
>> > > > > > > >>
>> > > > > > > >> As a consequence, I don't think it's critical to make any
>> hard
>> > > > > > guarantees
>> > > > > > > >> about
>> > > > > > > >> what results get forwarded and what (no-op updates) get
>> dropped. I'd
>> > > > > > > >> initially
>> > > > > > > >> just been thinking about doing this opportunistically in
>> cases where
>> > > > > > we
>> > > > > > > >> already
>> > > > > > > >> had the "old" and "new" result in memory, thanks to a
>> request to
>> > > > > > > "emit old
>> > > > > > > >> values", or to the implementation of timestamp semantics.
>> > > > > > > >>
>> > > > > > > >> However, whether or not it's semantically critical, I do
>> think that
>> > > > > > > >> Matthias's
>> > > > > > > >> idea to use the change-forwarding mechanism to check for
>> no-ops even
>> > > > > > on
>> > > > > > > >> stateless operations is pretty interesting. Specifically,
>> this would
>> > > > > > > >> _really_
>> > > > > > > >> let you pare down useless updates by using mapValues to
>> strip down
>> > > > > > > records
>> > > > > > > >> only
>> > > > > > > >> to what you really need. However, the dependence on the
>> > > > > > implementation of
>> > > > > > > >> equals() is troubling.
>> > > > > > > >>
>> > > > > > > >> It might make sense to table this idea, as well as my
>> complicated
>> > > > > > no-op
>> > > > > > > >> detection algorithm, and initially propose just a
>> nonconfigurable
>> > > > > > feature
>> > > > > > > >> to
>> > > > > > > >> check "old" and "new" results for binary equality before
>> forwarding.
>> > > > > > > I.e.,
>> > > > > > > >> if
>> > > > > > > >> any operation determines that the old and new results are
>> > > > > > > >> binary-identical, we
>> > > > > > > >> would not forward.
>> > > > > > > >>
>> > > > > > > >> I'll admit that this doesn't serve Tommy's use case very
>> well, but it
>> > > > > > > >> might be
>> > > > > > > >> better to take baby steps with an optimization like this
>> and not risk
>> > > > > > > >> over-reaching in a way that actually harms performance or
>> > > > > > correctness. We
>> > > > > > > >> could
>> > > > > > > >> always expand the feature to use equals() or some kind of
>> > > > > > ChangeDetector
>> > > > > > > >> later
>> > > > > > > >> on, in a more focused discussion.
>> > > > > > > >>
>> > > > > > > >> Regarding metrics or debug logs, I guess I don't feel
>> strongly, but it
>> > > > > > > >> feels
>> > > > > > > >> like two things will happen that make it nicer to add them:
>> > > > > > > >>
>> > > > > > > >> 1. This feature is going to surprise/annoy _somebody_, and
>> it would be
>> > > > > > > >> nice to
>> > > > > > > >> be able to definitively say the reason that updates are
>> dropped is
>> > > > > > that
>> > > > > > > >> they
>> > > > > > > >> were no-ops. The easiest smoking gun is if there are
>> debug-logs that
>> > > > > > > can be
>> > > > > > > >> enabled. This person might just be looking at the
>> dashboards,
>> > > > > > > wondering why
>> > > > > > > >> there are 100K updates per second going into their app,
>> but only 1K
>> > > > > > > >> results per
>> > > > > > > >> second coming out. Having the metric there makes the
>> accounting
>> > > > > > easier.
>> > > > > > > >>
>> > > > > > > >> 2. Somebody is going to struggle with high-volume updates,
>> and it
>> > > > > > > would be
>> > > > > > > >> nice
>> > > > > > > >> for them to know that this feature is saving them
>> X-thousand updates
>> > > > > > per
>> > > > > > > >> second,
>> > > > > > > >> etc.
>> > > > > > > >>
>> > > > > > > >> What does everyone think about this? Note, as I read it,
>> what I've
>> > > > > > said
>> > > > > > > >> above is
>> > > > > > > >> already reflected in the text of the KIP.
>> > > > > > > >>
>> > > > > > > >> Thanks,
>> > > > > > > >> -John
>> > > > > > > >>
>> > > > > > > >>
>> > > > > > > >> On Tue, Feb 11, 2020, at 18:27, Richard Yu wrote:
>> > > > > > > >>> Hi all,
>> > > > > > > >>>
>> > > > > > > >>> Bumping this. If you feel that this KIP is not too
>> urgent. Then let
>> > > > > > me
>> > > > > > > >>> know. :)
>> > > > > > > >>>
>> > > > > > > >>> Cheers,
>> > > > > > > >>> Richard
>> > > > > > > >>>
>> > > > > > > >>> On Thu, Feb 6, 2020 at 4:55 PM Richard Yu <
>> > > > > > yohan.richard...@gmail.com>
>> > > > > > > >>> wrote:
>> > > > > > > >>>
>> > > > > > > >>>> Hi all,
>> > > > > > > >>>>
>> > > > > > > >>>> I've had just a few thoughts regarding the forwarding of
>> <key,
>> > > > > > > >>>> change<old_value, new_value>>. As Matthias already
>> mentioned, there
>> > > > > > > >> are two
>> > > > > > > >>>> separate priorities by which we can judge this KIP:
>> > > > > > > >>>>
>> > > > > > > >>>> 1. A optimization perspective: In this case, the user
>> would prefer
>> > > > > > the
>> > > > > > > >>>> impact of this KIP to be as minimal as possible. By such
>> logic, if
>> > > > > > > >>>> stateless operations are performed twice, that could
>> prove
>> > > > > > > >> unacceptable for
>> > > > > > > >>>> them. (since operations can prove expensive)
>> > > > > > > >>>>
>> > > > > > > >>>> 2. Semantics correctness perspective: Unlike the
>> optimization
>> > > > > > > >> approach, we
>> > > > > > > >>>> are more concerned with all KTable operations obeying
>> the same
>> > > > > > emission
>> > > > > > > >>>> policy. i.e. emit on change. In this case, a discrepancy
>> would not
>> > > > > > be
>> > > > > > > >>>> tolerated, even though an extra performance cost will be
>> incurred.
>> > > > > > > >>>> Therefore, we will follow Matthias's approach, and then
>> perform the
>> > > > > > > >>>> operation once on the old value, and once on the new.
>> > > > > > > >>>>
>> > > > > > > >>>> The issue here I think is more black and white than in
>> between. The
>> > > > > > > >> second
>> > > > > > > >>>> option in particular would be favorable for users with
>> inexpensive
>> > > > > > > >>>> stateless operations, while for the former option, we
>> are probably
>> > > > > > > >> dealing
>> > > > > > > >>>> with more expensive ones. So the simplest solution is
>> probably to
>> > > > > > > >> allow the
>> > > > > > > >>>> user to choose one of the behaviors, and have a config
>> which can
>> > > > > > > >> switch in
>> > > > > > > >>>> between them.
>> > > > > > > >>>>
>> > > > > > > >>>> Its the simplest compromise I can come up with at the
>> moment, but if
>> > > > > > > >> you
>> > > > > > > >>>> think you have a better plan which could better balance
>> tradeoffs.
>> > > > > > Then
>> > > > > > > >>>> please let us know. :)
>> > > > > > > >>>>
>> > > > > > > >>>> Best,
>> > > > > > > >>>> Richard
>> > > > > > > >>>>
>> > > > > > > >>>> On Wed, Feb 5, 2020 at 5:12 PM John Roesler <
>> vvcep...@apache.org>
>> > > > > > > >> wrote:
>> > > > > > > >>>>
>> > > > > > > >>>>> Hi all,
>> > > > > > > >>>>>
>> > > > > > > >>>>> Thanks for the thoughtful comments!
>> > > > > > > >>>>>
>> > > > > > > >>>>> I need more time to reflect on your thoughts, but just
>> wanted to
>> > > > > > offer
>> > > > > > > >>>>> a quick clarification about equals().
>> > > > > > > >>>>>
>> > > > > > > >>>>> I only meant that we can't be sure if a class's equals()
>> > > > > > > >> implementation
>> > > > > > > >>>>> returns true for two semantically identical instances.
>> I.e., if a
>> > > > > > > >> class
>> > > > > > > >>>>> doesn't
>> > > > > > > >>>>> override the default equals() implementation, then we
>> would see
>> > > > > > > >> behavior
>> > > > > > > >>>>> like:
>> > > > > > > >>>>>
>> > > > > > > >>>>> new MyPair("A", 1).equals(new MyPair("A", 1)) returns
>> false
>> > > > > > > >>>>>
>> > > > > > > >>>>> In that case, I would still like to catch no-op updates
>> by
>> > > > > > comparing
>> > > > > > > >> the
>> > > > > > > >>>>> serialized form of the records when we happen to have
>> it serialized
>> > > > > > > >> anyway
>> > > > > > > >>>>> (such as when the operation is stateful, or when we're
>> sending to a
>> > > > > > > >>>>> repartition topic and we have both the "new" and "old"
>> value from
>> > > > > > > >>>>> upstream).
>> > > > > > > >>>>>
>> > > > > > > >>>>> I didn't mean to suggest we'd try to use reflection to
>> detect
>> > > > > > whether
>> > > > > > > >>>>> equals
>> > > > > > > >>>>> is implemented, although that is a neat trick. I was
>> thinking more
>> > > > > > of
>> > > > > > > >> a
>> > > > > > > >>>>> belt-and-suspenders algorithm where we do the check for
>> no-ops
>> > > > > > based
>> > > > > > > >> on
>> > > > > > > >>>>> equals() and then _also_ check the serialized bytes for
>> equality.
>> > > > > > > >>>>>
>> > > > > > > >>>>> Thanks,
>> > > > > > > >>>>> -John
>> > > > > > > >>>>>
>> > > > > > > >>>>> On Wed, Feb 5, 2020, at 15:31, Ted Yu wrote:
>> > > > > > > >>>>>> Thanks for the comments, Matthias.
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> w.r.t. requirement of an `equals()` implementation,
>> each template
>> > > > > > > >> type
>> > > > > > > >>>>>> would have an equals() method. We can use the
>> following code to
>> > > > > > know
>> > > > > > > >>>>>> whether it is provided by JVM or provided by user.
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> boolean customEquals = false;
>> > > > > > > >>>>>> try {
>> > > > > > > >>>>>>     Class cls = value.getClass().getMethod("equals",
>> > > > > > > >>>>>> Object.class).getDeclaringClass();
>> > > > > > > >>>>>>     if (!Object.class.equals(cls)) {
>> > > > > > > >>>>>>         customEquals = true;
>> > > > > > > >>>>>>     }
>> > > > > > > >>>>>> } catch (NoSuchMethodException nsme) {
>> > > > > > > >>>>>>     // equals is always defined, this wouldn't hit
>> > > > > > > >>>>>> }
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> The next question is: what if the user doesn't provide
>> equals()
>> > > > > > > >> method ?
>> > > > > > > >>>>>> Would we automatically fall back to emit-on-update ?
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> Cheers
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax <
>> mj...@apache.org>
>> > > > > > > >>>>> wrote:
>> > > > > > > >>>>>>
>> > > > > > > > First a high level comment:
>> > > > > > > >
>> > > > > > > > Overall, I would like to make one step back, and make sure
>> we are
>> > > > > > > > discussion on the same level. Originally, I understood this
>> KIP
>> > > > > > > >>> as a
>> > > > > > > > proposed change of _semantics_, however, given the latest
>> > > > > > > >>> discussion
>> > > > > > > > it seems it's actually not -- it's more an _optimization_
>> > > > > > > >>> proposal.
>> > > > > > > > Hence, we only need to make sure that this optimization
>> does not
>> > > > > > > >>> break
>> > > > > > > > existing semantics. It this the right way to think about it?
>> > > > > > > >
>> > > > > > > > If yes, than it might actually be ok to have different
>> behavior
>> > > > > > > > depending if there is a materialized KTable or not. So far,
>> we
>> > > > > > > >>> never
>> > > > > > > > defined a public contract about our emit strategy and it
>> seems
>> > > > > > > >>> this
>> > > > > > > > KIP does not define one either.
>> > > > > > > >
>> > > > > > > > Hence, I don't have as strong of an opinion about sending
>> > > > > > > >>> oldValues
>> > > > > > > > for example any longer. I guess the question is really,
>> what can
>> > > > > > > >>> we
>> > > > > > > > implement in a reasonable way.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Other comments:
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > @Richard:
>> > > > > > > >
>> > > > > > > > Can you please add the KIP to the KIP overview table: It's
>> missing
>> > > > > > > > (
>> > > > > > > >>>>>>
>> > > > > > > >>>
>> > > > > >
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Pro
>> > > > > > > > posals).
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > @Bruno:
>> > > > > > > >
>> > > > > > > > You mentioned caching. I think it's irrelevant (orthogonal)
>> and
>> > > > > > > >>> we can
>> > > > > > > > discuss this KIP without considering it.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > @John:
>> > > > > > > >
>> > > > > > > >>>>>>>>> Even in the source table, we forward the updated
>> record with
>> > > > > > the
>> > > > > > > >>>>>>>>> higher of the two timestamps. So the example is
>> more like:
>> > > > > > > >
>> > > > > > > > That is not correct. Currently, we forward with the smaller
>> > > > > > > > out-of-order timestamp (changing the timestamp would
>> corrupt the
>> > > > > > > >>> data
>> > > > > > > > -- we don't know, because we don't check, if the value is
>> the
>> > > > > > > >>> same
>> > > > > > > >>>>>> or
>> > > > > > > > a different one, hence, we must emit the out-of-order record
>> > > > > > > >>> as-is).
>> > > > > > > >
>> > > > > > > > If we start to do emit-on-change, we also need to emit a new
>> > > > > > > >>> record if
>> > > > > > > > the timestamp changes due to out-of-order data, hence, we
>> would
>> > > > > > > >>> still
>> > > > > > > > need to emit <K,V,T1> because that give us correct
>> semantics:
>> > > > > > > >>> assume
>> > > > > > > > you have a filter() and afterward use the filter KTable in a
>> > > > > > > > stream-table join -- the lower T1 timestamp must be
>> propagated to
>> > > > > > > >>> the
>> > > > > > > > filtered KTable to ensure that that the stream-table join
>> compute
>> > > > > > > >>> the
>> > > > > > > > correct result.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Your point about requiring an `equals()` implementation is
>> > > > > > > >>> actually a
>> > > > > > > > quite interesting one and boils down to my statement from
>> above
>> > > > > > > >>> about
>> > > > > > > > "what can we actually implement". What I don't understand
>> is:
>> > > > > > > >
>> > > > > > > >>>>>>>>> This way, we still don't have to rely on the
>> existence of an
>> > > > > > > >>>>>>>>> equals() method, but if it is there, we can benefit
>> from it.
>> > > > > > > >
>> > > > > > > > Your bullet point (2) says it uses `equals()` -- hence, it
>> seems
>> > > > > > > >>> we
>> > > > > > > > actually to rely on it? Also, how can we detect if there is
>> an
>> > > > > > > > `equals()` method to do the comparison? Would be fail if we
>> don't
>> > > > > > > >>> have
>> > > > > > > > `equals()` nor corresponding serializes to do the
>> comparison?
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >>>>>>>>> Wow, really good catch! Yes, we absolutely need
>> metrics and
>> > > > > > > >>> logs if
>> > > > > > > >>>>>>>>> we're going to drop any records. And, yes, we
>> should propose
>> > > > > > > >>>>>>>>> metrics and logs that are similar to the existing
>> ones when we
>> > > > > > > >>> drop
>> > > > > > > >>>>>>>>> records for other reasons.
>> > > > > > > >
>> > > > > > > > I am not sure about this point. In fact, we have already
>> some
>> > > > > > > >>> no-ops
>> > > > > > > > in Kafka Streams in our join-operators and don't report any
>> of
>> > > > > > > >>> those
>> > > > > > > > either. Emit-on-change is operator semantics and I don't
>> see why
>> > > > > > > >>> we
>> > > > > > > > would need to have a metric for it? It seems to be quite
>> different
>> > > > > > > > compared to dropping late or malformed records.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > -Matthias
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On 2/4/20 7:13 AM, Thomas Becker wrote:
>> > > > > > > >>>>>>>>> Thanks John for your thoughtful reply. Some
>> comments inline.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> On Mon, 2020-02-03 at 11:51 -0600, John Roesler
>> wrote:
>> > > > > > > >>>>>>>>>> [EXTERNAL EMAIL] Attention: This email was sent
>> from outside
>> > > > > > > >>>>>>>>>> TiVo. DO NOT CLICK any links or attachments unless
>> you
>> > > > > > expected
>> > > > > > > >>>>>>>>>> them. ________________________________
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Hi Tommy,
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Thanks for the context. I can see the attraction of
>> > > > > > considering
>> > > > > > > >>>>>>>>>> these use cases together.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> To answer your question, if a part of the record
>> is not
>> > > > > > > >>> relevant
>> > > > > > > >>>>>>>>>> to downstream consumers, I was thinking you could
>> just use a
>> > > > > > > >>>>>>>>>> mapValue to remove it.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> E.g., suppose you wanted to do a join between two
>> tables.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> employeeInfo.join( employeePayroll, (info,
>> payroll) -> new
>> > > > > > > >>>>>>>>>> Result(info.name(), payroll.salary()) )
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> We only care about one attribute from the Info
>> table (name),
>> > > > > > > >>> and
>> > > > > > > >>>>>>>>>> one from the Payroll table (salary), and these
>> attributes
>> > > > > > > >>> change
>> > > > > > > >>>>>>>>>> rarely. On the other hand, there might be many
>> other
>> > > > > > attributes
>> > > > > > > >>>>>>>>>> that change frequently of these tables. We can
>> avoid
>> > > > > > triggering
>> > > > > > > >>>>>>>>>> the join unnecessarily by mapping the input tables
>> to drop the
>> > > > > > > >>>>>>>>>> unnecessary information before the join:
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> names = employeeInfo.mapValues(info -> info.name())
>> salaries
>> > > > > > =
>> > > > > > > >>>>>>>>>> employeePayroll.mapValues(payroll ->
>> payroll.salary())
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> names.join( salaries, (name, salary) -> new
>> Result(name,
>> > > > > > > >>> salary)
>> > > > > > > >>>>>>>>>> )
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> Ahh yes I see. This works, but in the case where
>> you're using
>> > > > > > > >>>>>>>>> schemas as we are (e.g. Avro), it seems like this
>> approach
>> > > > > > could
>> > > > > > > >>>>>>>>> lead to a proliferation of "skinny" record types
>> that just drop
>> > > > > > > >>>>>>>>> various fields.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Especially if we take Matthias's idea to drop
>> non-changes even
>> > > > > > > >>>>>>>>>> for stateless operations, this would be quite
>> efficient and is
>> > > > > > > >>>>>>>>>> also a very straightforward optimization to
>> understand once
>> > > > > > you
>> > > > > > > >>>>>>>>>> know that Streams provides emit-on-change.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> From the context that you provided, it seems like
>> a slightly
>> > > > > > > >>>>>>>>>> different situation, though. Reading between the
>> lines a
>> > > > > > > >>> little,
>> > > > > > > >>>>>>>>>> it sounds like: in contrast to the example above,
>> in which we
>> > > > > > > >>> are
>> > > > > > > >>>>>>>>>> filtering out extra _data_, you have some extra
>> _metadata_
>> > > > > > that
>> > > > > > > >>>>>>>>>> you still wish to pass down with the data when
>> there is a
>> > > > > > > >>> "real"
>> > > > > > > >>>>>>>>>> update, but you don't want the metadata itself to
>> cause an
>> > > > > > > >>>>>>>>>> update.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> Despite my lack of clarity, yes you've got it right
>> ;) This
>> > > > > > > >>>>>>>>> particular processor is the first stop for this
>> data after
>> > > > > > > >>> coming
>> > > > > > > >>>>>>>>> in from external users, who often simply post the
>> same content
>> > > > > > > >>> each
>> > > > > > > >>>>>>>>> time and we're trying to shield downstream
>> consumers from
>> > > > > > > >>>>>>>>> unnecessary churn.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> It does seem handy to be able to plug in a custom
>> > > > > > > >>> ChangeDetector
>> > > > > > > >>>>>>>>>> for this purpose, but I worry about the API
>> complexity. Maybe
>> > > > > > > >>> you
>> > > > > > > >>>>>>>>>> can help think though how to provide the same
>> benefit while
>> > > > > > > >>>>>>>>>> limiting user-facing complexity.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Here's some extra context to consider:
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> We currently don't make any extra requirements
>> about the
>> > > > > > nature
>> > > > > > > >>>>>>>>>> of data that you can use in Streams. For example,
>> you don't
>> > > > > > > >>> have
>> > > > > > > >>>>>>>>>> to implement hashCode and equals, or compareTo,
>> etc. With the
>> > > > > > > >>>>>>>>>> current proposal, we can do an airtight comparison
>> based only
>> > > > > > > >>> on
>> > > > > > > >>>>>>>>>> the serialized form of the values, and we actually
>> don't have
>> > > > > > > >>> to
>> > > > > > > >>>>>>>>>> deserialize the "prior" value at all for a large
>> number of
>> > > > > > > >>>>>>>>>> operations. Admitedly, if we extend the proposal
>> to include
>> > > > > > > >>> no-op
>> > > > > > > >>>>>>>>>> detection for stateless operations, we'd probably
>> need to rely
>> > > > > > > >>> on
>> > > > > > > >>>>>>>>>> equals() for no-op checking, otherwise we'd wind
>> up requiring
>> > > > > > > >>>>>>>>>> serdes for stateless operations as well. Actually,
>> I'd
>> > > > > > probably
>> > > > > > > >>>>>>>>>> argue for doing exactly that:
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> 1. In stateful operations, drop if the serialized
>> byte[]s are
>> > > > > > > >>> the
>> > > > > > > >>>>>>>>>> same. After deserializing, also drop if the
>> objects are equal
>> > > > > > > >>>>>>>>>> according to Object#equals().
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> 2. In stateless operations, compare the "new" and
>> "old" values
>> > > > > > > >>>>>>>>>> (if "old" is available) based on Object#equals().
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> 3. As a final optimization, after serializing and
>> before
>> > > > > > > >>> sending
>> > > > > > > >>>>>>>>>> repartition records, compare the serialized data
>> and drop
>> > > > > > > >>>>>>>>>> no-ops.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> This way, we still don't have to rely on the
>> existence of an
>> > > > > > > >>>>>>>>>> equals() method, but if it is there, we can
>> benefit from it.
>> > > > > > > >>>>>>>>>> Also, we don't require a serde in any new
>> situations, but we
>> > > > > > > >>> can
>> > > > > > > >>>>>>>>>> still leverage it when it is available.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> For clarity, in my example above, even if the
>> employeeInfo and
>> > > > > > > >>>>>>>>>> employeePayroll and Result records all have
>> serdes, we need
>> > > > > > the
>> > > > > > > >>>>>>>>>> "name" field (presumably String) and the "salary"
>> field
>> > > > > > > >>>>>>>>>> (presumable a Double) to have serdes as well in
>> the naive
>> > > > > > > >>>>>>>>>> implementation. But if we can leverage equals(),
>> then the
>> > > > > > > >>> "right
>> > > > > > > >>>>>>>>>> thing" happens automatically.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> I still don't totally follow why the individual
>> components
>> > > > > > > >>> (name,
>> > > > > > > >>>>>>>>> salary) would have to have serdes here. If Result
>> has one, we
>> > > > > > > >>>>>>>>> compare bytes, and if Result additionally has an
>> equals()
>> > > > > > method
>> > > > > > > >>>>>>>>> (which presumably includes equals comparisons on the
>> > > > > > constituent
>> > > > > > > >>>>>>>>> fields), have we not covered our bases?
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> This dovetails in with my primary UX concern;
>> where would the
>> > > > > > > >>>>>>>>>> ChangeDetector actually be registered? None of the
>> operators
>> > > > > > in
>> > > > > > > >>>>>>>>>> my example have names or topics or any other
>> identifiable
>> > > > > > > >>>>>>>>>> characteristic that could be passed to a
>> ChangeDetector class
>> > > > > > > >>>>>>>>>> registered via config. You could say that we make
>> > > > > > > >>> ChangeDetector
>> > > > > > > >>>>>>>>>> an optional parameter to every operation in
>> Streams, but this
>> > > > > > > >>>>>>>>>> seems to carry quite a bit of mental burden with
>> it. People
>> > > > > > > >>> will
>> > > > > > > >>>>>>>>>> wonder what it's for and whether or not they
>> should be using
>> > > > > > > >>> it.
>> > > > > > > >>>>>>>>>> There would almost certainly be a misconception
>> that it's
>> > > > > > > >>>>>>>>>> preferable to implement it always, which would be
>> unfortunate.
>> > > > > > > >>>>>>>>>> Plus, to actually implment metadata flowing
>> through the
>> > > > > > > >>> topology
>> > > > > > > >>>>>>>>>> as in your use case, you'd have to do two things:
>> 1. make sure
>> > > > > > > >>>>>>>>>> that all operations actually preserve the metadata
>> alongside
>> > > > > > > >>> the
>> > > > > > > >>>>>>>>>> data (e.g., don't accidentally add a mapValues
>> like I did, or
>> > > > > > > >>> you
>> > > > > > > >>>>>>>>>> drop the metadata). 2. implement a ChangeDetector
>> for every
>> > > > > > > >>>>>>>>>> single operation in the topology, or you don't get
>> the benefit
>> > > > > > > >>> of
>> > > > > > > >>>>>>>>>> dropping non-changes internally 2b. Alternatively,
>> you could
>> > > > > > > >>> just
>> > > > > > > >>>>>>>>>> add the ChangeDetector to one operation toward the
>> end of the
>> > > > > > > >>>>>>>>>> topology. This would not drop redundant computation
>> > > > > > internally,
>> > > > > > > >>>>>>>>>> but only drop redundant _outputs_. But this is
>> just about the
>> > > > > > > >>>>>>>>>> same as your current solution.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> I definitely see your point regarding
>> configuration. I was
>> > > > > > > >>>>>>>>> originally thinking about this when the
>> deduplication was going
>> > > > > > > >>> to
>> > > > > > > >>>>>>>>> be opt-in, and it seemed very natural to say
>> something like:
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> employeeInfo.join(employeePayroll, (info, payroll)
>> -> new
>> > > > > > > >>>>>>>>> Result(info.name(), payroll.salary()))
>> > > > > > > >>>>>>>>> .suppress(duplicatesAccordingTo(someChangeDetector))
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> Alternatively you can imagine a similar method
>> being on
>> > > > > > > >>>>>>>>> Materialized, though obviously this makes less
>> sense if we
>> > > > > > don't
>> > > > > > > >>>>>>>>> want to require materialization. If we're now
>> talking about
>> > > > > > > >>>>>>>>> changing the default behavior and not having any
>> configuration
>> > > > > > > >>>>>>>>> options, it's harder to find a place for this.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>> A final thought; if it really is a metadata
>> question, can we
>> > > > > > > >>> just
>> > > > > > > >>>>>>>>>> plan to finish up the support for headers in
>> Streams? I.e.,
>> > > > > > > >>> give
>> > > > > > > >>>>>>>>>> you a way to control the way that headers flow
>> through the
>> > > > > > > >>>>>>>>>> topology? Then, we could treat headers the same
>> way we treat
>> > > > > > > >>>>>>>>>> timestamps in the no-op checking... We completely
>> ignore them
>> > > > > > > >>>>>>>>>> for the sake of comparison. Thus, neither the
>> timestamp nor
>> > > > > > the
>> > > > > > > >>>>>>>>>> headers would get updated in internal state or in
>> downstream
>> > > > > > > >>>>>>>>>> views as long as the value itself doesn't change.
>> This seems
>> > > > > > to
>> > > > > > > >>>>>>>>>> give us a way to support your use case without
>> adding to the
>> > > > > > > >>>>>>>>>> mental overhead of using Streams for simple things.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> Agree headers could be a decent fit for this
>> particular case
>> > > > > > > >>>>>>>>> because it's mostly metadata, though to be honest
>> we haven't
>> > > > > > > >>> looked
>> > > > > > > >>>>>>>>> at headers much (mostly because, and to your point,
>> support
>> > > > > > > >>> seems
>> > > > > > > >>>>>>>>> to be lacking). I feel like there would be other
>> cases where
>> > > > > > > >>> this
>> > > > > > > >>>>>>>>> feature could be valuable, but I admit I can't come
>> up with
>> > > > > > > >>>>>>>>> anything right this second. Perhaps yuzhihong had
>> an example in
>> > > > > > > >>>>>>>>> mind?
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> I.e., simple things should be easy, and complex
>> things should
>> > > > > > > >>> be
>> > > > > > > >>>>>>>>>> possible.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> What are your thoughts? Thanks, -John
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On Mon, Feb 3, 2020, at 07:19, Thomas Becker wrote:
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Hi John, Can you describe how you'd use
>> filtering/mapping to
>> > > > > > > >>>>>>>>>> deduplicate records? To give some background on my
>> suggestion
>> > > > > > > >>> we
>> > > > > > > >>>>>>>>>> currently have a small stream processor that
>> exists solely to
>> > > > > > > >>>>>>>>>> deduplicate, which we do using a process that I
>> assume would
>> > > > > > be
>> > > > > > > >>>>>>>>>> similar to what would be done here (with a store
>> of keys and
>> > > > > > > >>> hash
>> > > > > > > >>>>>>>>>> values). But the records we are deduplicating have
>> some
>> > > > > > > >>> metadata
>> > > > > > > >>>>>>>>>> fields (such as timestamps of when the record was
>> posted) that
>> > > > > > > >>> we
>> > > > > > > >>>>>>>>>> don't consider semantically meaningful for
>> downstream
>> > > > > > > >>> consumers,
>> > > > > > > >>>>>>>>>> and therefore we also suppress updates that only
>> touch those
>> > > > > > > >>>>>>>>>> fields.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> -Tommy
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On Fri, 2020-01-31 at 19:30 -0600, John Roesler
>> wrote:
>> > > > > > > >>> [EXTERNAL
>> > > > > > > >>>>>>>>>> EMAIL] Attention: This email was sent from outside
>> TiVo. DO
>> > > > > > NOT
>> > > > > > > >>>>>>>>>> CLICK any links or attachments unless you expected
>> them.
>> > > > > > > >>>>>>>>>> ________________________________
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Hi Thomas and yuzhihong, That’s an interesting
>> idea. Can you
>> > > > > > > >>> help
>> > > > > > > >>>>>>>>>> think of a use case that isn’t also served by
>> filtering or
>> > > > > > > >>>>>>>>>> mapping beforehand? Thanks for helping to design
>> this feature!
>> > > > > > > >>>>>>>>>> -John On Fri, Jan 31, 2020, at 18:56,
>> yuzhih...@gmail.com
>> > > > > > > >>>>>>>>>> <mailto:yuzhih...@gmail.com> wrote: I think this
>> is good
>> > > > > > > >>> idea. On
>> > > > > > > >>>>>>>>>> Jan 31, 2020, at 4:49 PM, Thomas Becker <
>> > > > > > > >>> thomas.bec...@tivo.com
>> > > > > > > >>>>>>>>>> <mailto:thomas.bec...@tivo.com>> wrote: How do
>> folks feel
>> > > > > > > >>> about
>> > > > > > > >>>>>>>>>> allowing the mechanism by which no-ops are
>> detected to be
>> > > > > > > >>>>>>>>>> pluggable? Meaning use something like a hash by
>> default, but
>> > > > > > > >>> you
>> > > > > > > >>>>>>>>>> could optionally provide an implementation of
>> something to use
>> > > > > > > >>>>>>>>>> instead, like a ChangeDetector. This could be
>> useful for
>> > > > > > > >>> example
>> > > > > > > >>>>>>>>>> to ignore changes to certain fields, which may not
>> be relevant
>> > > > > > > >>> to
>> > > > > > > >>>>>>>>>> the operation being performed.
>> > > > > > ________________________________
>> > > > > > > >>>>>>>>>> From: John Roesler <vvcep...@apache.org
>> > > > > > > >>>>>>>>>> <mailto:vvcep...@apache.org>> Sent: Friday,
>> January 31, 2020
>> > > > > > > >>> 4:51
>> > > > > > > >>>>>>>>>> PM To: dev@kafka.apache.org <mailto:
>> dev@kafka.apache.org>
>> > > > > > > >>>>>>>>>> <dev@kafka.apache.org <mailto:dev@kafka.apache.org>>
>> Subject:
>> > > > > > > >>> Re:
>> > > > > > > >>>>>>>>>> [KAFKA-557] Add emit on change support for Kafka
>> Streams
>> > > > > > > >>>>>>>>>> [EXTERNAL EMAIL] Attention: This email was sent
>> from outside
>> > > > > > > >>>>>>>>>> TiVo. DO NOT CLICK any links or attachments unless
>> you
>> > > > > > expected
>> > > > > > > >>>>>>>>>> them. ________________________________
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> Hello all, Sorry for my silence. It seems like we
>> are getting
>> > > > > > > >>>>>>>>>> close to consensus. Hopefully, we could move to a
>> vote soon!
>> > > > > > > >>> All
>> > > > > > > >>>>>>>>>> of the reasoning from Matthias and Bruno around
>> timestamp is
>> > > > > > > >>>>>>>>>> compelling. I would be strongly in favor of
>> stating a few
>> > > > > > > >>> things
>> > > > > > > >>>>>>>>>> very clearly in the KIP: 1. Streams will drop
>> no-op updates
>> > > > > > > >>> only
>> > > > > > > >>>>>>>>>> for KTable operations. That is, we won't make any
>> changes to
>> > > > > > > >>>>>>>>>> KStream aggregations at the moment. It does seem
>> like we can
>> > > > > > > >>>>>>>>>> potentially revisit the time semantics of that
>> operation in
>> > > > > > the
>> > > > > > > >>>>>>>>>> future, but we don't need to do it now. On the
>> other hand, the
>> > > > > > > >>>>>>>>>> proposed semantics for KTable timestamps (marking
>> the
>> > > > > > beginning
>> > > > > > > >>>>>>>>>> of the validity of that record) makes sense to me.
>> 2. Streams
>> > > > > > > >>>>>>>>>> will only drop no-op updates for _stateful_ KTable
>> operations.
>> > > > > > > >>> We
>> > > > > > > >>>>>>>>>> don't want to add a hard guarantee that Streams
>> will _never_
>> > > > > > > >>>>>>>>>> emit a no-op table update because it would require
>> adding
>> > > > > > state
>> > > > > > > >>>>>>>>>> to otherwise stateless operations. If someone is
>> really
>> > > > > > > >>> concerned
>> > > > > > > >>>>>>>>>> about a particular stateless operation producing a
>> lot of
>> > > > > > no-op
>> > > > > > > >>>>>>>>>> results, all they have to do is materialize it,
>> and Streams
>> > > > > > > >>> would
>> > > > > > > >>>>>>>>>> automatically drop the no-ops. Additionally, I'm
>> +1 on not
>> > > > > > > >>> adding
>> > > > > > > >>>>>>>>>> an opt-out at this time. Regarding the KIP itself,
>> I would
>> > > > > > > >>> clean
>> > > > > > > >>>>>>>>>> it up a bit before calling for a vote. There is a
>> lot of
>> > > > > > > >>>>>>>>>> "discussion"-type language there, which is very
>> natural to
>> > > > > > > >>> read,
>> > > > > > > >>>>>>>>>> but makes it a bit hard to see what _exactly_ the
>> kip is
>> > > > > > > >>>>>>>>>> proposing. Richard, would you mind just making the
>> "proposed
>> > > > > > > >>>>>>>>>> behavior change" a simple and succinct list of
>> bullet points?
>> > > > > > > >>>>>>>>>> I.e., please drop glue phrases like "there has
>> been some
>> > > > > > > >>>>>>>>>> discussion" or "possibly we could do X". For the
>> final version
>> > > > > > > >>> of
>> > > > > > > >>>>>>>>>> the KIP, it should just say, "Streams will do X,
>> Streams will
>> > > > > > > >>> do
>> > > > > > > >>>>>>>>>> Y". Feel free to add an elaboration section to
>> explain more
>> > > > > > > >>> about
>> > > > > > > >>>>>>>>>> what X and Y mean, but we don't need to talk about
>> > > > > > > >>> possibilities
>> > > > > > > >>>>>>>>>> or alternatives except in the "rejected
>> alternatives" section.
>> > > > > > > >>>>>>>>>> Accordingly, can you also move the options you
>> presented in
>> > > > > > the
>> > > > > > > >>>>>>>>>> intro to the "rejected alternatives" section and
>> only mention
>> > > > > > > >>> the
>> > > > > > > >>>>>>>>>> final proposal itself? This just really helps
>> reviewers to
>> > > > > > know
>> > > > > > > >>>>>>>>>> what they are voting for, and it helps everyone
>> after the fact
>> > > > > > > >>>>>>>>>> when they are trying to get clarity on what
>> exactly the
>> > > > > > > >>> proposal
>> > > > > > > >>>>>>>>>> is, versus all the things it could have been.
>> Thanks, -John
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:
>> Hello to
>> > > > > > all,
>> > > > > > > >>>>>>>>>> I've finished making some initial modifications to
>> the KIP. I
>> > > > > > > >>>>>>>>>> have decided to keep the implementation section in
>> the KIP for
>> > > > > > > >>>>>>>>>> record-keeping purposes. For now, we should focus
>> on only the
>> > > > > > > >>>>>>>>>> proposed behavior changes instead. See if you have
>> any
>> > > > > > > >>> comments!
>> > > > > > > >>>>>>>>>> Cheers, Richard On Sat, Jan 25, 2020 at 11:12 AM
>> Richard Yu
>> > > > > > > >>>>>>>>>> <yohan.richard...@gmail.com <mailto:
>> > > > > > yohan.richard...@gmail.com
>> > > > > > > >>>>>
>> > > > > > > >>>>>>>>>> wrote: Hi all, Thanks for all the discussion!
>> @John and @Bruno
>> > > > > > > >>> I
>> > > > > > > >>>>>>>>>> will survey other possible systems and see what I
>> can do. Just
>> > > > > > > >>> a
>> > > > > > > >>>>>>>>>> question, by systems, I suppose you would mean the
>> pros and
>> > > > > > > >>> cons
>> > > > > > > >>>>>>>>>> of different reporting strategies? I'm not
>> completely certain
>> > > > > > > >>> on
>> > > > > > > >>>>>>>>>> this point, so it would be great if you can
>> clarify on that.
>> > > > > > So
>> > > > > > > >>>>>>>>>> here's what I got from all the discussion so far:
>> - Since both
>> > > > > > > >>>>>>>>>> Matthias and John seems to have come to a
>> consensus on this,
>> > > > > > > >>> then
>> > > > > > > >>>>>>>>>> we will go for an all-round behavorial change for
>> KTables.
>> > > > > > > >>> After
>> > > > > > > >>>>>>>>>> some thought, I decided that for now, an opt-out
>> config will
>> > > > > > > >>> not
>> > > > > > > >>>>>>>>>> be added. As John have pointed out, no-op changes
>> tend to
>> > > > > > > >>>>>>>>>> explode further down the topology as they are
>> forwarded to
>> > > > > > more
>> > > > > > > >>>>>>>>>> and more processor nodes downstream. - About using
>> hash codes,
>> > > > > > > >>>>>>>>>> after some explanation from John, it looks like
>> hash codes
>> > > > > > > >>> might
>> > > > > > > >>>>>>>>>> not be as ideal (for implementation). For now, we
>> will omit
>> > > > > > > >>> that
>> > > > > > > >>>>>>>>>> detail, and save it for the PR. - @Bruno You do
>> have valid
>> > > > > > > >>>>>>>>>> concerns. Though, I am not completely certain if
>> we want to do
>> > > > > > > >>>>>>>>>> emit-on-change only for materialized KTables. I
>> will put it
>> > > > > > > >>> down
>> > > > > > > >>>>>>>>>> in the KIP regardless. I will do my best to
>> address all points
>> > > > > > > >>>>>>>>>> raised so far on the discussion. Hope we could
>> keep this
>> > > > > > going!
>> > > > > > > >>>>>>>>>> Best, Richard On Fri, Jan 24, 2020 at 6:07 PM
>> Bruno Cadonna
>> > > > > > > >>>>>>>>>> <br...@confluent.io <mailto:br...@confluent.io>>
>> wrote: Thank
>> > > > > > > >>> you
>> > > > > > > >>>>>>>>>> Matthias for the use cases! Looking at both use
>> cases, I think
>> > > > > > > >>>>>>>>>> you need to elaborate on them in the KIP, Richard.
>> Emit from
>> > > > > > > >>>>>>>>>> plain KTable: I agree with Matthias that the lower
>> timestamp
>> > > > > > > >>>>>>>>>> makes sense because it marks the start of the
>> validity of the
>> > > > > > > >>>>>>>>>> record. Idempotent records with a higher timestamp
>> can be
>> > > > > > > >>> safely
>> > > > > > > >>>>>>>>>> ignored. A corner case that I discussed with
>> Matthias offline
>> > > > > > > >>> is
>> > > > > > > >>>>>>>>>> when we do not materialize a KTable due to
>> optimization. Then
>> > > > > > > >>> we
>> > > > > > > >>>>>>>>>> cannot avoid the idempotent records because we do
>> not keep the
>> > > > > > > >>>>>>>>>> first record with the lower timestamp to compare
>> to. Emit from
>> > > > > > > >>>>>>>>>> KTable with aggregations: If we specify that an
>> aggregation
>> > > > > > > >>>>>>>>>> result should have the highest timestamp of the
>> records that
>> > > > > > > >>>>>>>>>> participated in the aggregation, we cannot ignore
>> any
>> > > > > > > >>> idempotent
>> > > > > > > >>>>>>>>>> records. Admittedly, the result of an aggregation
>> usually
>> > > > > > > >>>>>>>>>> changes, but there are aggregations where the
>> result may not
>> > > > > > > >>>>>>>>>> change like min and max, or sum when the incoming
>> records have
>> > > > > > > >>> a
>> > > > > > > >>>>>>>>>> value of zero. In those cases, we could benefit of
>> the emit on
>> > > > > > > >>>>>>>>>> change, but only if we define the semantics of the
>> > > > > > aggregations
>> > > > > > > >>>>>>>>>> to not use the highest timestamp of the
>> participating records
>> > > > > > > >>> for
>> > > > > > > >>>>>>>>>> the result. In Kafka Streams, we do not have min,
>> max, and sum
>> > > > > > > >>> as
>> > > > > > > >>>>>>>>>> explicit aggregations, but we need to provide an
>> API to define
>> > > > > > > >>>>>>>>>> what timestamp should be used for the result of an
>> aggregation
>> > > > > > > >>> if
>> > > > > > > >>>>>>>>>> we want to go down this path. All of this does not
>> block this
>> > > > > > > >>> KIP
>> > > > > > > >>>>>>>>>> and I just wanted to put this aspects up for
>> discussion. The
>> > > > > > > >>> KIP
>> > > > > > > >>>>>>>>>> can limit itself to emit from materialized
>> KTables. However,
>> > > > > > > >>> the
>> > > > > > > >>>>>>>>>> limits should be explicitly stated in the KIP.
>> Best, Bruno
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax
>> > > > > > > >>>>>>>>>> <matth...@confluent.io <mailto:
>> matth...@confluent.io>> wrote:
>> > > > > > > >>>>>>>>>> IMHO, the question about semantics depends on the
>> use case, in
>> > > > > > > >>>>>>>>>> particular on the origin of a KTable. If there is
>> a changlog
>> > > > > > > >>>>>>>>>> topic that one reads directly into a KTable,
>> emit-on-change
>> > > > > > > >>> does
>> > > > > > > >>>>>>>>>> actually make sense, because the timestamp
>> indicates _when_
>> > > > > > the
>> > > > > > > >>>>>>>>>> update was _effective_. For this case, it is
>> semantically
>> > > > > > sound
>> > > > > > > >>>>>>>>>> to _not_ update the timestamp in the store,
>> because the second
>> > > > > > > >>>>>>>>>> update is actually idempotent and advancing the
>> timestamp is
>> > > > > > > >>> not
>> > > > > > > >>>>>>>>>> ideal (one could even consider it to be wrong to
>> advance the
>> > > > > > > >>>>>>>>>> timestamp) because the "valid time" of the record
>> pair did not
>> > > > > > > >>>>>>>>>> change. This reasoning also applies to
>> KTable-KTable joins.
>> > > > > > > >>>>>>>>>> However, if the KTable is the result of an
>> aggregation, I
>> > > > > > think
>> > > > > > > >>>>>>>>>> emit-on-update is more natural, because the
>> timestamp reflects
>> > > > > > > >>>>>>>>>> the _last_ time (ie, highest timestamp) of all
>> input records
>> > > > > > > >>> the
>> > > > > > > >>>>>>>>>> contributed to the result. Hence, updating the
>> timestamp and
>> > > > > > > >>>>>>>>>> emitting a new record actually sounds correct to
>> me. This
>> > > > > > > >>> applies
>> > > > > > > >>>>>>>>>> to windowed and non-windowed aggregations IMHO.
>> However,
>> > > > > > > >>>>>>>>>> considering the argument that the timestamp should
>> not be
>> > > > > > > >>> update
>> > > > > > > >>>>>>>>>> in the first case in the store to begin with, both
>> cases are
>> > > > > > > >>>>>>>>>> actually the same, and both can be modeled as
>> emit-on-change:
>> > > > > > > >>> if
>> > > > > > > >>>>>>>>>> a `table()` operator does not update the timestamp
>> if the
>> > > > > > value
>> > > > > > > >>>>>>>>>> does not change, there is _no_ change and thus
>> nothing is
>> > > > > > > >>>>>>>>>> emitted. At the same time, if an aggregation
>> operator does
>> > > > > > > >>> update
>> > > > > > > >>>>>>>>>> the timestamp (even if the value does not change)
>> there _is_ a
>> > > > > > > >>>>>>>>>> change and we emit. Note that handling
>> out-of-order data for
>> > > > > > > >>>>>>>>>> aggregations would also work seamlessly with this
>> approach --
>> > > > > > > >>> for
>> > > > > > > >>>>>>>>>> out-of-order records, the timestamp does never
>> change, and
>> > > > > > > >>> thus,
>> > > > > > > >>>>>>>>>> we only emit if the result itself changes.
>> Therefore, I would
>> > > > > > > >>>>>>>>>> argue that we might not even need any config,
>> because the
>> > > > > > > >>>>>>>>>> emit-on-change behavior is just correct and
>> reduced the
>> > > > > > > >>>>>>>>>> downstream load, while our current behavior is not
>> ideal (even
>> > > > > > > >>> if
>> > > > > > > >>>>>>>>>> it's also correct). Thoughts? -Matthias On 1/24/20
>> 9:37 AM,
>> > > > > > > >>> John
>> > > > > > > >>>>>>>>>> Roesler wrote: Hi Bruno, Thanks for that idea. I
>> hadn't
>> > > > > > > >>>>>>>>>> considered that option before, and it does seem
>> like that
>> > > > > > would
>> > > > > > > >>>>>>>>>> be the right place to put it if we think it might
>> be
>> > > > > > > >>> semantically
>> > > > > > > >>>>>>>>>> important to control on a table-by-table basis. I
>> had been
>> > > > > > > >>>>>>>>>> thinking of it less semantically and more
>> practically. In the
>> > > > > > > >>>>>>>>>> context of a large topology, or more generally, a
>> large
>> > > > > > > >>> software
>> > > > > > > >>>>>>>>>> system that contains many topologies and other
>> event-driven
>> > > > > > > >>>>>>>>>> systems, each no-op result becomes an input that
>> is destined
>> > > > > > to
>> > > > > > > >>>>>>>>>> itself become a no-op result, and so on, all the
>> way through
>> > > > > > > >>> the
>> > > > > > > >>>>>>>>>> system. Thus, a single pointless processing result
>> becomes
>> > > > > > > >>>>>>>>>> amplified into a large number of pointless
>> computations, cache
>> > > > > > > >>>>>>>>>> perturbations, and network and disk I/O
>> operations. If you
>> > > > > > also
>> > > > > > > >>>>>>>>>> consider operations with fan-out implications,
>> like branching
>> > > > > > > >>> or
>> > > > > > > >>>>>>>>>> foreign-key joins, the wasted resources are
>> amplified not just
>> > > > > > > >>> in
>> > > > > > > >>>>>>>>>> proportion to the size of the system, but the size
>> of the
>> > > > > > > >>> system
>> > > > > > > >>>>>>>>>> times the average fan-out (to the power of the
>> number of
>> > > > > > > >>> fan-out
>> > > > > > > >>>>>>>>>> operations on the path(s) through the system). In
>> my time
>> > > > > > > >>>>>>>>>> operating such systems, I've observed these
>> effects to be very
>> > > > > > > >>>>>>>>>> real, and actually, the system and use case
>> doesn't have to be
>> > > > > > > >>>>>>>>>> very large before the amplification poses an
>> existential
>> > > > > > threat
>> > > > > > > >>>>>>>>>> to the system as a whole. This is the basis of my
>> advocating
>> > > > > > > >>> for
>> > > > > > > >>>>>>>>>> a simple behavior change, rather than an opt-in
>> config of any
>> > > > > > > >>>>>>>>>> kind. It seems like Streams should "do the right
>> thing" for
>> > > > > > the
>> > > > > > > >>>>>>>>>> majority use case. My theory (which may be wrong)
>> is that the
>> > > > > > > >>>>>>>>>> majority use case is more like "relational
>> queries" than "CEP
>> > > > > > > >>>>>>>>>> queries". Even if you were doing some
>> event-sensitive
>> > > > > > > >>>>>>>>>> computation, wouldn't you do them as Stream
>> operations (where
>> > > > > > > >>>>>>>>>> this feature is inapplicable anyway)? In keeping
>> with the
>> > > > > > > >>>>>>>>>> "practical" perspective, I suggested the opt-out
>> config only
>> > > > > > in
>> > > > > > > >>>>>>>>>> the (I think unlikely) event that filtering out
>> pointless
>> > > > > > > >>> updates
>> > > > > > > >>>>>>>>>> actually harms performance. I'd also be perfectly
>> fine without
>> > > > > > > >>>>>>>>>> the opt-out config. I really think that (because
>> of the
>> > > > > > > >>> timestamp
>> > > > > > > >>>>>>>>>> semantics work already underway), we're already
>> pre-fetching
>> > > > > > > >>> the
>> > > > > > > >>>>>>>>>> prior result most of the time, so there would
>> actually be very
>> > > > > > > >>>>>>>>>> little extra I/O involved in implementing
>> emit-on-change.
>> > > > > > > >>>>>>>>>> However, we should consider whether my experience
>> is likely to
>> > > > > > > >>>>>>>>>> be general. Do you have some use case in mind for
>> which you'd
>> > > > > > > >>>>>>>>>> actually want some KTable results to be
>> emit-on-update for
>> > > > > > > >>>>>>>>>> semantic reasons? Thanks, -John
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna
>> wrote: Hi
>> > > > > > > >>> Richard,
>> > > > > > > >>>>>>>>>> Thank you for the KIP. I agree with John that we
>> should focus
>> > > > > > > >>> on
>> > > > > > > >>>>>>>>>> the interface and behavior change in a KIP. We can
>> discuss the
>> > > > > > > >>>>>>>>>> implementation later. I am also +1 for the survey.
>> I had a
>> > > > > > > >>>>>>>>>> thought about this. Couldn't we consider
>> emit-on-change to be
>> > > > > > > >>> one
>> > > > > > > >>>>>>>>>> config of suppress (like `untilWindowCloses`)?
>> What you
>> > > > > > > >>>>>>>>>> basically propose is to suppress updates if they
>> do not change
>> > > > > > > >>>>>>>>>> the result. Considering emit on change as a
>> flavour of
>> > > > > > suppress
>> > > > > > > >>>>>>>>>> would be more flexible because it would specify
>> the behavior
>> > > > > > > >>>>>>>>>> locally for a KTable instead of globally for all
>> KTables.
>> > > > > > > >>>>>>>>>> Additionally, specifying the behavior in one place
>> instead of
>> > > > > > > >>>>>>>>>> multiple places feels more intuitive and
>> consistent to me.
>> > > > > > > >>> Best,
>> > > > > > > >>>>>>>>>> Bruno On Fri, Jan 24, 2020 at 7:49 AM John Roesler
>> > > > > > > >>>>>>>>>> <vvcep...@apache.org <mailto:vvcep...@apache.org>>
>> wrote: Hi
>> > > > > > > >>>>>>>>>> Richard, Thanks for picking this up! I know of at
>> least one
>> > > > > > > >>> large
>> > > > > > > >>>>>>>>>> community member for which this feature is
>> absolutely
>> > > > > > > >>> essential.
>> > > > > > > >>>>>>>>>> If I understand your two options, it seems like
>> the proposal
>> > > > > > is
>> > > > > > > >>>>>>>>>> to implement it as a behavior change regardless,
>> and the
>> > > > > > > >>> question
>> > > > > > > >>>>>>>>>> is whether to provide an opt-out config or not.
>> Given that any
>> > > > > > > >>>>>>>>>> implementation of this feature would have some
>> performance
>> > > > > > > >>> impact
>> > > > > > > >>>>>>>>>> under some workloads, and also that we don't know
>> if anyone
>> > > > > > > >>>>>>>>>> really depends on emit-on-update time semantics,
>> it seems like
>> > > > > > > >>> we
>> > > > > > > >>>>>>>>>> should propose to add an opt-out config. Can you
>> update the
>> > > > > > KIP
>> > > > > > > >>>>>>>>>> to mention the exact config key and value(s) you'd
>> propose?
>> > > > > > > >>> Just
>> > > > > > > >>>>>>>>>> to move the discussion forward, maybe something
>> like: emit.on
>> > > > > > > >>> :=
>> > > > > > > >>>>>>>>>> change|update with the new default being "change"
>> Thanks for
>> > > > > > > >>>>>>>>>> pointing out the timestamp issue in particular. I
>> agree that
>> > > > > > if
>> > > > > > > >>>>>>>>>> we discard the latter update as a no-op, then we
>> also have to
>> > > > > > > >>>>>>>>>> discard its timestamp (obviously, we don't forward
>> the
>> > > > > > > >>> timestamp
>> > > > > > > >>>>>>>>>> update, as that's the whole point, but we also
>> can't update
>> > > > > > the
>> > > > > > > >>>>>>>>>> timestamp in the store, as the store must remain
>> consistent
>> > > > > > > >>> with
>> > > > > > > >>>>>>>>>> what has been emitted). I have to confess that I
>> disagree with
>> > > > > > > >>>>>>>>>> your implementation proposal, but it's also not
>> necessary to
>> > > > > > > >>>>>>>>>> discuss implementation in the KIP. Maybe it would
>> be less
>> > > > > > > >>>>>>>>>> controversial if you just drop that section for
>> now, so that
>> > > > > > > >>> the
>> > > > > > > >>>>>>>>>> KIP discussion can focus on the behavior change
>> and config.
>> > > > > > > >>> Just
>> > > > > > > >>>>>>>>>> for reference, there is some research into this
>> domain. For
>> > > > > > > >>>>>>>>>> example, see the "Report" section (3.2.3) of the
>> SECRET paper:
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>
>> > > > > >
>> https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2Fpeop
>> > > > > > > > le.csail.mit.edu
>> > > > > > > >>>>>>
>> %2Ftatbul%2Fpublications%2Fmaxstream_vldb10.pdf&amp;data
>> > > > > > > > =02%7C01%7CThomas.Becker%40tivo.com
>> > > > > > > >>>>>> %7Ce0235483b1eb4f259c5c08d7a8d1c16b%7
>> > > > > > > >
>> > > > > > > >>>>>>
>> > > > > > > >>>
>> > > > > >
>> Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637163491160859282&amp;sdata
>> > > > > > > >
>> =4dSGIS8jNPAPP7B48r9e%2BUgFh3WdmzVyXhyT63eP8dI%3D&amp;reserved=0
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > > It might help to round out the proposal if you take a brief
>> > > > > > > >>> survey of
>> > > > > > > >>>>>>>>>> the behaviors of other systems, along with pros
>> and cons if
>> > > > > > any
>> > > > > > > >>>>>>>>>> are reported. Thanks, -John
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
>> Hi
>> > > > > > everybody!
>> > > > > > > >>>>>>>>>> I'd like to propose a change that we probably
>> should've added
>> > > > > > > >>> for
>> > > > > > > >>>>>>>>>> a long time now. The key benefit of this KIP would
>> be reduced
>> > > > > > > >>>>>>>>>> traffic in Kafka Streams since a lot of no-op
>> results would no
>> > > > > > > >>>>>>>>>> longer be sent downstream. Here is the KIP for
>> reference.
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>
>> > > > > >
>> https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwi
>> > > > > > > > ki.apache.org
>> > > > > > > >>>>>>
>> %2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-557%253A%2BAdd%2Bemit
>> > > > > > > >
>> > > > > > > >>>>>>
>> > > > > > > >>>
>> > > > > >
>> %2Bon%2Bchange%2Bsupport%2Bfor%2BKafka%2BStreams&amp;data=02%7C01%7CThom
>> > > > > > > > as.Becker%40tivo.com
>> > > > > > > >>>>>> %7Ce0235483b1eb4f259c5c08d7a8d1c16b%7Cd05b7c6912014c
>> > > > > > > >
>> > > > > > > >>>>>>
>> > > > > > > >>>
>> > > > > >
>> 0db45d7f1dcc227e4d%7C1%7C1%7C637163491160869277&amp;sdata=zYpCSFOsyN4%2B
>> > > > > > > > 4rKRZBQ%2FZvcGQ4EINR9Qm6PLsB7EKrc%3D&amp;reserved=0
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > > Currently, I seek to formalize our approach for this KIP
>> first
>> > > > > > > >>> before
>> > > > > > > >>>>>>>>>> we determine concrete API additions /
>> configurations. Some
>> > > > > > > >>>>>>>>>> configs might warrant adding, whiles others are
>> not necessary
>> > > > > > > >>>>>>>>>> since adding them would only increase complexity
>> of Kafka
>> > > > > > > >>>>>>>>>> Streams. Cheers, Richard
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> ________________________________ This email and any
>> > > > > > attachments
>> > > > > > > >>>>>>>>>> may contain confidential and privileged material
>> for the sole
>> > > > > > > >>> use
>> > > > > > > >>>>>>>>>> of the intended recipient. Any review, copying, or
>> > > > > > distribution
>> > > > > > > >>>>>>>>>> of this email (or any attachments) by others is
>> prohibited. If
>> > > > > > > >>>>>>>>>> you are not the intended recipient, please contact
>> the sender
>> > > > > > > >>>>>>>>>> immediately and permanently delete this email and
>> any
>> > > > > > > >>>>>>>>>> attachments. No employee or agent of TiVo is
>> authorized to
>> > > > > > > >>>>>>>>>> conclude any binding agreement on behalf of TiVo
>> by email.
>> > > > > > > >>>>>>>>>> Binding agreements with TiVo may only be made by a
>> signed
>> > > > > > > >>> written
>> > > > > > > >>>>>>>>>> agreement. -- *Tommy Becker* *Principal Engineer *
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> *Personalized Content Discovery*
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> *O* +1 919.460.4747 *tivo.com* <
>> http://www.tivo.com/>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>>
>> > > > > > > >>>>>>>>>> This email and any attachments may contain
>> confidential and
>> > > > > > > >>>>>>>>>> privileged material for the sole use of the
>> intended
>> > > > > > recipient.
>> > > > > > > >>>>>>>>>> Any review, copying, or distribution of this email
>> (or any
>> > > > > > > >>>>>>>>>> attachments) by others is prohibited. If you are
>> not the
>> > > > > > > >>> intended
>> > > > > > > >>>>>>>>>> recipient, please contact the sender immediately
>> and
>> > > > > > > >>> permanently
>> > > > > > > >>>>>>>>>> delete this email and any attachments. No employee
>> or agent of
>> > > > > > > >>>>>>>>>> TiVo is authorized to conclude any binding
>> agreement on behalf
>> > > > > > > >>> of
>> > > > > > > >>>>>>>>>> TiVo by email. Binding agreements with TiVo may
>> only be made
>> > > > > > > >>> by a
>> > > > > > > >>>>>>>>>> signed written agreement.
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> --
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> *Tommy Becker* /Principal Engineer /
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> /Personalized Content Discovery/
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>> *O* +1 919.460.4747 *tivo.com* <
>> http://www.tivo.com/>
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>>>>
>> > > > > > > >>>>>>
>> > > > > >
>> ----------------------------------------------------------------------
>> > > > > > > >>>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>
>> > > > > > > >>>>
>> > > > > > > >>>
>> > > > > > > >>
>> > > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > Attachments:
>> > > > > > > * signature.asc
>> > > > > >
>> > > >
>> >
>>
>

Reply via email to