Hi Bruno, Hi John,

Thanks for your comments! I updated the KIP accordingly, and it looks like
for quite a few points. I was doing some beating around the bush which
could've been avoided.

Looks like we can reduce the metric to Level 1 (per processor node) then.

I've cleaned up most of the unnecessary info, and we should be fairly close.
I will start working on a PR soon for this KIP. (although we might split
that up into stages)

Cheers,
Richard

On Thu, Feb 27, 2020 at 6:06 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi John,
>
> I agree with you. It is better to measure the metric on processor node
> level. The users can do the rollup to task-level by themselves.
>
> Best,
> Bruno
>
> On Thu, Feb 27, 2020 at 12:09 AM John Roesler <vvcep...@apache.org> wrote:
> >
> > Hi Richard,
> >
> > I've been making a final pass over the KIP.
> >
> > Re: Proposed Behavior Change:
> >
> > I think this point is controversial and probably doesn't need to be
> there at all:
> > > 2.b. In certain situations where there is a high volume of idempotent
> > > updates throughout the Streams DAG, it will be recommended practice
> > > to materialize all operations to reduce traffic overall across the
> entire
> > >  network of nodes.
> >
> > Re-reading all the points, it seems like we can sum them up in a way
> that's
> > a little more straight to the point, and gives us the right amount of
> flexibility:
> >
> > > Proposed Behavior Changes
> > >
> > > Definition: "idempotent update" is one in which the new result and
> prior
> > > result,  when serialized, are identical byte arrays
> > >
> > > Note: an "update" is a concept that only applies to Table operations,
> so
> > > the concept of an "idempotent update" also only applies to Table
> operations.
> > > See
> https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
> > > for more information.
> > >
> > > Given that definition, we propose for Streams to drop idempotent
> updates
> > > in any situation where it's possible and convenient to do so. For
> example,
> > > any time we already have both the prior and new results serialized, we
> > > may compare them, and drop the update if it is idempotent.
> > >
> > > Note that under this proposal, we can implement idempotence checking
> > > in the following situations:
> > > 1. Any aggregation (for example, KGroupedStream, KGroupedTable,
> > >     TimeWindowedKStream, and SessionWindowedKStream operations)
> > > 2. Any Materialized KTable operation
> > > 3. Repartition operations, when we need to send both prior and new
> results
> >
> > Notice that in my proposed wording, we neither limit ourselves to just
> the
> > situations enumerated, nor promise to implement the optimization in every
> > possible situation. IMHO, this is the best way to propose such a feature.
> > That way, we have the flexibility to implement it in stages, and also to
> add
> > on to the implementation in the future.
> >
> >
> > Re: Metrics
> >
> > I agree with Bruno, although, I think it might just be a confusing
> statement.
> > It might be clearer to drop all the "discussion", and just say: "We will
> add a
> > metric to count the number of idempotent updates that we have dropped".
> >
> > Also, with respect to the metric, I'm wondering if the metric should be
> task-
> > level or processor-node-level. Since the interesting action takes place
> inside
> > individual processor nodes, I _think_ it would be higher leverage to just
> > measure it at the node level. WDYT?
> >
> > Re: Design Reasoning
> >
> > This section seems to be a little bit outdated. I also just noticed a
> "surprise"
> > configuration "timestamp.aggregation.selection.policy" hidden in point
> 1.a.
> > Is that part of the proposal? We haven't discussed it, and I think we
> were
> > talking about this KIP being "configuration free".
> >
> > There is also some discussion of discarded alternative in the Design
> Reasoning
> > section, which is confusing. Finally, there was a point there I didn't
> understand
> > at all, about stateless operators not being intended to load prior
> results.
> > This statement doesn't seem to be true, but it also doesn't seem to be
> relevant,
> > so maybe we can just drop it.
> >
> > Overall, it might help if you make a pass on this section, and just
> discuss as
> > briefly as possible the justification for the proposed behavior change,
> and
> > not adding a configuration. Try to avoid talking about things that we
> are not
> > proposing, since that will just lead to confusion.
> >
> > Similarly, I'd just completely remove the "Implementation [discarded]"
> section.
> > It was good to have this as part of the discussion initially, but as we
> move
> > toward a vote, it's better to just streamline the KIP document as much as
> > possible. Keeping a "discarded" section in the document will just make it
> > harder for new people to understand the proposal. We did the same thing
> > with KIP-441, where there were two prior drafts included at the end of
> the
> > document, and we just deleted them for clarity.
> >
> > I liked the "Compatibility" and "Rejected Alternatives" section. Very
> clear
> > and to the point.
> >
> > Thanks again for the contribution! I think once the KIP document is
> cleaned
> > up, we'll be in good shape to finalize the discussion.
> > -John
> >
> >
> > On Wed, Feb 26, 2020, at 07:27, Bruno Cadonna wrote:
> > > Hi Richard,
> > >
> > > 1. Could you change "idempotent update operations will only be dropped
> > > from KTables, not from other classes." -> idempotent update operations
> > > will only be dropped from materialized KTables? For non-materialized
> > > KTables -- as they can occur after optimization of the topology -- we
> > > cannot drop idempotent updates.
> > >
> > > 2. I cannot completely follow the metrics section. Do you want to
> > > record all idempotent updates or only the dropped ones? In particular,
> > > I do not understand the following sentences:
> > > "For that matter, even if we don't drop idempotent updates, we should
> > > at the very least record the number of idempotent updates that has
> > > been seen go through a particular processor."
> > > "Therefore, we should add some metrics which will count the number of
> > > idempotent updates that each node has seen."
> > > I do not see how we can record idempotent updates that we do not drop.
> > > If we see them, we should drop them. If we do not see them, we cannot
> > > drop them and we cannot record them.
> > >
> > > Best,
> > > Bruno
> > >
> > > On Wed, Feb 26, 2020 at 4:57 AM Richard Yu <yohan.richard...@gmail.com>
> wrote:
> > > >
> > > > Hi John,
> > > >
> > > > Sounds goods. It looks like we are close to wrapping things up. If
> there
> > > > isn't any other revisions which needs to be made. (If so, please
> comment in
> > > > the thread)
> > > > I will start the voting process this Thursday (Pacific Standard
> Time).
> > > >
> > > > Cheers,
> > > > Richard
> > > >
> > > > On Tue, Feb 25, 2020 at 11:59 AM John Roesler <vvcep...@apache.org>
> wrote:
> > > >
> > > > > Hi Richard,
> > > > >
> > > > > Sorry for the slow reply. I actually think we should avoid checking
> > > > > equals() for now. Your reasoning is good, but the truth is that
> > > > > depending on the implementation of equals() is non-trivial,
> > > > > semantically, and (though I proposed it before), I'm not convinced
> > > > > it's worth the risk. Much better to start with exactly one kind of
> > > > > "idempotence detection".
> > > > >
> > > > > Even if someone does update their serdes, we know that the new
> > > > > serde would still be able to _de_serialize the old format, or the
> whole
> > > > > app would break. The situation is that the new result gets encoded
> > > > > in the new binary format, which means we don't detect an idempotent
> > > > > update for what it is. In this case, we'd write the new binary
> format to
> > > > > disk and the changelog, and forward it downstream. However, we only
> > > > > do this once. Now that the binary format for that record has been
> updated,
> > > > > we would correctly detect idempotence of any subsequent updates.
> > > > >
> > > > > Plus, we would still be able to filter out idempotent updates in
> > > > > repartition
> > > > > sinks, since for those, we use the new serde to serialize both the
> "old"
> > > > > and
> > > > > "new" result.
> > > > >
> > > > > It's certainly a good observation, but I think we can just make a
> note of
> > > > > it
> > > > > in "rejected alternatives" for now, and plan to refine it later,
> if it does
> > > > > pose a big performance problem.
> > > > >
> > > > > Thanks!
> > > > > -John
> > > > >
> > > > > On Sat, Feb 22, 2020, at 18:14, Richard Yu wrote:
> > > > > > Hi all,
> > > > > >
> > > > > > Updated the KIP.
> > > > > >
> > > > > > Just a question: do you think it would be a good idea if we
> check for
> > > > > both
> > > > > > Object#equals() and binary equality?
> > > > > > Because there might be some subtle changes in the serialization
> (for
> > > > > > example, if the user decides to upgrade their serialization
> procedure to
> > > > > a
> > > > > > new one), but the underlying values of the result might be the
> same.
> > > > > > (therefore equals() might return true)
> > > > > >
> > > > > > Do you think this would be plausible?
> > > > > >
> > > > > > Cheers,
> > > > > > Richard
> > > > > >
> > > > > > On Fri, Feb 21, 2020 at 2:37 PM Richard Yu <
> yohan.richard...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hello,
> > > > > > >
> > > > > > > Just to make some updates. I changed the name of the metric so
> that it
> > > > > was
> > > > > > > more in line with usual Kafka naming conventions for metrics /
> sensors.
> > > > > > > Below is the updated description of the metric:
> > > > > > >
> > > > > > > dropped-idempotent-updates : (Level 2 - Per Task) DEBUG (rate
> | total)
> > > > > > >
> > > > > > > Description: This metric will record the number of updates
> that have
> > > > > been
> > > > > > > dropped since they are essentially re-performing an earlier
> operation.
> > > > > > >
> > > > > > > Note:
> > > > > > >
> > > > > > >    - The rate option indicates the ratio of records dropped to
> actual
> > > > > > >    volume of records passing through the task.
> > > > > > >    - The total option will just give a raw count of the number
> of
> > > > > records
> > > > > > >    dropped.
> > > > > > >
> > > > > > >
> > > > > > > I hope that this is more on point.
> > > > > > >
> > > > > > > Best,
> > > > > > > Richard
> > > > > > >
> > > > > > > On Fri, Feb 21, 2020 at 2:20 PM Richard Yu <
> yohan.richard...@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi all,
> > > > > > >>
> > > > > > >> Thanks for the clarification. I was just confused a little on
> what was
> > > > > > >> going on.
> > > > > > >>
> > > > > > >> So I guess then that for the actual proposal. We got the
> following:
> > > > > > >>
> > > > > > >> 1. We check for binary equality, and perform no extra look
> ups.
> > > > > > >> 2. Emphasize that this applies only to materialized tables.
> > > > > > >> 3. We drop aggregation updates if key, value and timestamp is
> the
> > > > > same.
> > > > > > >>
> > > > > > >> Then that settles the behavior changes. So it looks like the
> Metric
> > > > > that
> > > > > > >> is the only thing that is left. In this case, I think the
> metric
> > > > > would be
> > > > > > >> named the following: IdempotentUpdateMetric. This is mostly
> off the
> > > > > top of
> > > > > > >> my head. So if you think that we change it, feel free to say
> so.
> > > > > > >> The metric will report the number of dropped operations
> inherently.
> > > > > > >>
> > > > > > >> It will probably be added as a Sensor, similar to the dropped
> records
> > > > > > >> sensor we already have.
> > > > > > >>
> > > > > > >> If there isn't anything else, I will probably start the
> voting process
> > > > > > >> next week!
> > > > > > >>
> > > > > > >> Cheers,
> > > > > > >> Richard
> > > > > > >>
> > > > > > >>
> > > > > > >> On Fri, Feb 21, 2020 at 11:23 AM John Roesler <
> vvcep...@apache.org>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >>> Hi Bruno,
> > > > > > >>>
> > > > > > >>> Thanks for the clarification. Indeed, I was thinking two
> things:
> > > > > > >>> 1. For the initial implementation, we can just avoid adding
> any extra
> > > > > > >>> lookups, but only do the comparison when we already happen
> to have
> > > > > > >>> the prior value.
> > > > > > >>> 2. I think, as a result of the timestamp semantics, we
> actually _do_
> > > > > look
> > > > > > >>> up the prior value approximately all the time, so the
> idempotence
> > > > > check
> > > > > > >>> should be quite effective.
> > > > > > >>>
> > > > > > >>> I think that second point is the same thing you're referring
> to
> > > > > > >>> potentially
> > > > > > >>> being unnecessary. It does mean that we do fetch the whole
> value in a
> > > > > > >>> lot of cases where we really only need the timestamp, so it
> could
> > > > > > >>> certainly
> > > > > > >>> be optimized in the future. In that future, we would need to
> weigh
> > > > > that
> > > > > > >>> optimization against losing the idempotence check. But,
> that's a
> > > > > problem
> > > > > > >>> for tomorrow :)
> > > > > > >>>
> > > > > > >>> I'm 100% on board with scrutinizing the performance as we
> implement
> > > > > > >>> this feature.
> > > > > > >>>
> > > > > > >>> Thanks again,
> > > > > > >>> -John
> > > > > > >>>
> > > > > > >>> On Thu, Feb 20, 2020, at 03:25, Bruno Cadonna wrote:
> > > > > > >>> > Hi John,
> > > > > > >>> >
> > > > > > >>> > I am glad to help you with your imagination. With
> overhead, I
> > > > > mainly
> > > > > > >>> > meant the additional lookup into the state store to get
> the current
> > > > > > >>> > value, but I see now in the code that we do that lookup
> anyways
> > > > > > >>> > (although I think we could avoid that in the cases where
> we do not
> > > > > > >>> > need the old value). With or without config, we need to
> evaluate
> > > > > the
> > > > > > >>> > performance benefits of this change, in any case.
> > > > > > >>> >
> > > > > > >>> > Best,
> > > > > > >>> > Bruno
> > > > > > >>> >
> > > > > > >>> > On Wed, Feb 19, 2020 at 7:48 PM John Roesler <
> vvcep...@apache.org>
> > > > > > >>> wrote:
> > > > > > >>> > >
> > > > > > >>> > > Thanks for your remarks, Bruno!
> > > > > > >>> > >
> > > > > > >>> > > I'm in favor of standardizing on terminology like "not
> forwarding
> > > > > > >>> > > idempotent updates" or "dropping idempotent updates".
> Maybe we
> > > > > > >>> > > should make a pass on the KIP and just convert
> everything to this
> > > > > > >>> > > phrasing. In retrospect, even the term "emit-on-change"
> has too
> > > > > much
> > > > > > >>> > > semantic baggage, since it implies the semantics from
> the SECRET
> > > > > > >>> > > paper, which we don't really want to imply here.
> > > > > > >>> > >
> > > > > > >>> > > I'm also in favor of the metric as you propose.
> > > > > > >>> > >
> > > > > > >>> > > Likewise with stream aggregations, I was also under the
> > > > > impression
> > > > > > >>> > > that we agreed on dropping idempotent updates to the
> aggregation
> > > > > > >>> > > result, any time we find that our "new" (key, value,
> timestamp)
> > > > > > >>> result
> > > > > > >>> > > is identical to the prior one.
> > > > > > >>> > >
> > > > > > >>> > > Also, I'm +1 on all your recommendations for updating
> the KIP
> > > > > > >>> document
> > > > > > >>> > > for clarity.
> > > > > > >>> > >
> > > > > > >>> > > Regarding the opt-out config. Perhaps I'm suffering from
> a
> > > > > failure of
> > > > > > >>> > > imagination, but I don't see how the current proposal
> could
> > > > > really
> > > > > > >>> have
> > > > > > >>> > > a measurable impact on latency. If all we do is make a
> single
> > > > > extra
> > > > > > >>> pass
> > > > > > >>> > > to compare two byte arrays for equality, only in the
> cases where
> > > > > we
> > > > > > >>> already
> > > > > > >>> > > have the byte arrays available, it seems unlikely to
> measurably
> > > > > > >>> affect the
> > > > > > >>> > > processing of non-idempotent updates. It seems
> guaranteed to
> > > > > > >>> _decrease_
> > > > > > >>> > > the latency of processing idempotent updates, since we
> get to
> > > > > skip a
> > > > > > >>> > > store#put, at least one producer#send, and also all
> downstream
> > > > > > >>> processing,
> > > > > > >>> > > including all the disk and network operations associated
> with
> > > > > > >>> downstream
> > > > > > >>> > > operations.
> > > > > > >>> > >
> > > > > > >>> > > It seems like if we're pretty sure this change would only
> > > > > _help_, we
> > > > > > >>> shouldn't
> > > > > > >>> > > introduce the operational burden of an extra
> configuration. If we
> > > > > > >>> want to
> > > > > > >>> > > be more aggressive about dropping idempotent operations
> in the
> > > > > > >>> future,
> > > > > > >>> > > such as depending on equals() or adding a ChangeDetector
> > > > > interface,
> > > > > > >>> then
> > > > > > >>> > > we should consider adding a configuration as part of
> that future
> > > > > > >>> work. In
> > > > > > >>> > > fact, if we add a simple "opt-in/opt-out" switch right
> now, we
> > > > > might
> > > > > > >>> find
> > > > > > >>> > > that it's actually insufficient for whatever future
> feature we
> > > > > might
> > > > > > >>> propose,
> > > > > > >>> > > then we have a mess of deprecating the opt-out config and
> > > > > replacing
> > > > > > >>> it.
> > > > > > >>> > >
> > > > > > >>> > > What do you think?
> > > > > > >>> > > -John
> > > > > > >>> > >
> > > > > > >>> > > On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna wrote:
> > > > > > >>> > > > Hi all,
> > > > > > >>> > > >
> > > > > > >>> > > > Sorry for the late reply!
> > > > > > >>> > > >
> > > > > > >>> > > > I am also in favour of baby steps.
> > > > > > >>> > > >
> > > > > > >>> > > > I am undecided whether the KIP should contain a
> opt-out config
> > > > > or
> > > > > > >>> not.
> > > > > > >>> > > > The overhead of emit-on-change might affect latency.
> For
> > > > > > >>> applications
> > > > > > >>> > > > where low latency is crucial and there are not too many
> > > > > idempotent
> > > > > > >>> > > > updates, it would be better to fall back to
> emit-on-update.
> > > > > > >>> However,
> > > > > > >>> > > > we do not know how much emit-on-change impacts
> latency. We
> > > > > would
> > > > > > >>> first
> > > > > > >>> > > > need to benchmark that before we can decide about the
> > > > > > >>> opt-out-config.
> > > > > > >>> > > >
> > > > > > >>> > > > A metric of dropped idempotent updates seems useful to
> me to be
> > > > > > >>> > > > informed about potential upstream applications or
> upstream
> > > > > > >>> operators
> > > > > > >>> > > > that produce too many idempotent updates. The KIP
> should state
> > > > > the
> > > > > > >>> > > > name of the metric, its group, its tags, and its
> recording
> > > > > level
> > > > > > >>> (see
> > > > > > >>> > > > KIP-444 or KIP-471 for examples). I propose DEBUG as
> reporting
> > > > > > >>> level.
> > > > > > >>> > > >
> > > > > > >>> > > > Richard, what competing proposals for emit-on-change
> for
> > > > > > >>> aggregations
> > > > > > >>> > > > do you mean? I have the feeling that we agreed to get
> rid of
> > > > > > >>> > > > idempotent updates if the aggregate is updated with
> the same
> > > > > key,
> > > > > > >>> > > > value, AND timestamp. I am also fine if we do not
> include this
> > > > > into
> > > > > > >>> > > > this KIP (remember: baby steps).
> > > > > > >>> > > >
> > > > > > >>> > > > You write that "emit-on-change is more correct". Since
> we
> > > > > agreed
> > > > > > >>> that
> > > > > > >>> > > > this is an optimization, IMO you cannot argue this way.
> > > > > > >>> > > >
> > > > > > >>> > > > Please put "Alternative Approaches" under "Rejected
> > > > > Alternatives",
> > > > > > >>> so
> > > > > > >>> > > > that it becomes clear that we are not going to
> implement them.
> > > > > In
> > > > > > >>> > > > general, I think the KIP needs a bit of clean-up
> (probably, you
> > > > > > >>> > > > already planned for it). "Design Reasoning" is a bit of
> > > > > behavior
> > > > > > >>> > > > changes, rejected alternatives and duplicates a bit the
> > > > > content in
> > > > > > >>> > > > those sections.
> > > > > > >>> > > >
> > > > > > >>> > > > I do not like the name "no-op operations" or "no-ops",
> because
> > > > > they
> > > > > > >>> > > > are rather generic. I like more "idempotent updates".
> > > > > > >>> > > >
> > > > > > >>> > > > Best,
> > > > > > >>> > > > Bruno
> > > > > > >>> > > >
> > > > > > >>> > > >
> > > > > > >>> > > > On Tue, Feb 18, 2020 at 7:25 PM Richard Yu <
> > > > > > >>> yohan.richard...@gmail.com> wrote:
> > > > > > >>> > > > >
> > > > > > >>> > > > > Hi all,
> > > > > > >>> > > > >
> > > > > > >>> > > > > We are definitely making progress!
> > > > > > >>> > > > >
> > > > > > >>> > > > > @John should I emphasize in the proposed behavior
> changes
> > > > > that
> > > > > > >>> we are only
> > > > > > >>> > > > > doing binary equality checks for stateful operators?
> > > > > > >>> > > > > It looks like we have come close to finalizing this
> part of
> > > > > the
> > > > > > >>> KIP. (I
> > > > > > >>> > > > > will note in the KIP that this proposal is intended
> for
> > > > > > >>> optimization, not
> > > > > > >>> > > > > semantics correctness)
> > > > > > >>> > > > >
> > > > > > >>> > > > > I do think maybe we still have one other detail we
> need to
> > > > > > >>> discuss. So far,
> > > > > > >>> > > > > there has been quite a bit of back and forth about
> what the
> > > > > > >>> behavior of
> > > > > > >>> > > > > aggregations should look like in emit on change. I
> have seen
> > > > > > >>> > > > > multiple competing proposals, so I am not completely
> certain
> > > > > > >>> which one we
> > > > > > >>> > > > > should go with, or how we will be able to compromise
> in
> > > > > between
> > > > > > >>> them.
> > > > > > >>> > > > >
> > > > > > >>> > > > > Let me know what your thoughts are on this matter,
> since we
> > > > > are
> > > > > > >>> probably
> > > > > > >>> > > > > close to wrapping up most other stuff.
> > > > > > >>> > > > > @Matthias J. Sax <matth...@confluent.io>  and
> @Bruno, see
> > > > > what
> > > > > > >>> you think
> > > > > > >>> > > > > about this.
> > > > > > >>> > > > >
> > > > > > >>> > > > > Best,
> > > > > > >>> > > > > Richard
> > > > > > >>> > > > >
> > > > > > >>> > > > >
> > > > > > >>> > > > >
> > > > > > >>> > > > > On Tue, Feb 18, 2020 at 9:06 AM John Roesler <
> > > > > > >>> vvcep...@apache.org> wrote:
> > > > > > >>> > > > >
> > > > > > >>> > > > > > Thanks, Matthias!
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > Regarding numbers, it would be hard to know how
> many
> > > > > > >>> applications
> > > > > > >>> > > > > > would benefit, since we don't know how many
> applications
> > > > > there
> > > > > > >>> are,
> > > > > > >>> > > > > > or anything about their data sets or topologies.
> We could
> > > > > do a
> > > > > > >>> survey,
> > > > > > >>> > > > > > but it seems overkill if we take the conservative
> approach.
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > I have my own practical stream processing
> experience that
> > > > > > >>> tells me this
> > > > > > >>> > > > > > is absolutely critical for any moderate-to-large
> relational
> > > > > > >>> stream
> > > > > > >>> > > > > > processing use cases. I'll leave it to you to
> decide if you
> > > > > > >>> find that
> > > > > > >>> > > > > > convincing, but it's definitely not an
> _assumption_. I've
> > > > > also
> > > > > > >>> heard from
> > > > > > >>> > > > > > a few Streams users who have already had to
> implement
> > > > > their own
> > > > > > >>> > > > > > noop-suppression transformers in order to get to
> production
> > > > > > >>> scale.
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > Regardless, it sounds like we can agree on taking
> an
> > > > > > >>> opportunistic approach
> > > > > > >>> > > > > > and targeting the optimization just to use a
> > > > > binary-equality
> > > > > > >>> check at
> > > > > > >>> > > > > > stateful operators. (I'd also suggest in sink
> nodes, when
> > > > > we
> > > > > > >>> are about to
> > > > > > >>> > > > > > send old and new values, since they are also
> already
> > > > > present
> > > > > > >>> and serialized
> > > > > > >>> > > > > > at that point.) We could make the KIP even more
> vague, and
> > > > > > >>> just say that
> > > > > > >>> > > > > > we'll drop no-op updates "when possible".
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > I'm curious what Bruno and the others think about
> this. If
> > > > > it
> > > > > > >>> seems like
> > > > > > >>> > > > > > a good starting point, perhaps we could move to a
> vote soon
> > > > > > >>> and get to
> > > > > > >>> > > > > > work on the implementation!
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > Thanks,
> > > > > > >>> > > > > > -John
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > On Mon, Feb 17, 2020, at 20:54, Matthias J. Sax
> wrote:
> > > > > > >>> > > > > > > Talking about optimizations and reducing
> downstream load:
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > Do we actually have any numbers? I have the
> impression
> > > > > that
> > > > > > >>> this KIP is
> > > > > > >>> > > > > > > more or less build on the _assumption_ that
> there is a
> > > > > > >>> problem. Yes,
> > > > > > >>> > > > > > > there are some use cases that would benefit from
> this;
> > > > > But
> > > > > > >>> how many
> > > > > > >>> > > > > > > applications would actually benefit? And how
> much load
> > > > > > >>> reduction would
> > > > > > >>> > > > > > > they get?
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > The simplest approach (following John idea to
> make baby
> > > > > > >>> steps) would be
> > > > > > >>> > > > > > > to apply the emit-on-change pattern only if
> there is a
> > > > > > >>> store. For this
> > > > > > >>> > > > > > > case we need to serialize old and new result
> anyway and
> > > > > thus
> > > > > > >>> a simple
> > > > > > >>> > > > > > > byte-array comparison is no overhead.
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > Sending `oldValues` by default would become
> expensive
> > > > > > >>> because we would
> > > > > > >>> > > > > > > need to serialize the recomputed old result, as
> well as
> > > > > the
> > > > > > >>> new result,
> > > > > > >>> > > > > > > to make the comparison (and we now the
> serialization is
> > > > > not
> > > > > > >>> cheap). We
> > > > > > >>> > > > > > > are facing a trade-off between CPU overhead and
> > > > > downstream
> > > > > > >>> load and I am
> > > > > > >>> > > > > > > not sure if we should hard code this. My original
> > > > > argument
> > > > > > >>> for sending
> > > > > > >>> > > > > > > `oldValues` was about semantics; but for an
> > > > > optimization, I
> > > > > > >>> am not sure
> > > > > > >>> > > > > > > if this would be the right choice.
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > For now, users who want to opt-in can force a
> > > > > > >>> materialization. A
> > > > > > >>> > > > > > > materialization may be expensive and if we see
> future
> > > > > > >>> demand, we could
> > > > > > >>> > > > > > > still add an option to send `oldValues` instead
> of
> > > > > > >>> materialization (this
> > > > > > >>> > > > > > > would at least save the store overhead). As we
> consider
> > > > > the
> > > > > > >>> KIP an
> > > > > > >>> > > > > > > optimization, a "config" seems to make sense.
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > -Matthias
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > On 2/17/20 5:21 PM, Richard Yu wrote:
> > > > > > >>> > > > > > > > Hi John!
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > Thanks for the reply.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > About the changes we have discussed so far. I
> think
> > > > > upon
> > > > > > >>> further
> > > > > > >>> > > > > > > > consideration, we have been mostly talking
> about this
> > > > > from
> > > > > > >>> the
> > > > > > >>> > > > > > perspective
> > > > > > >>> > > > > > > > that no stop-gap effort is acceptable.
> However, in
> > > > > recent
> > > > > > >>> discussion,
> > > > > > >>> > > > > > > if we
> > > > > > >>> > > > > > > > consider optimization, then it appears that the
> > > > > > >>> perspective I
> > > > > > >>> > > > > > mentioned no
> > > > > > >>> > > > > > > > longer applies. After all, we are no longer
> concerned
> > > > > so
> > > > > > >>> much about
> > > > > > >>> > > > > > > > semantics correctness, then reducing traffic
> as much as
> > > > > > >>> possible
> > > > > > >>> > > > > > without
> > > > > > >>> > > > > > > > performance tradeoffs.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > In this case, I think a cache would be a good
> idea for
> > > > > > >>> stateless
> > > > > > >>> > > > > > > > operations. This cache will not be backed by a
> store
> > > > > > >>> obviously. We can
> > > > > > >>> > > > > > > > probably use Kafka's ThreadCache. We should be
> able to
> > > > > > >>> catch a large
> > > > > > >>> > > > > > > > portion of the no-ops if we at least store some
> > > > > results in
> > > > > > >>> the cache.
> > > > > > >>> > > > > > Not
> > > > > > >>> > > > > > > > all will be caught, but I think the impact
> will be
> > > > > > >>> significant.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > On another note, I think that we should
> implement
> > > > > > >>> competing proposals
> > > > > > >>> > > > > > i.e.
> > > > > > >>> > > > > > > > one where we forward both old and new values
> with a
> > > > > > >>> reasonable
> > > > > > >>> > > > > > proportion
> > > > > > >>> > > > > > > > of artificial no-ops (we do not necessarily
> have to
> > > > > rely
> > > > > > >>> on equals so
> > > > > > >>> > > > > > much
> > > > > > >>> > > > > > > > as comparing the serialized binary data after
> the
> > > > > > >>> operation), and in
> > > > > > >>> > > > > > > > another scenario, the cache for stateless ops.
> It
> > > > > would be
> > > > > > >>> > > > > > unreasonable if
> > > > > > >>> > > > > > > > we completely disregard either approach, since
> they
> > > > > both
> > > > > > >>> have merit.
> > > > > > >>> > > > > > The
> > > > > > >>> > > > > > > > reason for implementing both is to perform
> benchmark
> > > > > tests
> > > > > > >>> on them, and
> > > > > > >>> > > > > > > > compare them with the original. This way, we
> can more
> > > > > > >>> clearly see what
> > > > > > >>> > > > > > is
> > > > > > >>> > > > > > > > the drawbacks and the gains. So far, we have
> been
> > > > > > >>> discussing only
> > > > > > >>> > > > > > > > hypotheticals, and if we continue to do so, I
> think it
> > > > > is
> > > > > > >>> likely no
> > > > > > >>> > > > > > ground
> > > > > > >>> > > > > > > > will be gained.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > After all, what we seek is optimization, and
> > > > > performance
> > > > > > >>> benchmarks
> > > > > > >>> > > > > > > will be
> > > > > > >>> > > > > > > > mandatory for a KIP of this nature.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > Hope this helps,
> > > > > > >>> > > > > > > > Richard
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > On Mon, Feb 17, 2020 at 2:12 PM John Roesler <
> > > > > > >>> vvcep...@apache.org>
> > > > > > >>> > > > > > wrote:
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >> Hi again, all,
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> Sorry on my part for my silence.
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> I've just taken another look over the recent
> history
> > > > > of
> > > > > > >>> this
> > > > > > >>> > > > > > > discussion. It
> > > > > > >>> > > > > > > >> seems like the #1 point to clarify (because
> it affect
> > > > > > >>> everything
> > > > > > >>> > > > > > else) is
> > > > > > >>> > > > > > > >> that,
> > > > > > >>> > > > > > > >> yes, I was 100% envisioning this as an
> _optimization_.
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> As a consequence, I don't think it's critical
> to make
> > > > > any
> > > > > > >>> hard
> > > > > > >>> > > > > > guarantees
> > > > > > >>> > > > > > > >> about
> > > > > > >>> > > > > > > >> what results get forwarded and what (no-op
> updates)
> > > > > get
> > > > > > >>> dropped. I'd
> > > > > > >>> > > > > > > >> initially
> > > > > > >>> > > > > > > >> just been thinking about doing this
> opportunistically
> > > > > in
> > > > > > >>> cases where
> > > > > > >>> > > > > > we
> > > > > > >>> > > > > > > >> already
> > > > > > >>> > > > > > > >> had the "old" and "new" result in memory,
> thanks to a
> > > > > > >>> request to
> > > > > > >>> > > > > > > "emit old
> > > > > > >>> > > > > > > >> values", or to the implementation of timestamp
> > > > > semantics.
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> However, whether or not it's semantically
> critical, I
> > > > > do
> > > > > > >>> think that
> > > > > > >>> > > > > > > >> Matthias's
> > > > > > >>> > > > > > > >> idea to use the change-forwarding mechanism
> to check
> > > > > for
> > > > > > >>> no-ops even
> > > > > > >>> > > > > > on
> > > > > > >>> > > > > > > >> stateless operations is pretty interesting.
> > > > > Specifically,
> > > > > > >>> this would
> > > > > > >>> > > > > > > >> _really_
> > > > > > >>> > > > > > > >> let you pare down useless updates by using
> mapValues
> > > > > to
> > > > > > >>> strip down
> > > > > > >>> > > > > > > records
> > > > > > >>> > > > > > > >> only
> > > > > > >>> > > > > > > >> to what you really need. However, the
> dependence on
> > > > > the
> > > > > > >>> > > > > > implementation of
> > > > > > >>> > > > > > > >> equals() is troubling.
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> It might make sense to table this idea, as
> well as my
> > > > > > >>> complicated
> > > > > > >>> > > > > > no-op
> > > > > > >>> > > > > > > >> detection algorithm, and initially propose
> just a
> > > > > > >>> nonconfigurable
> > > > > > >>> > > > > > feature
> > > > > > >>> > > > > > > >> to
> > > > > > >>> > > > > > > >> check "old" and "new" results for binary
> equality
> > > > > before
> > > > > > >>> forwarding.
> > > > > > >>> > > > > > > I.e.,
> > > > > > >>> > > > > > > >> if
> > > > > > >>> > > > > > > >> any operation determines that the old and new
> results
> > > > > are
> > > > > > >>> > > > > > > >> binary-identical, we
> > > > > > >>> > > > > > > >> would not forward.
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> I'll admit that this doesn't serve Tommy's
> use case
> > > > > very
> > > > > > >>> well, but it
> > > > > > >>> > > > > > > >> might be
> > > > > > >>> > > > > > > >> better to take baby steps with an
> optimization like
> > > > > this
> > > > > > >>> and not risk
> > > > > > >>> > > > > > > >> over-reaching in a way that actually harms
> > > > > performance or
> > > > > > >>> > > > > > correctness. We
> > > > > > >>> > > > > > > >> could
> > > > > > >>> > > > > > > >> always expand the feature to use equals() or
> some
> > > > > kind of
> > > > > > >>> > > > > > ChangeDetector
> > > > > > >>> > > > > > > >> later
> > > > > > >>> > > > > > > >> on, in a more focused discussion.
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> Regarding metrics or debug logs, I guess I
> don't feel
> > > > > > >>> strongly, but it
> > > > > > >>> > > > > > > >> feels
> > > > > > >>> > > > > > > >> like two things will happen that make it
> nicer to add
> > > > > > >>> them:
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> 1. This feature is going to surprise/annoy
> _somebody_,
> > > > > > >>> and it would be
> > > > > > >>> > > > > > > >> nice to
> > > > > > >>> > > > > > > >> be able to definitively say the reason that
> updates
> > > > > are
> > > > > > >>> dropped is
> > > > > > >>> > > > > > that
> > > > > > >>> > > > > > > >> they
> > > > > > >>> > > > > > > >> were no-ops. The easiest smoking gun is if
> there are
> > > > > > >>> debug-logs that
> > > > > > >>> > > > > > > can be
> > > > > > >>> > > > > > > >> enabled. This person might just be looking at
> the
> > > > > > >>> dashboards,
> > > > > > >>> > > > > > > wondering why
> > > > > > >>> > > > > > > >> there are 100K updates per second going into
> their
> > > > > app,
> > > > > > >>> but only 1K
> > > > > > >>> > > > > > > >> results per
> > > > > > >>> > > > > > > >> second coming out. Having the metric there
> makes the
> > > > > > >>> accounting
> > > > > > >>> > > > > > easier.
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> 2. Somebody is going to struggle with
> high-volume
> > > > > > >>> updates, and it
> > > > > > >>> > > > > > > would be
> > > > > > >>> > > > > > > >> nice
> > > > > > >>> > > > > > > >> for them to know that this feature is saving
> them
> > > > > > >>> X-thousand updates
> > > > > > >>> > > > > > per
> > > > > > >>> > > > > > > >> second,
> > > > > > >>> > > > > > > >> etc.
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> What does everyone think about this? Note, as
> I read
> > > > > it,
> > > > > > >>> what I've
> > > > > > >>> > > > > > said
> > > > > > >>> > > > > > > >> above is
> > > > > > >>> > > > > > > >> already reflected in the text of the KIP.
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> Thanks,
> > > > > > >>> > > > > > > >> -John
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> On Tue, Feb 11, 2020, at 18:27, Richard Yu
> wrote:
> > > > > > >>> > > > > > > >>> Hi all,
> > > > > > >>> > > > > > > >>>
> > > > > > >>> > > > > > > >>> Bumping this. If you feel that this KIP is
> not too
> > > > > > >>> urgent. Then let
> > > > > > >>> > > > > > me
> > > > > > >>> > > > > > > >>> know. :)
> > > > > > >>> > > > > > > >>>
> > > > > > >>> > > > > > > >>> Cheers,
> > > > > > >>> > > > > > > >>> Richard
> > > > > > >>> > > > > > > >>>
> > > > > > >>> > > > > > > >>> On Thu, Feb 6, 2020 at 4:55 PM Richard Yu <
> > > > > > >>> > > > > > yohan.richard...@gmail.com>
> > > > > > >>> > > > > > > >>> wrote:
> > > > > > >>> > > > > > > >>>
> > > > > > >>> > > > > > > >>>> Hi all,
> > > > > > >>> > > > > > > >>>>
> > > > > > >>> > > > > > > >>>> I've had just a few thoughts regarding the
> > > > > forwarding
> > > > > > >>> of <key,
> > > > > > >>> > > > > > > >>>> change<old_value, new_value>>. As Matthias
> already
> > > > > > >>> mentioned, there
> > > > > > >>> > > > > > > >> are two
> > > > > > >>> > > > > > > >>>> separate priorities by which we can judge
> this KIP:
> > > > > > >>> > > > > > > >>>>
> > > > > > >>> > > > > > > >>>> 1. A optimization perspective: In this
> case, the
> > > > > user
> > > > > > >>> would prefer
> > > > > > >>> > > > > > the
> > > > > > >>> > > > > > > >>>> impact of this KIP to be as minimal as
> possible. By
> > > > > > >>> such logic, if
> > > > > > >>> > > > > > > >>>> stateless operations are performed twice,
> that could
> > > > > > >>> prove
> > > > > > >>> > > > > > > >> unacceptable for
> > > > > > >>> > > > > > > >>>> them. (since operations can prove expensive)
> > > > > > >>> > > > > > > >>>>
> > > > > > >>> > > > > > > >>>> 2. Semantics correctness perspective:
> Unlike the
> > > > > > >>> optimization
> > > > > > >>> > > > > > > >> approach, we
> > > > > > >>> > > > > > > >>>> are more concerned with all KTable
> operations
> > > > > obeying
> > > > > > >>> the same
> > > > > > >>> > > > > > emission
> > > > > > >>> > > > > > > >>>> policy. i.e. emit on change. In this case, a
> > > > > > >>> discrepancy would not
> > > > > > >>> > > > > > be
> > > > > > >>> > > > > > > >>>> tolerated, even though an extra performance
> cost
> > > > > will
> > > > > > >>> be incurred.
> > > > > > >>> > > > > > > >>>> Therefore, we will follow Matthias's
> approach, and
> > > > > then
> > > > > > >>> perform the
> > > > > > >>> > > > > > > >>>> operation once on the old value, and once
> on the
> > > > > new.
> > > > > > >>> > > > > > > >>>>
> > > > > > >>> > > > > > > >>>> The issue here I think is more black and
> white than
> > > > > in
> > > > > > >>> between. The
> > > > > > >>> > > > > > > >> second
> > > > > > >>> > > > > > > >>>> option in particular would be favorable for
> users
> > > > > with
> > > > > > >>> inexpensive
> > > > > > >>> > > > > > > >>>> stateless operations, while for the former
> option,
> > > > > we
> > > > > > >>> are probably
> > > > > > >>> > > > > > > >> dealing
> > > > > > >>> > > > > > > >>>> with more expensive ones. So the simplest
> solution
> > > > > is
> > > > > > >>> probably to
> > > > > > >>> > > > > > > >> allow the
> > > > > > >>> > > > > > > >>>> user to choose one of the behaviors, and
> have a
> > > > > config
> > > > > > >>> which can
> > > > > > >>> > > > > > > >> switch in
> > > > > > >>> > > > > > > >>>> between them.
> > > > > > >>> > > > > > > >>>>
> > > > > > >>> > > > > > > >>>> Its the simplest compromise I can come up
> with at
> > > > > the
> > > > > > >>> moment, but if
> > > > > > >>> > > > > > > >> you
> > > > > > >>> > > > > > > >>>> think you have a better plan which could
> better
> > > > > balance
> > > > > > >>> tradeoffs.
> > > > > > >>> > > > > > Then
> > > > > > >>> > > > > > > >>>> please let us know. :)
> > > > > > >>> > > > > > > >>>>
> > > > > > >>> > > > > > > >>>> Best,
> > > > > > >>> > > > > > > >>>> Richard
> > > > > > >>> > > > > > > >>>>
> > > > > > >>> > > > > > > >>>> On Wed, Feb 5, 2020 at 5:12 PM John Roesler
> <
> > > > > > >>> vvcep...@apache.org>
> > > > > > >>> > > > > > > >> wrote:
> > > > > > >>> > > > > > > >>>>
> > > > > > >>> > > > > > > >>>>> Hi all,
> > > > > > >>> > > > > > > >>>>>
> > > > > > >>> > > > > > > >>>>> Thanks for the thoughtful comments!
> > > > > > >>> > > > > > > >>>>>
> > > > > > >>> > > > > > > >>>>> I need more time to reflect on your
> thoughts, but
> > > > > just
> > > > > > >>> wanted to
> > > > > > >>> > > > > > offer
> > > > > > >>> > > > > > > >>>>> a quick clarification about equals().
> > > > > > >>> > > > > > > >>>>>
> > > > > > >>> > > > > > > >>>>> I only meant that we can't be sure if a
> class's
> > > > > > >>> equals()
> > > > > > >>> > > > > > > >> implementation
> > > > > > >>> > > > > > > >>>>> returns true for two semantically identical
> > > > > instances.
> > > > > > >>> I.e., if a
> > > > > > >>> > > > > > > >> class
> > > > > > >>> > > > > > > >>>>> doesn't
> > > > > > >>> > > > > > > >>>>> override the default equals()
> implementation, then
> > > > > we
> > > > > > >>> would see
> > > > > > >>> > > > > > > >> behavior
> > > > > > >>> > > > > > > >>>>> like:
> > > > > > >>> > > > > > > >>>>>
> > > > > > >>> > > > > > > >>>>> new MyPair("A", 1).equals(new MyPair("A",
> 1))
> > > > > returns
> > > > > > >>> false
> > > > > > >>> > > > > > > >>>>>
> > > > > > >>> > > > > > > >>>>> In that case, I would still like to catch
> no-op
> > > > > > >>> updates by
> > > > > > >>> > > > > > comparing
> > > > > > >>> > > > > > > >> the
> > > > > > >>> > > > > > > >>>>> serialized form of the records when we
> happen to
> > > > > have
> > > > > > >>> it serialized
> > > > > > >>> > > > > > > >> anyway
> > > > > > >>> > > > > > > >>>>> (such as when the operation is stateful,
> or when
> > > > > we're
> > > > > > >>> sending to a
> > > > > > >>> > > > > > > >>>>> repartition topic and we have both the
> "new" and
> > > > > "old"
> > > > > > >>> value from
> > > > > > >>> > > > > > > >>>>> upstream).
> > > > > > >>> > > > > > > >>>>>
> > > > > > >>> > > > > > > >>>>> I didn't mean to suggest we'd try to use
> > > > > reflection to
> > > > > > >>> detect
> > > > > > >>> > > > > > whether
> > > > > > >>> > > > > > > >>>>> equals
> > > > > > >>> > > > > > > >>>>> is implemented, although that is a neat
> trick. I
> > > > > was
> > > > > > >>> thinking more
> > > > > > >>> > > > > > of
> > > > > > >>> > > > > > > >> a
> > > > > > >>> > > > > > > >>>>> belt-and-suspenders algorithm where we do
> the check
> > > > > > >>> for no-ops
> > > > > > >>> > > > > > based
> > > > > > >>> > > > > > > >> on
> > > > > > >>> > > > > > > >>>>> equals() and then _also_ check the
> serialized bytes
> > > > > > >>> for equality.
> > > > > > >>> > > > > > > >>>>>
> > > > > > >>> > > > > > > >>>>> Thanks,
> > > > > > >>> > > > > > > >>>>> -John
> > > > > > >>> > > > > > > >>>>>
> > > > > > >>> > > > > > > >>>>> On Wed, Feb 5, 2020, at 15:31, Ted Yu
> wrote:
> > > > > > >>> > > > > > > >>>>>> Thanks for the comments, Matthias.
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > > > >>>>>> w.r.t. requirement of an `equals()`
> > > > > implementation,
> > > > > > >>> each template
> > > > > > >>> > > > > > > >> type
> > > > > > >>> > > > > > > >>>>>> would have an equals() method. We can use
> the
> > > > > > >>> following code to
> > > > > > >>> > > > > > know
> > > > > > >>> > > > > > > >>>>>> whether it is provided by JVM or provided
> by user.
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > > > >>>>>> boolean customEquals = false;
> > > > > > >>> > > > > > > >>>>>> try {
> > > > > > >>> > > > > > > >>>>>>     Class cls =
> > > > > value.getClass().getMethod("equals",
> > > > > > >>> > > > > > > >>>>>> Object.class).getDeclaringClass();
> > > > > > >>> > > > > > > >>>>>>     if (!Object.class.equals(cls)) {
> > > > > > >>> > > > > > > >>>>>>         customEquals = true;
> > > > > > >>> > > > > > > >>>>>>     }
> > > > > > >>> > > > > > > >>>>>> } catch (NoSuchMethodException nsme) {
> > > > > > >>> > > > > > > >>>>>>     // equals is always defined, this
> wouldn't hit
> > > > > > >>> > > > > > > >>>>>> }
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > > > >>>>>> The next question is: what if the user
> doesn't
> > > > > > >>> provide equals()
> > > > > > >>> > > > > > > >> method ?
> > > > > > >>> > > > > > > >>>>>> Would we automatically fall back to
> > > > > emit-on-update ?
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > > > >>>>>> Cheers
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > > > >>>>>> On Tue, Feb 4, 2020 at 1:37 PM Matthias
> J. Sax <
> > > > > > >>> mj...@apache.org>
> > > > > > >>> > > > > > > >>>>> wrote:
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > > > > First a high level comment:
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > Overall, I would like to make one step back,
> and make
> > > > > sure
> > > > > > >>> we are
> > > > > > >>> > > > > > > > discussion on the same level. Originally, I
> understood
> > > > > > >>> this KIP
> > > > > > >>> > > > > > > >>> as a
> > > > > > >>> > > > > > > > proposed change of _semantics_, however, given
> the
> > > > > latest
> > > > > > >>> > > > > > > >>> discussion
> > > > > > >>> > > > > > > > it seems it's actually not -- it's more an
> > > > > _optimization_
> > > > > > >>> > > > > > > >>> proposal.
> > > > > > >>> > > > > > > > Hence, we only need to make sure that this
> optimization
> > > > > > >>> does not
> > > > > > >>> > > > > > > >>> break
> > > > > > >>> > > > > > > > existing semantics. It this the right way to
> think
> > > > > about
> > > > > > >>> it?
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > If yes, than it might actually be ok to have
> different
> > > > > > >>> behavior
> > > > > > >>> > > > > > > > depending if there is a materialized KTable or
> not. So
> > > > > > >>> far, we
> > > > > > >>> > > > > > > >>> never
> > > > > > >>> > > > > > > > defined a public contract about our emit
> strategy and
> > > > > it
> > > > > > >>> seems
> > > > > > >>> > > > > > > >>> this
> > > > > > >>> > > > > > > > KIP does not define one either.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > Hence, I don't have as strong of an opinion
> about
> > > > > sending
> > > > > > >>> > > > > > > >>> oldValues
> > > > > > >>> > > > > > > > for example any longer. I guess the question
> is really,
> > > > > > >>> what can
> > > > > > >>> > > > > > > >>> we
> > > > > > >>> > > > > > > > implement in a reasonable way.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > Other comments:
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > @Richard:
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > Can you please add the KIP to the KIP overview
> table:
> > > > > It's
> > > > > > >>> missing
> > > > > > >>> > > > > > > > (
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > > > >>>
> > > > > > >>> > > > > >
> > > > > > >>>
> > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Pro
> > > > > > >>> > > > > > > > posals).
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > @Bruno:
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > You mentioned caching. I think it's irrelevant
> > > > > > >>> (orthogonal) and
> > > > > > >>> > > > > > > >>> we can
> > > > > > >>> > > > > > > > discuss this KIP without considering it.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > @John:
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >>>>>>>>> Even in the source table, we forward
> the
> > > > > updated
> > > > > > >>> record with
> > > > > > >>> > > > > > the
> > > > > > >>> > > > > > > >>>>>>>>> higher of the two timestamps. So the
> example is
> > > > > > >>> more like:
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > That is not correct. Currently, we forward
> with the
> > > > > smaller
> > > > > > >>> > > > > > > > out-of-order timestamp (changing the timestamp
> would
> > > > > > >>> corrupt the
> > > > > > >>> > > > > > > >>> data
> > > > > > >>> > > > > > > > -- we don't know, because we don't check, if
> the value
> > > > > is
> > > > > > >>> the
> > > > > > >>> > > > > > > >>> same
> > > > > > >>> > > > > > > >>>>>> or
> > > > > > >>> > > > > > > > a different one, hence, we must emit the
> out-of-order
> > > > > > >>> record
> > > > > > >>> > > > > > > >>> as-is).
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > If we start to do emit-on-change, we also need
> to emit
> > > > > a
> > > > > > >>> new
> > > > > > >>> > > > > > > >>> record if
> > > > > > >>> > > > > > > > the timestamp changes due to out-of-order
> data, hence,
> > > > > we
> > > > > > >>> would
> > > > > > >>> > > > > > > >>> still
> > > > > > >>> > > > > > > > need to emit <K,V,T1> because that give us
> correct
> > > > > > >>> semantics:
> > > > > > >>> > > > > > > >>> assume
> > > > > > >>> > > > > > > > you have a filter() and afterward use the
> filter
> > > > > KTable in
> > > > > > >>> a
> > > > > > >>> > > > > > > > stream-table join -- the lower T1 timestamp
> must be
> > > > > > >>> propagated to
> > > > > > >>> > > > > > > >>> the
> > > > > > >>> > > > > > > > filtered KTable to ensure that that the
> stream-table
> > > > > join
> > > > > > >>> compute
> > > > > > >>> > > > > > > >>> the
> > > > > > >>> > > > > > > > correct result.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > Your point about requiring an `equals()`
> > > > > implementation is
> > > > > > >>> > > > > > > >>> actually a
> > > > > > >>> > > > > > > > quite interesting one and boils down to my
> statement
> > > > > from
> > > > > > >>> above
> > > > > > >>> > > > > > > >>> about
> > > > > > >>> > > > > > > > "what can we actually implement". What I don't
> > > > > understand
> > > > > > >>> is:
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >>>>>>>>> This way, we still don't have to rely
> on the
> > > > > > >>> existence of an
> > > > > > >>> > > > > > > >>>>>>>>> equals() method, but if it is there,
> we can
> > > > > > >>> benefit from it.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > Your bullet point (2) says it uses `equals()`
> --
> > > > > hence, it
> > > > > > >>> seems
> > > > > > >>> > > > > > > >>> we
> > > > > > >>> > > > > > > > actually to rely on it? Also, how can we
> detect if
> > > > > there
> > > > > > >>> is an
> > > > > > >>> > > > > > > > `equals()` method to do the comparison? Would
> be fail
> > > > > if
> > > > > > >>> we don't
> > > > > > >>> > > > > > > >>> have
> > > > > > >>> > > > > > > > `equals()` nor corresponding serializes to do
> the
> > > > > > >>> comparison?
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >>>>>>>>> Wow, really good catch! Yes, we
> absolutely need
> > > > > > >>> metrics and
> > > > > > >>> > > > > > > >>> logs if
> > > > > > >>> > > > > > > >>>>>>>>> we're going to drop any records. And,
> yes, we
> > > > > > >>> should propose
> > > > > > >>> > > > > > > >>>>>>>>> metrics and logs that are similar to
> the
> > > > > existing
> > > > > > >>> ones when we
> > > > > > >>> > > > > > > >>> drop
> > > > > > >>> > > > > > > >>>>>>>>> records for other reasons.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > I am not sure about this point. In fact, we
> have
> > > > > already
> > > > > > >>> some
> > > > > > >>> > > > > > > >>> no-ops
> > > > > > >>> > > > > > > > in Kafka Streams in our join-operators and
> don't report
> > > > > > >>> any of
> > > > > > >>> > > > > > > >>> those
> > > > > > >>> > > > > > > > either. Emit-on-change is operator semantics
> and I
> > > > > don't
> > > > > > >>> see why
> > > > > > >>> > > > > > > >>> we
> > > > > > >>> > > > > > > > would need to have a metric for it? It seems
> to be
> > > > > quite
> > > > > > >>> different
> > > > > > >>> > > > > > > > compared to dropping late or malformed records.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > -Matthias
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > On 2/4/20 7:13 AM, Thomas Becker wrote:
> > > > > > >>> > > > > > > >>>>>>>>> Thanks John for your thoughtful reply.
> Some
> > > > > > >>> comments inline.
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> On Mon, 2020-02-03 at 11:51 -0600,
> John Roesler
> > > > > > >>> wrote:
> > > > > > >>> > > > > > > >>>>>>>>>> [EXTERNAL EMAIL] Attention: This
> email was
> > > > > sent
> > > > > > >>> from outside
> > > > > > >>> > > > > > > >>>>>>>>>> TiVo. DO NOT CLICK any links or
> attachments
> > > > > > >>> unless you
> > > > > > >>> > > > > > expected
> > > > > > >>> > > > > > > >>>>>>>>>> them. ________________________________
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Hi Tommy,
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Thanks for the context. I can see the
> > > > > attraction
> > > > > > >>> of
> > > > > > >>> > > > > > considering
> > > > > > >>> > > > > > > >>>>>>>>>> these use cases together.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> To answer your question, if a part of
> the
> > > > > record
> > > > > > >>> is not
> > > > > > >>> > > > > > > >>> relevant
> > > > > > >>> > > > > > > >>>>>>>>>> to downstream consumers, I was
> thinking you
> > > > > could
> > > > > > >>> just use a
> > > > > > >>> > > > > > > >>>>>>>>>> mapValue to remove it.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> E.g., suppose you wanted to do a join
> between
> > > > > two
> > > > > > >>> tables.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> employeeInfo.join( employeePayroll,
> (info,
> > > > > > >>> payroll) -> new
> > > > > > >>> > > > > > > >>>>>>>>>> Result(info.name(),
> payroll.salary()) )
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> We only care about one attribute from
> the Info
> > > > > > >>> table (name),
> > > > > > >>> > > > > > > >>> and
> > > > > > >>> > > > > > > >>>>>>>>>> one from the Payroll table (salary),
> and these
> > > > > > >>> attributes
> > > > > > >>> > > > > > > >>> change
> > > > > > >>> > > > > > > >>>>>>>>>> rarely. On the other hand, there
> might be many
> > > > > > >>> other
> > > > > > >>> > > > > > attributes
> > > > > > >>> > > > > > > >>>>>>>>>> that change frequently of these
> tables. We can
> > > > > > >>> avoid
> > > > > > >>> > > > > > triggering
> > > > > > >>> > > > > > > >>>>>>>>>> the join unnecessarily by mapping the
> input
> > > > > > >>> tables to drop the
> > > > > > >>> > > > > > > >>>>>>>>>> unnecessary information before the
> join:
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> names = employeeInfo.mapValues(info ->
> > > > > info.name())
> > > > > > >>> salaries
> > > > > > >>> > > > > > =
> > > > > > >>> > > > > > > >>>>>>>>>> employeePayroll.mapValues(payroll ->
> > > > > > >>> payroll.salary())
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> names.join( salaries, (name, salary)
> -> new
> > > > > > >>> Result(name,
> > > > > > >>> > > > > > > >>> salary)
> > > > > > >>> > > > > > > >>>>>>>>>> )
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> Ahh yes I see. This works, but in the
> case
> > > > > where
> > > > > > >>> you're using
> > > > > > >>> > > > > > > >>>>>>>>> schemas as we are (e.g. Avro), it
> seems like
> > > > > this
> > > > > > >>> approach
> > > > > > >>> > > > > > could
> > > > > > >>> > > > > > > >>>>>>>>> lead to a proliferation of "skinny"
> record
> > > > > types
> > > > > > >>> that just drop
> > > > > > >>> > > > > > > >>>>>>>>> various fields.
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Especially if we take Matthias's idea
> to drop
> > > > > > >>> non-changes even
> > > > > > >>> > > > > > > >>>>>>>>>> for stateless operations, this would
> be quite
> > > > > > >>> efficient and is
> > > > > > >>> > > > > > > >>>>>>>>>> also a very straightforward
> optimization to
> > > > > > >>> understand once
> > > > > > >>> > > > > > you
> > > > > > >>> > > > > > > >>>>>>>>>> know that Streams provides
> emit-on-change.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> From the context that you provided,
> it seems
> > > > > like
> > > > > > >>> a slightly
> > > > > > >>> > > > > > > >>>>>>>>>> different situation, though. Reading
> between
> > > > > the
> > > > > > >>> lines a
> > > > > > >>> > > > > > > >>> little,
> > > > > > >>> > > > > > > >>>>>>>>>> it sounds like: in contrast to the
> example
> > > > > above,
> > > > > > >>> in which we
> > > > > > >>> > > > > > > >>> are
> > > > > > >>> > > > > > > >>>>>>>>>> filtering out extra _data_, you have
> some
> > > > > extra
> > > > > > >>> _metadata_
> > > > > > >>> > > > > > that
> > > > > > >>> > > > > > > >>>>>>>>>> you still wish to pass down with the
> data when
> > > > > > >>> there is a
> > > > > > >>> > > > > > > >>> "real"
> > > > > > >>> > > > > > > >>>>>>>>>> update, but you don't want the
> metadata
> > > > > itself to
> > > > > > >>> cause an
> > > > > > >>> > > > > > > >>>>>>>>>> update.
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> Despite my lack of clarity, yes you've
> got it
> > > > > > >>> right ;) This
> > > > > > >>> > > > > > > >>>>>>>>> particular processor is the first stop
> for this
> > > > > > >>> data after
> > > > > > >>> > > > > > > >>> coming
> > > > > > >>> > > > > > > >>>>>>>>> in from external users, who often
> simply post
> > > > > the
> > > > > > >>> same content
> > > > > > >>> > > > > > > >>> each
> > > > > > >>> > > > > > > >>>>>>>>> time and we're trying to shield
> downstream
> > > > > > >>> consumers from
> > > > > > >>> > > > > > > >>>>>>>>> unnecessary churn.
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> It does seem handy to be able to plug
> in a
> > > > > custom
> > > > > > >>> > > > > > > >>> ChangeDetector
> > > > > > >>> > > > > > > >>>>>>>>>> for this purpose, but I worry about
> the API
> > > > > > >>> complexity. Maybe
> > > > > > >>> > > > > > > >>> you
> > > > > > >>> > > > > > > >>>>>>>>>> can help think though how to provide
> the same
> > > > > > >>> benefit while
> > > > > > >>> > > > > > > >>>>>>>>>> limiting user-facing complexity.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Here's some extra context to consider:
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> We currently don't make any extra
> requirements
> > > > > > >>> about the
> > > > > > >>> > > > > > nature
> > > > > > >>> > > > > > > >>>>>>>>>> of data that you can use in Streams.
> For
> > > > > example,
> > > > > > >>> you don't
> > > > > > >>> > > > > > > >>> have
> > > > > > >>> > > > > > > >>>>>>>>>> to implement hashCode and equals, or
> > > > > compareTo,
> > > > > > >>> etc. With the
> > > > > > >>> > > > > > > >>>>>>>>>> current proposal, we can do an
> airtight
> > > > > > >>> comparison based only
> > > > > > >>> > > > > > > >>> on
> > > > > > >>> > > > > > > >>>>>>>>>> the serialized form of the values,
> and we
> > > > > > >>> actually don't have
> > > > > > >>> > > > > > > >>> to
> > > > > > >>> > > > > > > >>>>>>>>>> deserialize the "prior" value at all
> for a
> > > > > large
> > > > > > >>> number of
> > > > > > >>> > > > > > > >>>>>>>>>> operations. Admitedly, if we extend
> the
> > > > > proposal
> > > > > > >>> to include
> > > > > > >>> > > > > > > >>> no-op
> > > > > > >>> > > > > > > >>>>>>>>>> detection for stateless operations,
> we'd
> > > > > probably
> > > > > > >>> need to rely
> > > > > > >>> > > > > > > >>> on
> > > > > > >>> > > > > > > >>>>>>>>>> equals() for no-op checking,
> otherwise we'd
> > > > > wind
> > > > > > >>> up requiring
> > > > > > >>> > > > > > > >>>>>>>>>> serdes for stateless operations as
> well.
> > > > > > >>> Actually, I'd
> > > > > > >>> > > > > > probably
> > > > > > >>> > > > > > > >>>>>>>>>> argue for doing exactly that:
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> 1. In stateful operations, drop if the
> > > > > serialized
> > > > > > >>> byte[]s are
> > > > > > >>> > > > > > > >>> the
> > > > > > >>> > > > > > > >>>>>>>>>> same. After deserializing, also drop
> if the
> > > > > > >>> objects are equal
> > > > > > >>> > > > > > > >>>>>>>>>> according to Object#equals().
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> 2. In stateless operations, compare
> the "new"
> > > > > and
> > > > > > >>> "old" values
> > > > > > >>> > > > > > > >>>>>>>>>> (if "old" is available) based on
> > > > > Object#equals().
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> 3. As a final optimization, after
> serializing
> > > > > and
> > > > > > >>> before
> > > > > > >>> > > > > > > >>> sending
> > > > > > >>> > > > > > > >>>>>>>>>> repartition records, compare the
> serialized
> > > > > data
> > > > > > >>> and drop
> > > > > > >>> > > > > > > >>>>>>>>>> no-ops.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> This way, we still don't have to rely
> on the
> > > > > > >>> existence of an
> > > > > > >>> > > > > > > >>>>>>>>>> equals() method, but if it is there,
> we can
> > > > > > >>> benefit from it.
> > > > > > >>> > > > > > > >>>>>>>>>> Also, we don't require a serde in any
> new
> > > > > > >>> situations, but we
> > > > > > >>> > > > > > > >>> can
> > > > > > >>> > > > > > > >>>>>>>>>> still leverage it when it is
> available.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> For clarity, in my example above,
> even if the
> > > > > > >>> employeeInfo and
> > > > > > >>> > > > > > > >>>>>>>>>> employeePayroll and Result records
> all have
> > > > > > >>> serdes, we need
> > > > > > >>> > > > > > the
> > > > > > >>> > > > > > > >>>>>>>>>> "name" field (presumably String) and
> the
> > > > > "salary"
> > > > > > >>> field
> > > > > > >>> > > > > > > >>>>>>>>>> (presumable a Double) to have serdes
> as well
> > > > > in
> > > > > > >>> the naive
> > > > > > >>> > > > > > > >>>>>>>>>> implementation. But if we can leverage
> > > > > equals(),
> > > > > > >>> then the
> > > > > > >>> > > > > > > >>> "right
> > > > > > >>> > > > > > > >>>>>>>>>> thing" happens automatically.
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> I still don't totally follow why the
> individual
> > > > > > >>> components
> > > > > > >>> > > > > > > >>> (name,
> > > > > > >>> > > > > > > >>>>>>>>> salary) would have to have serdes
> here. If
> > > > > Result
> > > > > > >>> has one, we
> > > > > > >>> > > > > > > >>>>>>>>> compare bytes, and if Result
> additionally has
> > > > > an
> > > > > > >>> equals()
> > > > > > >>> > > > > > method
> > > > > > >>> > > > > > > >>>>>>>>> (which presumably includes equals
> comparisons
> > > > > on
> > > > > > >>> the
> > > > > > >>> > > > > > constituent
> > > > > > >>> > > > > > > >>>>>>>>> fields), have we not covered our bases?
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> This dovetails in with my primary UX
> concern;
> > > > > > >>> where would the
> > > > > > >>> > > > > > > >>>>>>>>>> ChangeDetector actually be
> registered? None of
> > > > > > >>> the operators
> > > > > > >>> > > > > > in
> > > > > > >>> > > > > > > >>>>>>>>>> my example have names or topics or
> any other
> > > > > > >>> identifiable
> > > > > > >>> > > > > > > >>>>>>>>>> characteristic that could be passed
> to a
> > > > > > >>> ChangeDetector class
> > > > > > >>> > > > > > > >>>>>>>>>> registered via config. You could say
> that we
> > > > > make
> > > > > > >>> > > > > > > >>> ChangeDetector
> > > > > > >>> > > > > > > >>>>>>>>>> an optional parameter to every
> operation in
> > > > > > >>> Streams, but this
> > > > > > >>> > > > > > > >>>>>>>>>> seems to carry quite a bit of mental
> burden
> > > > > with
> > > > > > >>> it. People
> > > > > > >>> > > > > > > >>> will
> > > > > > >>> > > > > > > >>>>>>>>>> wonder what it's for and whether or
> not they
> > > > > > >>> should be using
> > > > > > >>> > > > > > > >>> it.
> > > > > > >>> > > > > > > >>>>>>>>>> There would almost certainly be a
> > > > > misconception
> > > > > > >>> that it's
> > > > > > >>> > > > > > > >>>>>>>>>> preferable to implement it always,
> which
> > > > > would be
> > > > > > >>> unfortunate.
> > > > > > >>> > > > > > > >>>>>>>>>> Plus, to actually implment metadata
> flowing
> > > > > > >>> through the
> > > > > > >>> > > > > > > >>> topology
> > > > > > >>> > > > > > > >>>>>>>>>> as in your use case, you'd have to do
> two
> > > > > things:
> > > > > > >>> 1. make sure
> > > > > > >>> > > > > > > >>>>>>>>>> that all operations actually preserve
> the
> > > > > > >>> metadata alongside
> > > > > > >>> > > > > > > >>> the
> > > > > > >>> > > > > > > >>>>>>>>>> data (e.g., don't accidentally add a
> mapValues
> > > > > > >>> like I did, or
> > > > > > >>> > > > > > > >>> you
> > > > > > >>> > > > > > > >>>>>>>>>> drop the metadata). 2. implement a
> > > > > ChangeDetector
> > > > > > >>> for every
> > > > > > >>> > > > > > > >>>>>>>>>> single operation in the topology, or
> you don't
> > > > > > >>> get the benefit
> > > > > > >>> > > > > > > >>> of
> > > > > > >>> > > > > > > >>>>>>>>>> dropping non-changes internally 2b.
> > > > > > >>> Alternatively, you could
> > > > > > >>> > > > > > > >>> just
> > > > > > >>> > > > > > > >>>>>>>>>> add the ChangeDetector to one
> operation toward
> > > > > > >>> the end of the
> > > > > > >>> > > > > > > >>>>>>>>>> topology. This would not drop
> redundant
> > > > > > >>> computation
> > > > > > >>> > > > > > internally,
> > > > > > >>> > > > > > > >>>>>>>>>> but only drop redundant _outputs_.
> But this is
> > > > > > >>> just about the
> > > > > > >>> > > > > > > >>>>>>>>>> same as your current solution.
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> I definitely see your point regarding
> > > > > > >>> configuration. I was
> > > > > > >>> > > > > > > >>>>>>>>> originally thinking about this when the
> > > > > > >>> deduplication was going
> > > > > > >>> > > > > > > >>> to
> > > > > > >>> > > > > > > >>>>>>>>> be opt-in, and it seemed very natural
> to say
> > > > > > >>> something like:
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> employeeInfo.join(employeePayroll,
> (info,
> > > > > payroll)
> > > > > > >>> -> new
> > > > > > >>> > > > > > > >>>>>>>>> Result(info.name(), payroll.salary()))
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> .suppress(duplicatesAccordingTo(someChangeDetector))
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> Alternatively you can imagine a
> similar method
> > > > > > >>> being on
> > > > > > >>> > > > > > > >>>>>>>>> Materialized, though obviously this
> makes less
> > > > > > >>> sense if we
> > > > > > >>> > > > > > don't
> > > > > > >>> > > > > > > >>>>>>>>> want to require materialization. If
> we're now
> > > > > > >>> talking about
> > > > > > >>> > > > > > > >>>>>>>>> changing the default behavior and not
> having
> > > > > any
> > > > > > >>> configuration
> > > > > > >>> > > > > > > >>>>>>>>> options, it's harder to find a place
> for this.
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> A final thought; if it really is a
> metadata
> > > > > > >>> question, can we
> > > > > > >>> > > > > > > >>> just
> > > > > > >>> > > > > > > >>>>>>>>>> plan to finish up the support for
> headers in
> > > > > > >>> Streams? I.e.,
> > > > > > >>> > > > > > > >>> give
> > > > > > >>> > > > > > > >>>>>>>>>> you a way to control the way that
> headers flow
> > > > > > >>> through the
> > > > > > >>> > > > > > > >>>>>>>>>> topology? Then, we could treat
> headers the
> > > > > same
> > > > > > >>> way we treat
> > > > > > >>> > > > > > > >>>>>>>>>> timestamps in the no-op checking... We
> > > > > completely
> > > > > > >>> ignore them
> > > > > > >>> > > > > > > >>>>>>>>>> for the sake of comparison. Thus,
> neither the
> > > > > > >>> timestamp nor
> > > > > > >>> > > > > > the
> > > > > > >>> > > > > > > >>>>>>>>>> headers would get updated in internal
> state
> > > > > or in
> > > > > > >>> downstream
> > > > > > >>> > > > > > > >>>>>>>>>> views as long as the value itself
> doesn't
> > > > > change.
> > > > > > >>> This seems
> > > > > > >>> > > > > > to
> > > > > > >>> > > > > > > >>>>>>>>>> give us a way to support your use
> case without
> > > > > > >>> adding to the
> > > > > > >>> > > > > > > >>>>>>>>>> mental overhead of using Streams for
> simple
> > > > > > >>> things.
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> Agree headers could be a decent fit
> for this
> > > > > > >>> particular case
> > > > > > >>> > > > > > > >>>>>>>>> because it's mostly metadata, though
> to be
> > > > > honest
> > > > > > >>> we haven't
> > > > > > >>> > > > > > > >>> looked
> > > > > > >>> > > > > > > >>>>>>>>> at headers much (mostly because, and
> to your
> > > > > > >>> point, support
> > > > > > >>> > > > > > > >>> seems
> > > > > > >>> > > > > > > >>>>>>>>> to be lacking). I feel like there
> would be
> > > > > other
> > > > > > >>> cases where
> > > > > > >>> > > > > > > >>> this
> > > > > > >>> > > > > > > >>>>>>>>> feature could be valuable, but I admit
> I can't
> > > > > > >>> come up with
> > > > > > >>> > > > > > > >>>>>>>>> anything right this second. Perhaps
> yuzhihong
> > > > > had
> > > > > > >>> an example in
> > > > > > >>> > > > > > > >>>>>>>>> mind?
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> I.e., simple things should be easy,
> and
> > > > > complex
> > > > > > >>> things should
> > > > > > >>> > > > > > > >>> be
> > > > > > >>> > > > > > > >>>>>>>>>> possible.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> What are your thoughts? Thanks, -John
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> On Mon, Feb 3, 2020, at 07:19, Thomas
> Becker
> > > > > > >>> wrote:
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Hi John, Can you describe how you'd
> use
> > > > > > >>> filtering/mapping to
> > > > > > >>> > > > > > > >>>>>>>>>> deduplicate records? To give some
> background
> > > > > on
> > > > > > >>> my suggestion
> > > > > > >>> > > > > > > >>> we
> > > > > > >>> > > > > > > >>>>>>>>>> currently have a small stream
> processor that
> > > > > > >>> exists solely to
> > > > > > >>> > > > > > > >>>>>>>>>> deduplicate, which we do using a
> process that
> > > > > I
> > > > > > >>> assume would
> > > > > > >>> > > > > > be
> > > > > > >>> > > > > > > >>>>>>>>>> similar to what would be done here
> (with a
> > > > > store
> > > > > > >>> of keys and
> > > > > > >>> > > > > > > >>> hash
> > > > > > >>> > > > > > > >>>>>>>>>> values). But the records we are
> deduplicating
> > > > > > >>> have some
> > > > > > >>> > > > > > > >>> metadata
> > > > > > >>> > > > > > > >>>>>>>>>> fields (such as timestamps of when
> the record
> > > > > was
> > > > > > >>> posted) that
> > > > > > >>> > > > > > > >>> we
> > > > > > >>> > > > > > > >>>>>>>>>> don't consider semantically
> meaningful for
> > > > > > >>> downstream
> > > > > > >>> > > > > > > >>> consumers,
> > > > > > >>> > > > > > > >>>>>>>>>> and therefore we also suppress
> updates that
> > > > > only
> > > > > > >>> touch those
> > > > > > >>> > > > > > > >>>>>>>>>> fields.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> -Tommy
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, 2020-01-31 at 19:30 -0600,
> John
> > > > > Roesler
> > > > > > >>> wrote:
> > > > > > >>> > > > > > > >>> [EXTERNAL
> > > > > > >>> > > > > > > >>>>>>>>>> EMAIL] Attention: This email was sent
> from
> > > > > > >>> outside TiVo. DO
> > > > > > >>> > > > > > NOT
> > > > > > >>> > > > > > > >>>>>>>>>> CLICK any links or attachments unless
> you
> > > > > > >>> expected them.
> > > > > > >>> > > > > > > >>>>>>>>>> ________________________________
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Hi Thomas and yuzhihong, That’s an
> interesting
> > > > > > >>> idea. Can you
> > > > > > >>> > > > > > > >>> help
> > > > > > >>> > > > > > > >>>>>>>>>> think of a use case that isn’t also
> served by
> > > > > > >>> filtering or
> > > > > > >>> > > > > > > >>>>>>>>>> mapping beforehand? Thanks for
> helping to
> > > > > design
> > > > > > >>> this feature!
> > > > > > >>> > > > > > > >>>>>>>>>> -John On Fri, Jan 31, 2020, at 18:56,
> > > > > > >>> yuzhih...@gmail.com
> > > > > > >>> > > > > > > >>>>>>>>>> <mailto:yuzhih...@gmail.com> wrote:
> I think
> > > > > this
> > > > > > >>> is good
> > > > > > >>> > > > > > > >>> idea. On
> > > > > > >>> > > > > > > >>>>>>>>>> Jan 31, 2020, at 4:49 PM, Thomas
> Becker <
> > > > > > >>> > > > > > > >>> thomas.bec...@tivo.com
> > > > > > >>> > > > > > > >>>>>>>>>> <mailto:thomas.bec...@tivo.com>>
> wrote: How
> > > > > do
> > > > > > >>> folks feel
> > > > > > >>> > > > > > > >>> about
> > > > > > >>> > > > > > > >>>>>>>>>> allowing the mechanism by which
> no-ops are
> > > > > > >>> detected to be
> > > > > > >>> > > > > > > >>>>>>>>>> pluggable? Meaning use something like
> a hash
> > > > > by
> > > > > > >>> default, but
> > > > > > >>> > > > > > > >>> you
> > > > > > >>> > > > > > > >>>>>>>>>> could optionally provide an
> implementation of
> > > > > > >>> something to use
> > > > > > >>> > > > > > > >>>>>>>>>> instead, like a ChangeDetector. This
> could be
> > > > > > >>> useful for
> > > > > > >>> > > > > > > >>> example
> > > > > > >>> > > > > > > >>>>>>>>>> to ignore changes to certain fields,
> which may
> > > > > > >>> not be relevant
> > > > > > >>> > > > > > > >>> to
> > > > > > >>> > > > > > > >>>>>>>>>> the operation being performed.
> > > > > > >>> > > > > > ________________________________
> > > > > > >>> > > > > > > >>>>>>>>>> From: John Roesler <
> vvcep...@apache.org
> > > > > > >>> > > > > > > >>>>>>>>>> <mailto:vvcep...@apache.org>> Sent:
> Friday,
> > > > > > >>> January 31, 2020
> > > > > > >>> > > > > > > >>> 4:51
> > > > > > >>> > > > > > > >>>>>>>>>> PM To: dev@kafka.apache.org <mailto:
> > > > > > >>> dev@kafka.apache.org>
> > > > > > >>> > > > > > > >>>>>>>>>> <dev@kafka.apache.org <mailto:
> > > > > > >>> dev@kafka.apache.org>> Subject:
> > > > > > >>> > > > > > > >>> Re:
> > > > > > >>> > > > > > > >>>>>>>>>> [KAFKA-557] Add emit on change
> support for
> > > > > Kafka
> > > > > > >>> Streams
> > > > > > >>> > > > > > > >>>>>>>>>> [EXTERNAL EMAIL] Attention: This
> email was
> > > > > sent
> > > > > > >>> from outside
> > > > > > >>> > > > > > > >>>>>>>>>> TiVo. DO NOT CLICK any links or
> attachments
> > > > > > >>> unless you
> > > > > > >>> > > > > > expected
> > > > > > >>> > > > > > > >>>>>>>>>> them. ________________________________
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Hello all, Sorry for my silence. It
> seems
> > > > > like we
> > > > > > >>> are getting
> > > > > > >>> > > > > > > >>>>>>>>>> close to consensus. Hopefully, we
> could move
> > > > > to a
> > > > > > >>> vote soon!
> > > > > > >>> > > > > > > >>> All
> > > > > > >>> > > > > > > >>>>>>>>>> of the reasoning from Matthias and
> Bruno
> > > > > around
> > > > > > >>> timestamp is
> > > > > > >>> > > > > > > >>>>>>>>>> compelling. I would be strongly in
> favor of
> > > > > > >>> stating a few
> > > > > > >>> > > > > > > >>> things
> > > > > > >>> > > > > > > >>>>>>>>>> very clearly in the KIP: 1. Streams
> will drop
> > > > > > >>> no-op updates
> > > > > > >>> > > > > > > >>> only
> > > > > > >>> > > > > > > >>>>>>>>>> for KTable operations. That is, we
> won't make
> > > > > any
> > > > > > >>> changes to
> > > > > > >>> > > > > > > >>>>>>>>>> KStream aggregations at the moment.
> It does
> > > > > seem
> > > > > > >>> like we can
> > > > > > >>> > > > > > > >>>>>>>>>> potentially revisit the time
> semantics of that
> > > > > > >>> operation in
> > > > > > >>> > > > > > the
> > > > > > >>> > > > > > > >>>>>>>>>> future, but we don't need to do it
> now. On the
> > > > > > >>> other hand, the
> > > > > > >>> > > > > > > >>>>>>>>>> proposed semantics for KTable
> timestamps
> > > > > (marking
> > > > > > >>> the
> > > > > > >>> > > > > > beginning
> > > > > > >>> > > > > > > >>>>>>>>>> of the validity of that record) makes
> sense to
> > > > > > >>> me. 2. Streams
> > > > > > >>> > > > > > > >>>>>>>>>> will only drop no-op updates for
> _stateful_
> > > > > > >>> KTable operations.
> > > > > > >>> > > > > > > >>> We
> > > > > > >>> > > > > > > >>>>>>>>>> don't want to add a hard guarantee
> that
> > > > > Streams
> > > > > > >>> will _never_
> > > > > > >>> > > > > > > >>>>>>>>>> emit a no-op table update because it
> would
> > > > > > >>> require adding
> > > > > > >>> > > > > > state
> > > > > > >>> > > > > > > >>>>>>>>>> to otherwise stateless operations. If
> someone
> > > > > is
> > > > > > >>> really
> > > > > > >>> > > > > > > >>> concerned
> > > > > > >>> > > > > > > >>>>>>>>>> about a particular stateless operation
> > > > > producing
> > > > > > >>> a lot of
> > > > > > >>> > > > > > no-op
> > > > > > >>> > > > > > > >>>>>>>>>> results, all they have to do is
> materialize
> > > > > it,
> > > > > > >>> and Streams
> > > > > > >>> > > > > > > >>> would
> > > > > > >>> > > > > > > >>>>>>>>>> automatically drop the no-ops.
> Additionally,
> > > > > I'm
> > > > > > >>> +1 on not
> > > > > > >>> > > > > > > >>> adding
> > > > > > >>> > > > > > > >>>>>>>>>> an opt-out at this time. Regarding
> the KIP
> > > > > > >>> itself, I would
> > > > > > >>> > > > > > > >>> clean
> > > > > > >>> > > > > > > >>>>>>>>>> it up a bit before calling for a
> vote. There
> > > > > is a
> > > > > > >>> lot of
> > > > > > >>> > > > > > > >>>>>>>>>> "discussion"-type language there,
> which is
> > > > > very
> > > > > > >>> natural to
> > > > > > >>> > > > > > > >>> read,
> > > > > > >>> > > > > > > >>>>>>>>>> but makes it a bit hard to see what
> _exactly_
> > > > > the
> > > > > > >>> kip is
> > > > > > >>> > > > > > > >>>>>>>>>> proposing. Richard, would you mind
> just making
> > > > > > >>> the "proposed
> > > > > > >>> > > > > > > >>>>>>>>>> behavior change" a simple and
> succinct list of
> > > > > > >>> bullet points?
> > > > > > >>> > > > > > > >>>>>>>>>> I.e., please drop glue phrases like
> "there has
> > > > > > >>> been some
> > > > > > >>> > > > > > > >>>>>>>>>> discussion" or "possibly we could do
> X". For
> > > > > the
> > > > > > >>> final version
> > > > > > >>> > > > > > > >>> of
> > > > > > >>> > > > > > > >>>>>>>>>> the KIP, it should just say, "Streams
> will do
> > > > > X,
> > > > > > >>> Streams will
> > > > > > >>> > > > > > > >>> do
> > > > > > >>> > > > > > > >>>>>>>>>> Y". Feel free to add an elaboration
> section to
> > > > > > >>> explain more
> > > > > > >>> > > > > > > >>> about
> > > > > > >>> > > > > > > >>>>>>>>>> what X and Y mean, but we don't need
> to talk
> > > > > about
> > > > > > >>> > > > > > > >>> possibilities
> > > > > > >>> > > > > > > >>>>>>>>>> or alternatives except in the
> "rejected
> > > > > > >>> alternatives" section.
> > > > > > >>> > > > > > > >>>>>>>>>> Accordingly, can you also move the
> options you
> > > > > > >>> presented in
> > > > > > >>> > > > > > the
> > > > > > >>> > > > > > > >>>>>>>>>> intro to the "rejected alternatives"
> section
> > > > > and
> > > > > > >>> only mention
> > > > > > >>> > > > > > > >>> the
> > > > > > >>> > > > > > > >>>>>>>>>> final proposal itself? This just
> really helps
> > > > > > >>> reviewers to
> > > > > > >>> > > > > > know
> > > > > > >>> > > > > > > >>>>>>>>>> what they are voting for, and it helps
> > > > > everyone
> > > > > > >>> after the fact
> > > > > > >>> > > > > > > >>>>>>>>>> when they are trying to get clarity
> on what
> > > > > > >>> exactly the
> > > > > > >>> > > > > > > >>> proposal
> > > > > > >>> > > > > > > >>>>>>>>>> is, versus all the things it could
> have been.
> > > > > > >>> Thanks, -John
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> On Mon, Jan 27, 2020, at 18:14,
> Richard Yu
> > > > > wrote:
> > > > > > >>> Hello to
> > > > > > >>> > > > > > all,
> > > > > > >>> > > > > > > >>>>>>>>>> I've finished making some initial
> > > > > modifications
> > > > > > >>> to the KIP. I
> > > > > > >>> > > > > > > >>>>>>>>>> have decided to keep the
> implementation
> > > > > section
> > > > > > >>> in the KIP for
> > > > > > >>> > > > > > > >>>>>>>>>> record-keeping purposes. For now, we
> should
> > > > > focus
> > > > > > >>> on only the
> > > > > > >>> > > > > > > >>>>>>>>>> proposed behavior changes instead.
> See if you
> > > > > > >>> have any
> > > > > > >>> > > > > > > >>> comments!
> > > > > > >>> > > > > > > >>>>>>>>>> Cheers, Richard On Sat, Jan 25, 2020
> at 11:12
> > > > > AM
> > > > > > >>> Richard Yu
> > > > > > >>> > > > > > > >>>>>>>>>> <yohan.richard...@gmail.com <mailto:
> > > > > > >>> > > > > > yohan.richard...@gmail.com
> > > > > > >>> > > > > > > >>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> wrote: Hi all, Thanks for all the
> discussion!
> > > > > > >>> @John and @Bruno
> > > > > > >>> > > > > > > >>> I
> > > > > > >>> > > > > > > >>>>>>>>>> will survey other possible systems
> and see
> > > > > what I
> > > > > > >>> can do. Just
> > > > > > >>> > > > > > > >>> a
> > > > > > >>> > > > > > > >>>>>>>>>> question, by systems, I suppose you
> would mean
> > > > > > >>> the pros and
> > > > > > >>> > > > > > > >>> cons
> > > > > > >>> > > > > > > >>>>>>>>>> of different reporting strategies?
> I'm not
> > > > > > >>> completely certain
> > > > > > >>> > > > > > > >>> on
> > > > > > >>> > > > > > > >>>>>>>>>> this point, so it would be great if
> you can
> > > > > > >>> clarify on that.
> > > > > > >>> > > > > > So
> > > > > > >>> > > > > > > >>>>>>>>>> here's what I got from all the
> discussion so
> > > > > far:
> > > > > > >>> - Since both
> > > > > > >>> > > > > > > >>>>>>>>>> Matthias and John seems to have come
> to a
> > > > > > >>> consensus on this,
> > > > > > >>> > > > > > > >>> then
> > > > > > >>> > > > > > > >>>>>>>>>> we will go for an all-round
> behavorial change
> > > > > for
> > > > > > >>> KTables.
> > > > > > >>> > > > > > > >>> After
> > > > > > >>> > > > > > > >>>>>>>>>> some thought, I decided that for now,
> an
> > > > > opt-out
> > > > > > >>> config will
> > > > > > >>> > > > > > > >>> not
> > > > > > >>> > > > > > > >>>>>>>>>> be added. As John have pointed out,
> no-op
> > > > > changes
> > > > > > >>> tend to
> > > > > > >>> > > > > > > >>>>>>>>>> explode further down the topology as
> they are
> > > > > > >>> forwarded to
> > > > > > >>> > > > > > more
> > > > > > >>> > > > > > > >>>>>>>>>> and more processor nodes downstream.
> - About
> > > > > > >>> using hash codes,
> > > > > > >>> > > > > > > >>>>>>>>>> after some explanation from John, it
> looks
> > > > > like
> > > > > > >>> hash codes
> > > > > > >>> > > > > > > >>> might
> > > > > > >>> > > > > > > >>>>>>>>>> not be as ideal (for implementation).
> For
> > > > > now, we
> > > > > > >>> will omit
> > > > > > >>> > > > > > > >>> that
> > > > > > >>> > > > > > > >>>>>>>>>> detail, and save it for the PR. -
> @Bruno You
> > > > > do
> > > > > > >>> have valid
> > > > > > >>> > > > > > > >>>>>>>>>> concerns. Though, I am not completely
> certain
> > > > > if
> > > > > > >>> we want to do
> > > > > > >>> > > > > > > >>>>>>>>>> emit-on-change only for materialized
> KTables.
> > > > > I
> > > > > > >>> will put it
> > > > > > >>> > > > > > > >>> down
> > > > > > >>> > > > > > > >>>>>>>>>> in the KIP regardless. I will do my
> best to
> > > > > > >>> address all points
> > > > > > >>> > > > > > > >>>>>>>>>> raised so far on the discussion. Hope
> we could
> > > > > > >>> keep this
> > > > > > >>> > > > > > going!
> > > > > > >>> > > > > > > >>>>>>>>>> Best, Richard On Fri, Jan 24, 2020 at
> 6:07 PM
> > > > > > >>> Bruno Cadonna
> > > > > > >>> > > > > > > >>>>>>>>>> <br...@confluent.io <mailto:
> > > > > br...@confluent.io>>
> > > > > > >>> wrote: Thank
> > > > > > >>> > > > > > > >>> you
> > > > > > >>> > > > > > > >>>>>>>>>> Matthias for the use cases! Looking
> at both
> > > > > use
> > > > > > >>> cases, I think
> > > > > > >>> > > > > > > >>>>>>>>>> you need to elaborate on them in the
> KIP,
> > > > > > >>> Richard. Emit from
> > > > > > >>> > > > > > > >>>>>>>>>> plain KTable: I agree with Matthias
> that the
> > > > > > >>> lower timestamp
> > > > > > >>> > > > > > > >>>>>>>>>> makes sense because it marks the
> start of the
> > > > > > >>> validity of the
> > > > > > >>> > > > > > > >>>>>>>>>> record. Idempotent records with a
> higher
> > > > > > >>> timestamp can be
> > > > > > >>> > > > > > > >>> safely
> > > > > > >>> > > > > > > >>>>>>>>>> ignored. A corner case that I
> discussed with
> > > > > > >>> Matthias offline
> > > > > > >>> > > > > > > >>> is
> > > > > > >>> > > > > > > >>>>>>>>>> when we do not materialize a KTable
> due to
> > > > > > >>> optimization. Then
> > > > > > >>> > > > > > > >>> we
> > > > > > >>> > > > > > > >>>>>>>>>> cannot avoid the idempotent records
> because
> > > > > we do
> > > > > > >>> not keep the
> > > > > > >>> > > > > > > >>>>>>>>>> first record with the lower timestamp
> to
> > > > > compare
> > > > > > >>> to. Emit from
> > > > > > >>> > > > > > > >>>>>>>>>> KTable with aggregations: If we
> specify that
> > > > > an
> > > > > > >>> aggregation
> > > > > > >>> > > > > > > >>>>>>>>>> result should have the highest
> timestamp of
> > > > > the
> > > > > > >>> records that
> > > > > > >>> > > > > > > >>>>>>>>>> participated in the aggregation, we
> cannot
> > > > > ignore
> > > > > > >>> any
> > > > > > >>> > > > > > > >>> idempotent
> > > > > > >>> > > > > > > >>>>>>>>>> records. Admittedly, the result of an
> > > > > aggregation
> > > > > > >>> usually
> > > > > > >>> > > > > > > >>>>>>>>>> changes, but there are aggregations
> where the
> > > > > > >>> result may not
> > > > > > >>> > > > > > > >>>>>>>>>> change like min and max, or sum when
> the
> > > > > incoming
> > > > > > >>> records have
> > > > > > >>> > > > > > > >>> a
> > > > > > >>> > > > > > > >>>>>>>>>> value of zero. In those cases, we
> could
> > > > > benefit
> > > > > > >>> of the emit on
> > > > > > >>> > > > > > > >>>>>>>>>> change, but only if we define the
> semantics
> > > > > of the
> > > > > > >>> > > > > > aggregations
> > > > > > >>> > > > > > > >>>>>>>>>> to not use the highest timestamp of
> the
> > > > > > >>> participating records
> > > > > > >>> > > > > > > >>> for
> > > > > > >>> > > > > > > >>>>>>>>>> the result. In Kafka Streams, we do
> not have
> > > > > min,
> > > > > > >>> max, and sum
> > > > > > >>> > > > > > > >>> as
> > > > > > >>> > > > > > > >>>>>>>>>> explicit aggregations, but we need to
> provide
> > > > > an
> > > > > > >>> API to define
> > > > > > >>> > > > > > > >>>>>>>>>> what timestamp should be used for the
> result
> > > > > of
> > > > > > >>> an aggregation
> > > > > > >>> > > > > > > >>> if
> > > > > > >>> > > > > > > >>>>>>>>>> we want to go down this path. All of
> this does
> > > > > > >>> not block this
> > > > > > >>> > > > > > > >>> KIP
> > > > > > >>> > > > > > > >>>>>>>>>> and I just wanted to put this aspects
> up for
> > > > > > >>> discussion. The
> > > > > > >>> > > > > > > >>> KIP
> > > > > > >>> > > > > > > >>>>>>>>>> can limit itself to emit from
> materialized
> > > > > > >>> KTables. However,
> > > > > > >>> > > > > > > >>> the
> > > > > > >>> > > > > > > >>>>>>>>>> limits should be explicitly stated in
> the KIP.
> > > > > > >>> Best, Bruno
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020 at 10:58 AM
> Matthias J.
> > > > > Sax
> > > > > > >>> > > > > > > >>>>>>>>>> <matth...@confluent.io <mailto:
> > > > > > >>> matth...@confluent.io>> wrote:
> > > > > > >>> > > > > > > >>>>>>>>>> IMHO, the question about semantics
> depends on
> > > > > the
> > > > > > >>> use case, in
> > > > > > >>> > > > > > > >>>>>>>>>> particular on the origin of a KTable.
> If
> > > > > there is
> > > > > > >>> a changlog
> > > > > > >>> > > > > > > >>>>>>>>>> topic that one reads directly into a
> KTable,
> > > > > > >>> emit-on-change
> > > > > > >>> > > > > > > >>> does
> > > > > > >>> > > > > > > >>>>>>>>>> actually make sense, because the
> timestamp
> > > > > > >>> indicates _when_
> > > > > > >>> > > > > > the
> > > > > > >>> > > > > > > >>>>>>>>>> update was _effective_. For this
> case, it is
> > > > > > >>> semantically
> > > > > > >>> > > > > > sound
> > > > > > >>> > > > > > > >>>>>>>>>> to _not_ update the timestamp in the
> store,
> > > > > > >>> because the second
> > > > > > >>> > > > > > > >>>>>>>>>> update is actually idempotent and
> advancing
> > > > > the
> > > > > > >>> timestamp is
> > > > > > >>> > > > > > > >>> not
> > > > > > >>> > > > > > > >>>>>>>>>> ideal (one could even consider it to
> be wrong
> > > > > to
> > > > > > >>> advance the
> > > > > > >>> > > > > > > >>>>>>>>>> timestamp) because the "valid time"
> of the
> > > > > record
> > > > > > >>> pair did not
> > > > > > >>> > > > > > > >>>>>>>>>> change. This reasoning also applies to
> > > > > > >>> KTable-KTable joins.
> > > > > > >>> > > > > > > >>>>>>>>>> However, if the KTable is the result
> of an
> > > > > > >>> aggregation, I
> > > > > > >>> > > > > > think
> > > > > > >>> > > > > > > >>>>>>>>>> emit-on-update is more natural,
> because the
> > > > > > >>> timestamp reflects
> > > > > > >>> > > > > > > >>>>>>>>>> the _last_ time (ie, highest
> timestamp) of all
> > > > > > >>> input records
> > > > > > >>> > > > > > > >>> the
> > > > > > >>> > > > > > > >>>>>>>>>> contributed to the result. Hence,
> updating the
> > > > > > >>> timestamp and
> > > > > > >>> > > > > > > >>>>>>>>>> emitting a new record actually sounds
> correct
> > > > > to
> > > > > > >>> me. This
> > > > > > >>> > > > > > > >>> applies
> > > > > > >>> > > > > > > >>>>>>>>>> to windowed and non-windowed
> aggregations
> > > > > IMHO.
> > > > > > >>> However,
> > > > > > >>> > > > > > > >>>>>>>>>> considering the argument that the
> timestamp
> > > > > > >>> should not be
> > > > > > >>> > > > > > > >>> update
> > > > > > >>> > > > > > > >>>>>>>>>> in the first case in the store to
> begin with,
> > > > > > >>> both cases are
> > > > > > >>> > > > > > > >>>>>>>>>> actually the same, and both can be
> modeled as
> > > > > > >>> emit-on-change:
> > > > > > >>> > > > > > > >>> if
> > > > > > >>> > > > > > > >>>>>>>>>> a `table()` operator does not update
> the
> > > > > > >>> timestamp if the
> > > > > > >>> > > > > > value
> > > > > > >>> > > > > > > >>>>>>>>>> does not change, there is _no_ change
> and thus
> > > > > > >>> nothing is
> > > > > > >>> > > > > > > >>>>>>>>>> emitted. At the same time, if an
> aggregation
> > > > > > >>> operator does
> > > > > > >>> > > > > > > >>> update
> > > > > > >>> > > > > > > >>>>>>>>>> the timestamp (even if the value does
> not
> > > > > change)
> > > > > > >>> there _is_ a
> > > > > > >>> > > > > > > >>>>>>>>>> change and we emit. Note that handling
> > > > > > >>> out-of-order data for
> > > > > > >>> > > > > > > >>>>>>>>>> aggregations would also work
> seamlessly with
> > > > > this
> > > > > > >>> approach --
> > > > > > >>> > > > > > > >>> for
> > > > > > >>> > > > > > > >>>>>>>>>> out-of-order records, the timestamp
> does never
> > > > > > >>> change, and
> > > > > > >>> > > > > > > >>> thus,
> > > > > > >>> > > > > > > >>>>>>>>>> we only emit if the result itself
> changes.
> > > > > > >>> Therefore, I would
> > > > > > >>> > > > > > > >>>>>>>>>> argue that we might not even need any
> config,
> > > > > > >>> because the
> > > > > > >>> > > > > > > >>>>>>>>>> emit-on-change behavior is just
> correct and
> > > > > > >>> reduced the
> > > > > > >>> > > > > > > >>>>>>>>>> downstream load, while our current
> behavior is
> > > > > > >>> not ideal (even
> > > > > > >>> > > > > > > >>> if
> > > > > > >>> > > > > > > >>>>>>>>>> it's also correct). Thoughts?
> -Matthias On
> > > > > > >>> 1/24/20 9:37 AM,
> > > > > > >>> > > > > > > >>> John
> > > > > > >>> > > > > > > >>>>>>>>>> Roesler wrote: Hi Bruno, Thanks for
> that
> > > > > idea. I
> > > > > > >>> hadn't
> > > > > > >>> > > > > > > >>>>>>>>>> considered that option before, and it
> does
> > > > > seem
> > > > > > >>> like that
> > > > > > >>> > > > > > would
> > > > > > >>> > > > > > > >>>>>>>>>> be the right place to put it if we
> think it
> > > > > might
> > > > > > >>> be
> > > > > > >>> > > > > > > >>> semantically
> > > > > > >>> > > > > > > >>>>>>>>>> important to control on a
> table-by-table
> > > > > basis. I
> > > > > > >>> had been
> > > > > > >>> > > > > > > >>>>>>>>>> thinking of it less semantically and
> more
> > > > > > >>> practically. In the
> > > > > > >>> > > > > > > >>>>>>>>>> context of a large topology, or more
> > > > > generally, a
> > > > > > >>> large
> > > > > > >>> > > > > > > >>> software
> > > > > > >>> > > > > > > >>>>>>>>>> system that contains many topologies
> and other
> > > > > > >>> event-driven
> > > > > > >>> > > > > > > >>>>>>>>>> systems, each no-op result becomes an
> input
> > > > > that
> > > > > > >>> is destined
> > > > > > >>> > > > > > to
> > > > > > >>> > > > > > > >>>>>>>>>> itself become a no-op result, and so
> on, all
> > > > > the
> > > > > > >>> way through
> > > > > > >>> > > > > > > >>> the
> > > > > > >>> > > > > > > >>>>>>>>>> system. Thus, a single pointless
> processing
> > > > > > >>> result becomes
> > > > > > >>> > > > > > > >>>>>>>>>> amplified into a large number of
> pointless
> > > > > > >>> computations, cache
> > > > > > >>> > > > > > > >>>>>>>>>> perturbations, and network and disk
> I/O
> > > > > > >>> operations. If you
> > > > > > >>> > > > > > also
> > > > > > >>> > > > > > > >>>>>>>>>> consider operations with fan-out
> implications,
> > > > > > >>> like branching
> > > > > > >>> > > > > > > >>> or
> > > > > > >>> > > > > > > >>>>>>>>>> foreign-key joins, the wasted
> resources are
> > > > > > >>> amplified not just
> > > > > > >>> > > > > > > >>> in
> > > > > > >>> > > > > > > >>>>>>>>>> proportion to the size of the system,
> but the
> > > > > > >>> size of the
> > > > > > >>> > > > > > > >>> system
> > > > > > >>> > > > > > > >>>>>>>>>> times the average fan-out (to the
> power of the
> > > > > > >>> number of
> > > > > > >>> > > > > > > >>> fan-out
> > > > > > >>> > > > > > > >>>>>>>>>> operations on the path(s) through the
> > > > > system). In
> > > > > > >>> my time
> > > > > > >>> > > > > > > >>>>>>>>>> operating such systems, I've observed
> these
> > > > > > >>> effects to be very
> > > > > > >>> > > > > > > >>>>>>>>>> real, and actually, the system and
> use case
> > > > > > >>> doesn't have to be
> > > > > > >>> > > > > > > >>>>>>>>>> very large before the amplification
> poses an
> > > > > > >>> existential
> > > > > > >>> > > > > > threat
> > > > > > >>> > > > > > > >>>>>>>>>> to the system as a whole. This is the
> basis
> > > > > of my
> > > > > > >>> advocating
> > > > > > >>> > > > > > > >>> for
> > > > > > >>> > > > > > > >>>>>>>>>> a simple behavior change, rather than
> an
> > > > > opt-in
> > > > > > >>> config of any
> > > > > > >>> > > > > > > >>>>>>>>>> kind. It seems like Streams should
> "do the
> > > > > right
> > > > > > >>> thing" for
> > > > > > >>> > > > > > the
> > > > > > >>> > > > > > > >>>>>>>>>> majority use case. My theory (which
> may be
> > > > > wrong)
> > > > > > >>> is that the
> > > > > > >>> > > > > > > >>>>>>>>>> majority use case is more like
> "relational
> > > > > > >>> queries" than "CEP
> > > > > > >>> > > > > > > >>>>>>>>>> queries". Even if you were doing some
> > > > > > >>> event-sensitive
> > > > > > >>> > > > > > > >>>>>>>>>> computation, wouldn't you do them as
> Stream
> > > > > > >>> operations (where
> > > > > > >>> > > > > > > >>>>>>>>>> this feature is inapplicable anyway)?
> In
> > > > > keeping
> > > > > > >>> with the
> > > > > > >>> > > > > > > >>>>>>>>>> "practical" perspective, I suggested
> the
> > > > > opt-out
> > > > > > >>> config only
> > > > > > >>> > > > > > in
> > > > > > >>> > > > > > > >>>>>>>>>> the (I think unlikely) event that
> filtering
> > > > > out
> > > > > > >>> pointless
> > > > > > >>> > > > > > > >>> updates
> > > > > > >>> > > > > > > >>>>>>>>>> actually harms performance. I'd also
> be
> > > > > perfectly
> > > > > > >>> fine without
> > > > > > >>> > > > > > > >>>>>>>>>> the opt-out config. I really think
> that
> > > > > (because
> > > > > > >>> of the
> > > > > > >>> > > > > > > >>> timestamp
> > > > > > >>> > > > > > > >>>>>>>>>> semantics work already underway),
> we're
> > > > > already
> > > > > > >>> pre-fetching
> > > > > > >>> > > > > > > >>> the
> > > > > > >>> > > > > > > >>>>>>>>>> prior result most of the time, so
> there would
> > > > > > >>> actually be very
> > > > > > >>> > > > > > > >>>>>>>>>> little extra I/O involved in
> implementing
> > > > > > >>> emit-on-change.
> > > > > > >>> > > > > > > >>>>>>>>>> However, we should consider whether my
> > > > > experience
> > > > > > >>> is likely to
> > > > > > >>> > > > > > > >>>>>>>>>> be general. Do you have some use case
> in mind
> > > > > for
> > > > > > >>> which you'd
> > > > > > >>> > > > > > > >>>>>>>>>> actually want some KTable results to
> be
> > > > > > >>> emit-on-update for
> > > > > > >>> > > > > > > >>>>>>>>>> semantic reasons? Thanks, -John
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020, at 11:02, Bruno
> Cadonna
> > > > > > >>> wrote: Hi
> > > > > > >>> > > > > > > >>> Richard,
> > > > > > >>> > > > > > > >>>>>>>>>> Thank you for the KIP. I agree with
> John that
> > > > > we
> > > > > > >>> should focus
> > > > > > >>> > > > > > > >>> on
> > > > > > >>> > > > > > > >>>>>>>>>> the interface and behavior change in
> a KIP. We
> > > > > > >>> can discuss the
> > > > > > >>> > > > > > > >>>>>>>>>> implementation later. I am also +1
> for the
> > > > > > >>> survey. I had a
> > > > > > >>> > > > > > > >>>>>>>>>> thought about this. Couldn't we
> consider
> > > > > > >>> emit-on-change to be
> > > > > > >>> > > > > > > >>> one
> > > > > > >>> > > > > > > >>>>>>>>>> config of suppress (like
> `untilWindowCloses`)?
> > > > > > >>> What you
> > > > > > >>> > > > > > > >>>>>>>>>> basically propose is to suppress
> updates if
> > > > > they
> > > > > > >>> do not change
> > > > > > >>> > > > > > > >>>>>>>>>> the result. Considering emit on
> change as a
> > > > > > >>> flavour of
> > > > > > >>> > > > > > suppress
> > > > > > >>> > > > > > > >>>>>>>>>> would be more flexible because it
> would
> > > > > specify
> > > > > > >>> the behavior
> > > > > > >>> > > > > > > >>>>>>>>>> locally for a KTable instead of
> globally for
> > > > > all
> > > > > > >>> KTables.
> > > > > > >>> > > > > > > >>>>>>>>>> Additionally, specifying the behavior
> in one
> > > > > > >>> place instead of
> > > > > > >>> > > > > > > >>>>>>>>>> multiple places feels more intuitive
> and
> > > > > > >>> consistent to me.
> > > > > > >>> > > > > > > >>> Best,
> > > > > > >>> > > > > > > >>>>>>>>>> Bruno On Fri, Jan 24, 2020 at 7:49 AM
> John
> > > > > Roesler
> > > > > > >>> > > > > > > >>>>>>>>>> <vvcep...@apache.org <mailto:
> > > > > vvcep...@apache.org>>
> > > > > > >>> wrote: Hi
> > > > > > >>> > > > > > > >>>>>>>>>> Richard, Thanks for picking this up!
> I know
> > > > > of at
> > > > > > >>> least one
> > > > > > >>> > > > > > > >>> large
> > > > > > >>> > > > > > > >>>>>>>>>> community member for which this
> feature is
> > > > > > >>> absolutely
> > > > > > >>> > > > > > > >>> essential.
> > > > > > >>> > > > > > > >>>>>>>>>> If I understand your two options, it
> seems
> > > > > like
> > > > > > >>> the proposal
> > > > > > >>> > > > > > is
> > > > > > >>> > > > > > > >>>>>>>>>> to implement it as a behavior change
> > > > > regardless,
> > > > > > >>> and the
> > > > > > >>> > > > > > > >>> question
> > > > > > >>> > > > > > > >>>>>>>>>> is whether to provide an opt-out
> config or
> > > > > not.
> > > > > > >>> Given that any
> > > > > > >>> > > > > > > >>>>>>>>>> implementation of this feature would
> have some
> > > > > > >>> performance
> > > > > > >>> > > > > > > >>> impact
> > > > > > >>> > > > > > > >>>>>>>>>> under some workloads, and also that
> we don't
> > > > > know
> > > > > > >>> if anyone
> > > > > > >>> > > > > > > >>>>>>>>>> really depends on emit-on-update time
> > > > > semantics,
> > > > > > >>> it seems like
> > > > > > >>> > > > > > > >>> we
> > > > > > >>> > > > > > > >>>>>>>>>> should propose to add an opt-out
> config. Can
> > > > > you
> > > > > > >>> update the
> > > > > > >>> > > > > > KIP
> > > > > > >>> > > > > > > >>>>>>>>>> to mention the exact config key and
> value(s)
> > > > > > >>> you'd propose?
> > > > > > >>> > > > > > > >>> Just
> > > > > > >>> > > > > > > >>>>>>>>>> to move the discussion forward, maybe
> > > > > something
> > > > > > >>> like: emit.on
> > > > > > >>> > > > > > > >>> :=
> > > > > > >>> > > > > > > >>>>>>>>>> change|update with the new default
> being
> > > > > "change"
> > > > > > >>> Thanks for
> > > > > > >>> > > > > > > >>>>>>>>>> pointing out the timestamp issue in
> > > > > particular. I
> > > > > > >>> agree that
> > > > > > >>> > > > > > if
> > > > > > >>> > > > > > > >>>>>>>>>> we discard the latter update as a
> no-op, then
> > > > > we
> > > > > > >>> also have to
> > > > > > >>> > > > > > > >>>>>>>>>> discard its timestamp (obviously, we
> don't
> > > > > > >>> forward the
> > > > > > >>> > > > > > > >>> timestamp
> > > > > > >>> > > > > > > >>>>>>>>>> update, as that's the whole point,
> but we also
> > > > > > >>> can't update
> > > > > > >>> > > > > > the
> > > > > > >>> > > > > > > >>>>>>>>>> timestamp in the store, as the store
> must
> > > > > remain
> > > > > > >>> consistent
> > > > > > >>> > > > > > > >>> with
> > > > > > >>> > > > > > > >>>>>>>>>> what has been emitted). I have to
> confess
> > > > > that I
> > > > > > >>> disagree with
> > > > > > >>> > > > > > > >>>>>>>>>> your implementation proposal, but
> it's also
> > > > > not
> > > > > > >>> necessary to
> > > > > > >>> > > > > > > >>>>>>>>>> discuss implementation in the KIP.
> Maybe it
> > > > > would
> > > > > > >>> be less
> > > > > > >>> > > > > > > >>>>>>>>>> controversial if you just drop that
> section
> > > > > for
> > > > > > >>> now, so that
> > > > > > >>> > > > > > > >>> the
> > > > > > >>> > > > > > > >>>>>>>>>> KIP discussion can focus on the
> behavior
> > > > > change
> > > > > > >>> and config.
> > > > > > >>> > > > > > > >>> Just
> > > > > > >>> > > > > > > >>>>>>>>>> for reference, there is some research
> into
> > > > > this
> > > > > > >>> domain. For
> > > > > > >>> > > > > > > >>>>>>>>>> example, see the "Report" section
> (3.2.3) of
> > > > > the
> > > > > > >>> SECRET paper:
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > >
> > > > > > >>>
> > > > >
> https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2Fpeop
> > > > > > >>> > > > > > > > le.csail.mit.edu
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> %2Ftatbul%2Fpublications%2Fmaxstream_vldb10.pdf&amp;data
> > > > > > >>> > > > > > > > =02%7C01%7CThomas.Becker%40tivo.com
> > > > > > >>> > > > > > > >>>>>> %7Ce0235483b1eb4f259c5c08d7a8d1c16b%7
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > > > >>>
> > > > > > >>> > > > > >
> > > > > > >>>
> > > > >
> Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637163491160859282&amp;sdata
> > > > > > >>> > > > > > > >
> > > > > > >>>
> =4dSGIS8jNPAPP7B48r9e%2BUgFh3WdmzVyXhyT63eP8dI%3D&amp;reserved=0
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > > It might help to round out the proposal if you
> take a
> > > > > brief
> > > > > > >>> > > > > > > >>> survey of
> > > > > > >>> > > > > > > >>>>>>>>>> the behaviors of other systems, along
> with
> > > > > pros
> > > > > > >>> and cons if
> > > > > > >>> > > > > > any
> > > > > > >>> > > > > > > >>>>>>>>>> are reported. Thanks, -John
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 10, 2020, at 22:27,
> Richard Yu
> > > > > wrote:
> > > > > > >>> Hi
> > > > > > >>> > > > > > everybody!
> > > > > > >>> > > > > > > >>>>>>>>>> I'd like to propose a change that we
> probably
> > > > > > >>> should've added
> > > > > > >>> > > > > > > >>> for
> > > > > > >>> > > > > > > >>>>>>>>>> a long time now. The key benefit of
> this KIP
> > > > > > >>> would be reduced
> > > > > > >>> > > > > > > >>>>>>>>>> traffic in Kafka Streams since a lot
> of no-op
> > > > > > >>> results would no
> > > > > > >>> > > > > > > >>>>>>>>>> longer be sent downstream. Here is
> the KIP for
> > > > > > >>> reference.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > >
> > > > > > >>>
> > > > >
> https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwi
> > > > > > >>> > > > > > > > ki.apache.org
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> %2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-557%253A%2BAdd%2Bemit
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > > > >>>
> > > > > > >>> > > > > >
> > > > > > >>>
> > > > >
> %2Bon%2Bchange%2Bsupport%2Bfor%2BKafka%2BStreams&amp;data=02%7C01%7CThom
> > > > > > >>> > > > > > > > as.Becker%40tivo.com
> > > > > > >>> > > > > > > >>>>>>
> > > > > %7Ce0235483b1eb4f259c5c08d7a8d1c16b%7Cd05b7c6912014c
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > > > >>>
> > > > > > >>> > > > > >
> > > > > > >>>
> > > > >
> 0db45d7f1dcc227e4d%7C1%7C1%7C637163491160869277&amp;sdata=zYpCSFOsyN4%2B
> > > > > > >>> > > > > > > >
> 4rKRZBQ%2FZvcGQ4EINR9Qm6PLsB7EKrc%3D&amp;reserved=0
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > > Currently, I seek to formalize our approach
> for this
> > > > > KIP
> > > > > > >>> first
> > > > > > >>> > > > > > > >>> before
> > > > > > >>> > > > > > > >>>>>>>>>> we determine concrete API additions /
> > > > > > >>> configurations. Some
> > > > > > >>> > > > > > > >>>>>>>>>> configs might warrant adding, whiles
> others
> > > > > are
> > > > > > >>> not necessary
> > > > > > >>> > > > > > > >>>>>>>>>> since adding them would only increase
> > > > > complexity
> > > > > > >>> of Kafka
> > > > > > >>> > > > > > > >>>>>>>>>> Streams. Cheers, Richard
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> ________________________________ This
> email
> > > > > and
> > > > > > >>> any
> > > > > > >>> > > > > > attachments
> > > > > > >>> > > > > > > >>>>>>>>>> may contain confidential and
> privileged
> > > > > material
> > > > > > >>> for the sole
> > > > > > >>> > > > > > > >>> use
> > > > > > >>> > > > > > > >>>>>>>>>> of the intended recipient. Any review,
> > > > > copying, or
> > > > > > >>> > > > > > distribution
> > > > > > >>> > > > > > > >>>>>>>>>> of this email (or any attachments) by
> others
> > > > > is
> > > > > > >>> prohibited. If
> > > > > > >>> > > > > > > >>>>>>>>>> you are not the intended recipient,
> please
> > > > > > >>> contact the sender
> > > > > > >>> > > > > > > >>>>>>>>>> immediately and permanently delete
> this email
> > > > > and
> > > > > > >>> any
> > > > > > >>> > > > > > > >>>>>>>>>> attachments. No employee or agent of
> TiVo is
> > > > > > >>> authorized to
> > > > > > >>> > > > > > > >>>>>>>>>> conclude any binding agreement on
> behalf of
> > > > > TiVo
> > > > > > >>> by email.
> > > > > > >>> > > > > > > >>>>>>>>>> Binding agreements with TiVo may only
> be made
> > > > > by
> > > > > > >>> a signed
> > > > > > >>> > > > > > > >>> written
> > > > > > >>> > > > > > > >>>>>>>>>> agreement. -- *Tommy Becker*
> *Principal
> > > > > Engineer *
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> *Personalized Content Discovery*
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> *O* +1 919.460.4747 *tivo.com* <
> > > > > > >>> http://www.tivo.com/>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> This email and any attachments may
> contain
> > > > > > >>> confidential and
> > > > > > >>> > > > > > > >>>>>>>>>> privileged material for the sole use
> of the
> > > > > > >>> intended
> > > > > > >>> > > > > > recipient.
> > > > > > >>> > > > > > > >>>>>>>>>> Any review, copying, or distribution
> of this
> > > > > > >>> email (or any
> > > > > > >>> > > > > > > >>>>>>>>>> attachments) by others is prohibited.
> If you
> > > > > are
> > > > > > >>> not the
> > > > > > >>> > > > > > > >>> intended
> > > > > > >>> > > > > > > >>>>>>>>>> recipient, please contact the sender
> > > > > immediately
> > > > > > >>> and
> > > > > > >>> > > > > > > >>> permanently
> > > > > > >>> > > > > > > >>>>>>>>>> delete this email and any
> attachments. No
> > > > > > >>> employee or agent of
> > > > > > >>> > > > > > > >>>>>>>>>> TiVo is authorized to conclude any
> binding
> > > > > > >>> agreement on behalf
> > > > > > >>> > > > > > > >>> of
> > > > > > >>> > > > > > > >>>>>>>>>> TiVo by email. Binding agreements
> with TiVo
> > > > > may
> > > > > > >>> only be made
> > > > > > >>> > > > > > > >>> by a
> > > > > > >>> > > > > > > >>>>>>>>>> signed written agreement.
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> --
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> *Tommy Becker* /Principal Engineer /
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> /Personalized Content Discovery/
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> *O* +1 919.460.4747 *tivo.com* <
> > > > > > >>> http://www.tivo.com/>
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > >
> > > > > > >>>
> > > > >
> ----------------------------------------------------------------------
> > > > > > >>> > > > > > > >>>>>>>
> > > > > > >>> > > > > > > >>>>>>
> > > > > > >>> > > > > > > >>>>>
> > > > > > >>> > > > > > > >>>>
> > > > > > >>> > > > > > > >>>
> > > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > Attachments:
> > > > > > >>> > > > > > > * signature.asc
> > > > > > >>> > > > > >
> > > > > > >>> > > >
> > > > > > >>> >
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > >
> > >
> > >
>

Reply via email to