Hi John,

I am glad to help you with your imagination. With overhead, I mainly
meant the additional lookup into the state store to get the current
value, but I see now in the code that we do that lookup anyways
(although I think we could avoid that in the cases where we do not
need the old value). With or without config, we need to evaluate the
performance benefits of this change, in any case.

Best,
Bruno

On Wed, Feb 19, 2020 at 7:48 PM John Roesler <vvcep...@apache.org> wrote:
>
> Thanks for your remarks, Bruno!
>
> I'm in favor of standardizing on terminology like "not forwarding
> idempotent updates" or "dropping idempotent updates". Maybe we
> should make a pass on the KIP and just convert everything to this
> phrasing. In retrospect, even the term "emit-on-change" has too much
> semantic baggage, since it implies the semantics from the SECRET
> paper, which we don't really want to imply here.
>
> I'm also in favor of the metric as you propose.
>
> Likewise with stream aggregations, I was also under the impression
> that we agreed on dropping idempotent updates to the aggregation
> result, any time we find that our "new" (key, value, timestamp) result
> is identical to the prior one.
>
> Also, I'm +1 on all your recommendations for updating the KIP document
> for clarity.
>
> Regarding the opt-out config. Perhaps I'm suffering from a failure of
> imagination, but I don't see how the current proposal could really have
> a measurable impact on latency. If all we do is make a single extra pass
> to compare two byte arrays for equality, only in the cases where we already
> have the byte arrays available, it seems unlikely to measurably affect the
> processing of non-idempotent updates. It seems guaranteed to _decrease_
> the latency of processing idempotent updates, since we get to skip a
> store#put, at least one producer#send, and also all downstream processing,
> including all the disk and network operations associated with downstream
> operations.
>
> It seems like if we're pretty sure this change would only _help_, we shouldn't
> introduce the operational burden of an extra configuration. If we want to
> be more aggressive about dropping idempotent operations in the future,
> such as depending on equals() or adding a ChangeDetector interface, then
> we should consider adding a configuration as part of that future work. In
> fact, if we add a simple "opt-in/opt-out" switch right now, we might find
> that it's actually insufficient for whatever future feature we might propose,
> then we have a mess of deprecating the opt-out config and replacing it.
>
> What do you think?
> -John
>
> On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna wrote:
> > Hi all,
> >
> > Sorry for the late reply!
> >
> > I am also in favour of baby steps.
> >
> > I am undecided whether the KIP should contain a opt-out config or not.
> > The overhead of emit-on-change might affect latency. For applications
> > where low latency is crucial and there are not too many idempotent
> > updates, it would be better to fall back to emit-on-update. However,
> > we do not know how much emit-on-change impacts latency. We would first
> > need to benchmark that before we can decide about the opt-out-config.
> >
> > A metric of dropped idempotent updates seems useful to me to be
> > informed about potential upstream applications or upstream operators
> > that produce too many idempotent updates. The KIP should state the
> > name of the metric, its group, its tags, and its recording level (see
> > KIP-444 or KIP-471 for examples). I propose DEBUG as reporting level.
> >
> > Richard, what competing proposals for emit-on-change for aggregations
> > do you mean? I have the feeling that we agreed to get rid of
> > idempotent updates if the aggregate is updated with the same key,
> > value, AND timestamp. I am also fine if we do not include this into
> > this KIP (remember: baby steps).
> >
> > You write that "emit-on-change is more correct". Since we agreed that
> > this is an optimization, IMO you cannot argue this way.
> >
> > Please put "Alternative Approaches" under "Rejected Alternatives", so
> > that it becomes clear that we are not going to implement them. In
> > general, I think the KIP needs a bit of clean-up (probably, you
> > already planned for it). "Design Reasoning" is a bit of behavior
> > changes, rejected alternatives and duplicates a bit the content in
> > those sections.
> >
> > I do not like the name "no-op operations" or "no-ops", because they
> > are rather generic. I like more "idempotent updates".
> >
> > Best,
> > Bruno
> >
> >
> > On Tue, Feb 18, 2020 at 7:25 PM Richard Yu <yohan.richard...@gmail.com> 
> > wrote:
> > >
> > > Hi all,
> > >
> > > We are definitely making progress!
> > >
> > > @John should I emphasize in the proposed behavior changes that we are only
> > > doing binary equality checks for stateful operators?
> > > It looks like we have come close to finalizing this part of the KIP. (I
> > > will note in the KIP that this proposal is intended for optimization, not
> > > semantics correctness)
> > >
> > > I do think maybe we still have one other detail we need to discuss. So 
> > > far,
> > > there has been quite a bit of back and forth about what the behavior of
> > > aggregations should look like in emit on change. I have seen
> > > multiple competing proposals, so I am not completely certain which one we
> > > should go with, or how we will be able to compromise in between them.
> > >
> > > Let me know what your thoughts are on this matter, since we are probably
> > > close to wrapping up most other stuff.
> > > @Matthias J. Sax <matth...@confluent.io>  and @Bruno, see what you think
> > > about this.
> > >
> > > Best,
> > > Richard
> > >
> > >
> > >
> > > On Tue, Feb 18, 2020 at 9:06 AM John Roesler <vvcep...@apache.org> wrote:
> > >
> > > > Thanks, Matthias!
> > > >
> > > > Regarding numbers, it would be hard to know how many applications
> > > > would benefit, since we don't know how many applications there are,
> > > > or anything about their data sets or topologies. We could do a survey,
> > > > but it seems overkill if we take the conservative approach.
> > > >
> > > > I have my own practical stream processing experience that tells me this
> > > > is absolutely critical for any moderate-to-large relational stream
> > > > processing use cases. I'll leave it to you to decide if you find that
> > > > convincing, but it's definitely not an _assumption_. I've also heard 
> > > > from
> > > > a few Streams users who have already had to implement their own
> > > > noop-suppression transformers in order to get to production scale.
> > > >
> > > > Regardless, it sounds like we can agree on taking an opportunistic 
> > > > approach
> > > > and targeting the optimization just to use a binary-equality check at
> > > > stateful operators. (I'd also suggest in sink nodes, when we are about 
> > > > to
> > > > send old and new values, since they are also already present and 
> > > > serialized
> > > > at that point.) We could make the KIP even more vague, and just say that
> > > > we'll drop no-op updates "when possible".
> > > >
> > > > I'm curious what Bruno and the others think about this. If it seems like
> > > > a good starting point, perhaps we could move to a vote soon and get to
> > > > work on the implementation!
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Mon, Feb 17, 2020, at 20:54, Matthias J. Sax wrote:
> > > > > Talking about optimizations and reducing downstream load:
> > > > >
> > > > > Do we actually have any numbers? I have the impression that this KIP 
> > > > > is
> > > > > more or less build on the _assumption_ that there is a problem. Yes,
> > > > > there are some use cases that would benefit from this; But how many
> > > > > applications would actually benefit? And how much load reduction would
> > > > > they get?
> > > > >
> > > > > The simplest approach (following John idea to make baby steps) would 
> > > > > be
> > > > > to apply the emit-on-change pattern only if there is a store. For this
> > > > > case we need to serialize old and new result anyway and thus a simple
> > > > > byte-array comparison is no overhead.
> > > > >
> > > > > Sending `oldValues` by default would become expensive because we would
> > > > > need to serialize the recomputed old result, as well as the new 
> > > > > result,
> > > > > to make the comparison (and we now the serialization is not cheap). We
> > > > > are facing a trade-off between CPU overhead and downstream load and I 
> > > > > am
> > > > > not sure if we should hard code this. My original argument for sending
> > > > > `oldValues` was about semantics; but for an optimization, I am not 
> > > > > sure
> > > > > if this would be the right choice.
> > > > >
> > > > > For now, users who want to opt-in can force a materialization. A
> > > > > materialization may be expensive and if we see future demand, we could
> > > > > still add an option to send `oldValues` instead of materialization 
> > > > > (this
> > > > > would at least save the store overhead). As we consider the KIP an
> > > > > optimization, a "config" seems to make sense.
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 2/17/20 5:21 PM, Richard Yu wrote:
> > > > > > Hi John!
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > About the changes we have discussed so far. I think upon further
> > > > > > consideration, we have been mostly talking about this from the
> > > > perspective
> > > > > > that no stop-gap effort is acceptable. However, in recent 
> > > > > > discussion,
> > > > > if we
> > > > > > consider optimization, then it appears that the perspective I
> > > > mentioned no
> > > > > > longer applies. After all, we are no longer concerned so much about
> > > > > > semantics correctness, then reducing traffic as much as possible
> > > > without
> > > > > > performance tradeoffs.
> > > > > >
> > > > > > In this case, I think a cache would be a good idea for stateless
> > > > > > operations. This cache will not be backed by a store obviously. We 
> > > > > > can
> > > > > > probably use Kafka's ThreadCache. We should be able to catch a large
> > > > > > portion of the no-ops if we at least store some results in the 
> > > > > > cache.
> > > > Not
> > > > > > all will be caught, but I think the impact will be significant.
> > > > > >
> > > > > > On another note, I think that we should implement competing 
> > > > > > proposals
> > > > i.e.
> > > > > > one where we forward both old and new values with a reasonable
> > > > proportion
> > > > > > of artificial no-ops (we do not necessarily have to rely on equals 
> > > > > > so
> > > > much
> > > > > > as comparing the serialized binary data after the operation), and in
> > > > > > another scenario, the cache for stateless ops. It would be
> > > > unreasonable if
> > > > > > we completely disregard either approach, since they both have merit.
> > > > The
> > > > > > reason for implementing both is to perform benchmark tests on them, 
> > > > > > and
> > > > > > compare them with the original. This way, we can more clearly see 
> > > > > > what
> > > > is
> > > > > > the drawbacks and the gains. So far, we have been discussing only
> > > > > > hypotheticals, and if we continue to do so, I think it is likely no
> > > > ground
> > > > > > will be gained.
> > > > > >
> > > > > > After all, what we seek is optimization, and performance benchmarks
> > > > > will be
> > > > > > mandatory for a KIP of this nature.
> > > > > >
> > > > > > Hope this helps,
> > > > > > Richard
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 17, 2020 at 2:12 PM John Roesler <vvcep...@apache.org>
> > > > wrote:
> > > > > >
> > > > > >> Hi again, all,
> > > > > >>
> > > > > >> Sorry on my part for my silence.
> > > > > >>
> > > > > >> I've just taken another look over the recent history of this
> > > > > discussion. It
> > > > > >> seems like the #1 point to clarify (because it affect everything
> > > > else) is
> > > > > >> that,
> > > > > >> yes, I was 100% envisioning this as an _optimization_.
> > > > > >>
> > > > > >> As a consequence, I don't think it's critical to make any hard
> > > > guarantees
> > > > > >> about
> > > > > >> what results get forwarded and what (no-op updates) get dropped. 
> > > > > >> I'd
> > > > > >> initially
> > > > > >> just been thinking about doing this opportunistically in cases 
> > > > > >> where
> > > > we
> > > > > >> already
> > > > > >> had the "old" and "new" result in memory, thanks to a request to
> > > > > "emit old
> > > > > >> values", or to the implementation of timestamp semantics.
> > > > > >>
> > > > > >> However, whether or not it's semantically critical, I do think that
> > > > > >> Matthias's
> > > > > >> idea to use the change-forwarding mechanism to check for no-ops 
> > > > > >> even
> > > > on
> > > > > >> stateless operations is pretty interesting. Specifically, this 
> > > > > >> would
> > > > > >> _really_
> > > > > >> let you pare down useless updates by using mapValues to strip down
> > > > > records
> > > > > >> only
> > > > > >> to what you really need. However, the dependence on the
> > > > implementation of
> > > > > >> equals() is troubling.
> > > > > >>
> > > > > >> It might make sense to table this idea, as well as my complicated
> > > > no-op
> > > > > >> detection algorithm, and initially propose just a nonconfigurable
> > > > feature
> > > > > >> to
> > > > > >> check "old" and "new" results for binary equality before 
> > > > > >> forwarding.
> > > > > I.e.,
> > > > > >> if
> > > > > >> any operation determines that the old and new results are
> > > > > >> binary-identical, we
> > > > > >> would not forward.
> > > > > >>
> > > > > >> I'll admit that this doesn't serve Tommy's use case very well, but 
> > > > > >> it
> > > > > >> might be
> > > > > >> better to take baby steps with an optimization like this and not 
> > > > > >> risk
> > > > > >> over-reaching in a way that actually harms performance or
> > > > correctness. We
> > > > > >> could
> > > > > >> always expand the feature to use equals() or some kind of
> > > > ChangeDetector
> > > > > >> later
> > > > > >> on, in a more focused discussion.
> > > > > >>
> > > > > >> Regarding metrics or debug logs, I guess I don't feel strongly, 
> > > > > >> but it
> > > > > >> feels
> > > > > >> like two things will happen that make it nicer to add them:
> > > > > >>
> > > > > >> 1. This feature is going to surprise/annoy _somebody_, and it 
> > > > > >> would be
> > > > > >> nice to
> > > > > >> be able to definitively say the reason that updates are dropped is
> > > > that
> > > > > >> they
> > > > > >> were no-ops. The easiest smoking gun is if there are debug-logs 
> > > > > >> that
> > > > > can be
> > > > > >> enabled. This person might just be looking at the dashboards,
> > > > > wondering why
> > > > > >> there are 100K updates per second going into their app, but only 1K
> > > > > >> results per
> > > > > >> second coming out. Having the metric there makes the accounting
> > > > easier.
> > > > > >>
> > > > > >> 2. Somebody is going to struggle with high-volume updates, and it
> > > > > would be
> > > > > >> nice
> > > > > >> for them to know that this feature is saving them X-thousand 
> > > > > >> updates
> > > > per
> > > > > >> second,
> > > > > >> etc.
> > > > > >>
> > > > > >> What does everyone think about this? Note, as I read it, what I've
> > > > said
> > > > > >> above is
> > > > > >> already reflected in the text of the KIP.
> > > > > >>
> > > > > >> Thanks,
> > > > > >> -John
> > > > > >>
> > > > > >>
> > > > > >> On Tue, Feb 11, 2020, at 18:27, Richard Yu wrote:
> > > > > >>> Hi all,
> > > > > >>>
> > > > > >>> Bumping this. If you feel that this KIP is not too urgent. Then 
> > > > > >>> let
> > > > me
> > > > > >>> know. :)
> > > > > >>>
> > > > > >>> Cheers,
> > > > > >>> Richard
> > > > > >>>
> > > > > >>> On Thu, Feb 6, 2020 at 4:55 PM Richard Yu <
> > > > yohan.richard...@gmail.com>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>>> Hi all,
> > > > > >>>>
> > > > > >>>> I've had just a few thoughts regarding the forwarding of <key,
> > > > > >>>> change<old_value, new_value>>. As Matthias already mentioned, 
> > > > > >>>> there
> > > > > >> are two
> > > > > >>>> separate priorities by which we can judge this KIP:
> > > > > >>>>
> > > > > >>>> 1. A optimization perspective: In this case, the user would 
> > > > > >>>> prefer
> > > > the
> > > > > >>>> impact of this KIP to be as minimal as possible. By such logic, 
> > > > > >>>> if
> > > > > >>>> stateless operations are performed twice, that could prove
> > > > > >> unacceptable for
> > > > > >>>> them. (since operations can prove expensive)
> > > > > >>>>
> > > > > >>>> 2. Semantics correctness perspective: Unlike the optimization
> > > > > >> approach, we
> > > > > >>>> are more concerned with all KTable operations obeying the same
> > > > emission
> > > > > >>>> policy. i.e. emit on change. In this case, a discrepancy would 
> > > > > >>>> not
> > > > be
> > > > > >>>> tolerated, even though an extra performance cost will be 
> > > > > >>>> incurred.
> > > > > >>>> Therefore, we will follow Matthias's approach, and then perform 
> > > > > >>>> the
> > > > > >>>> operation once on the old value, and once on the new.
> > > > > >>>>
> > > > > >>>> The issue here I think is more black and white than in between. 
> > > > > >>>> The
> > > > > >> second
> > > > > >>>> option in particular would be favorable for users with 
> > > > > >>>> inexpensive
> > > > > >>>> stateless operations, while for the former option, we are 
> > > > > >>>> probably
> > > > > >> dealing
> > > > > >>>> with more expensive ones. So the simplest solution is probably to
> > > > > >> allow the
> > > > > >>>> user to choose one of the behaviors, and have a config which can
> > > > > >> switch in
> > > > > >>>> between them.
> > > > > >>>>
> > > > > >>>> Its the simplest compromise I can come up with at the moment, 
> > > > > >>>> but if
> > > > > >> you
> > > > > >>>> think you have a better plan which could better balance 
> > > > > >>>> tradeoffs.
> > > > Then
> > > > > >>>> please let us know. :)
> > > > > >>>>
> > > > > >>>> Best,
> > > > > >>>> Richard
> > > > > >>>>
> > > > > >>>> On Wed, Feb 5, 2020 at 5:12 PM John Roesler <vvcep...@apache.org>
> > > > > >> wrote:
> > > > > >>>>
> > > > > >>>>> Hi all,
> > > > > >>>>>
> > > > > >>>>> Thanks for the thoughtful comments!
> > > > > >>>>>
> > > > > >>>>> I need more time to reflect on your thoughts, but just wanted to
> > > > offer
> > > > > >>>>> a quick clarification about equals().
> > > > > >>>>>
> > > > > >>>>> I only meant that we can't be sure if a class's equals()
> > > > > >> implementation
> > > > > >>>>> returns true for two semantically identical instances. I.e., if 
> > > > > >>>>> a
> > > > > >> class
> > > > > >>>>> doesn't
> > > > > >>>>> override the default equals() implementation, then we would see
> > > > > >> behavior
> > > > > >>>>> like:
> > > > > >>>>>
> > > > > >>>>> new MyPair("A", 1).equals(new MyPair("A", 1)) returns false
> > > > > >>>>>
> > > > > >>>>> In that case, I would still like to catch no-op updates by
> > > > comparing
> > > > > >> the
> > > > > >>>>> serialized form of the records when we happen to have it 
> > > > > >>>>> serialized
> > > > > >> anyway
> > > > > >>>>> (such as when the operation is stateful, or when we're sending 
> > > > > >>>>> to a
> > > > > >>>>> repartition topic and we have both the "new" and "old" value 
> > > > > >>>>> from
> > > > > >>>>> upstream).
> > > > > >>>>>
> > > > > >>>>> I didn't mean to suggest we'd try to use reflection to detect
> > > > whether
> > > > > >>>>> equals
> > > > > >>>>> is implemented, although that is a neat trick. I was thinking 
> > > > > >>>>> more
> > > > of
> > > > > >> a
> > > > > >>>>> belt-and-suspenders algorithm where we do the check for no-ops
> > > > based
> > > > > >> on
> > > > > >>>>> equals() and then _also_ check the serialized bytes for 
> > > > > >>>>> equality.
> > > > > >>>>>
> > > > > >>>>> Thanks,
> > > > > >>>>> -John
> > > > > >>>>>
> > > > > >>>>> On Wed, Feb 5, 2020, at 15:31, Ted Yu wrote:
> > > > > >>>>>> Thanks for the comments, Matthias.
> > > > > >>>>>>
> > > > > >>>>>> w.r.t. requirement of an `equals()` implementation, each 
> > > > > >>>>>> template
> > > > > >> type
> > > > > >>>>>> would have an equals() method. We can use the following code to
> > > > know
> > > > > >>>>>> whether it is provided by JVM or provided by user.
> > > > > >>>>>>
> > > > > >>>>>> boolean customEquals = false;
> > > > > >>>>>> try {
> > > > > >>>>>>     Class cls = value.getClass().getMethod("equals",
> > > > > >>>>>> Object.class).getDeclaringClass();
> > > > > >>>>>>     if (!Object.class.equals(cls)) {
> > > > > >>>>>>         customEquals = true;
> > > > > >>>>>>     }
> > > > > >>>>>> } catch (NoSuchMethodException nsme) {
> > > > > >>>>>>     // equals is always defined, this wouldn't hit
> > > > > >>>>>> }
> > > > > >>>>>>
> > > > > >>>>>> The next question is: what if the user doesn't provide equals()
> > > > > >> method ?
> > > > > >>>>>> Would we automatically fall back to emit-on-update ?
> > > > > >>>>>>
> > > > > >>>>>> Cheers
> > > > > >>>>>>
> > > > > >>>>>> On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax 
> > > > > >>>>>> <mj...@apache.org>
> > > > > >>>>> wrote:
> > > > > >>>>>>
> > > > > > First a high level comment:
> > > > > >
> > > > > > Overall, I would like to make one step back, and make sure we are
> > > > > > discussion on the same level. Originally, I understood this KIP
> > > > > >>> as a
> > > > > > proposed change of _semantics_, however, given the latest
> > > > > >>> discussion
> > > > > > it seems it's actually not -- it's more an _optimization_
> > > > > >>> proposal.
> > > > > > Hence, we only need to make sure that this optimization does not
> > > > > >>> break
> > > > > > existing semantics. It this the right way to think about it?
> > > > > >
> > > > > > If yes, than it might actually be ok to have different behavior
> > > > > > depending if there is a materialized KTable or not. So far, we
> > > > > >>> never
> > > > > > defined a public contract about our emit strategy and it seems
> > > > > >>> this
> > > > > > KIP does not define one either.
> > > > > >
> > > > > > Hence, I don't have as strong of an opinion about sending
> > > > > >>> oldValues
> > > > > > for example any longer. I guess the question is really, what can
> > > > > >>> we
> > > > > > implement in a reasonable way.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Other comments:
> > > > > >
> > > > > >
> > > > > > @Richard:
> > > > > >
> > > > > > Can you please add the KIP to the KIP overview table: It's missing
> > > > > > (
> > > > > >>>>>>
> > > > > >>>
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Pro
> > > > > > posals).
> > > > > >
> > > > > >
> > > > > > @Bruno:
> > > > > >
> > > > > > You mentioned caching. I think it's irrelevant (orthogonal) and
> > > > > >>> we can
> > > > > > discuss this KIP without considering it.
> > > > > >
> > > > > >
> > > > > > @John:
> > > > > >
> > > > > >>>>>>>>> Even in the source table, we forward the updated record with
> > > > the
> > > > > >>>>>>>>> higher of the two timestamps. So the example is more like:
> > > > > >
> > > > > > That is not correct. Currently, we forward with the smaller
> > > > > > out-of-order timestamp (changing the timestamp would corrupt the
> > > > > >>> data
> > > > > > -- we don't know, because we don't check, if the value is the
> > > > > >>> same
> > > > > >>>>>> or
> > > > > > a different one, hence, we must emit the out-of-order record
> > > > > >>> as-is).
> > > > > >
> > > > > > If we start to do emit-on-change, we also need to emit a new
> > > > > >>> record if
> > > > > > the timestamp changes due to out-of-order data, hence, we would
> > > > > >>> still
> > > > > > need to emit <K,V,T1> because that give us correct semantics:
> > > > > >>> assume
> > > > > > you have a filter() and afterward use the filter KTable in a
> > > > > > stream-table join -- the lower T1 timestamp must be propagated to
> > > > > >>> the
> > > > > > filtered KTable to ensure that that the stream-table join compute
> > > > > >>> the
> > > > > > correct result.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Your point about requiring an `equals()` implementation is
> > > > > >>> actually a
> > > > > > quite interesting one and boils down to my statement from above
> > > > > >>> about
> > > > > > "what can we actually implement". What I don't understand is:
> > > > > >
> > > > > >>>>>>>>> This way, we still don't have to rely on the existence of an
> > > > > >>>>>>>>> equals() method, but if it is there, we can benefit from it.
> > > > > >
> > > > > > Your bullet point (2) says it uses `equals()` -- hence, it seems
> > > > > >>> we
> > > > > > actually to rely on it? Also, how can we detect if there is an
> > > > > > `equals()` method to do the comparison? Would be fail if we don't
> > > > > >>> have
> > > > > > `equals()` nor corresponding serializes to do the comparison?
> > > > > >
> > > > > >
> > > > > >
> > > > > >>>>>>>>> Wow, really good catch! Yes, we absolutely need metrics and
> > > > > >>> logs if
> > > > > >>>>>>>>> we're going to drop any records. And, yes, we should propose
> > > > > >>>>>>>>> metrics and logs that are similar to the existing ones when 
> > > > > >>>>>>>>> we
> > > > > >>> drop
> > > > > >>>>>>>>> records for other reasons.
> > > > > >
> > > > > > I am not sure about this point. In fact, we have already some
> > > > > >>> no-ops
> > > > > > in Kafka Streams in our join-operators and don't report any of
> > > > > >>> those
> > > > > > either. Emit-on-change is operator semantics and I don't see why
> > > > > >>> we
> > > > > > would need to have a metric for it? It seems to be quite different
> > > > > > compared to dropping late or malformed records.
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 2/4/20 7:13 AM, Thomas Becker wrote:
> > > > > >>>>>>>>> Thanks John for your thoughtful reply. Some comments inline.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On Mon, 2020-02-03 at 11:51 -0600, John Roesler wrote:
> > > > > >>>>>>>>>> [EXTERNAL EMAIL] Attention: This email was sent from 
> > > > > >>>>>>>>>> outside
> > > > > >>>>>>>>>> TiVo. DO NOT CLICK any links or attachments unless you
> > > > expected
> > > > > >>>>>>>>>> them. ________________________________
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Hi Tommy,
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Thanks for the context. I can see the attraction of
> > > > considering
> > > > > >>>>>>>>>> these use cases together.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> To answer your question, if a part of the record is not
> > > > > >>> relevant
> > > > > >>>>>>>>>> to downstream consumers, I was thinking you could just use 
> > > > > >>>>>>>>>> a
> > > > > >>>>>>>>>> mapValue to remove it.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> E.g., suppose you wanted to do a join between two tables.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> employeeInfo.join( employeePayroll, (info, payroll) -> new
> > > > > >>>>>>>>>> Result(info.name(), payroll.salary()) )
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> We only care about one attribute from the Info table 
> > > > > >>>>>>>>>> (name),
> > > > > >>> and
> > > > > >>>>>>>>>> one from the Payroll table (salary), and these attributes
> > > > > >>> change
> > > > > >>>>>>>>>> rarely. On the other hand, there might be many other
> > > > attributes
> > > > > >>>>>>>>>> that change frequently of these tables. We can avoid
> > > > triggering
> > > > > >>>>>>>>>> the join unnecessarily by mapping the input tables to drop 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> unnecessary information before the join:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> names = employeeInfo.mapValues(info -> info.name()) 
> > > > > >>>>>>>>>> salaries
> > > > =
> > > > > >>>>>>>>>> employeePayroll.mapValues(payroll -> payroll.salary())
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> names.join( salaries, (name, salary) -> new Result(name,
> > > > > >>> salary)
> > > > > >>>>>>>>>> )
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Ahh yes I see. This works, but in the case where you're 
> > > > > >>>>>>>>> using
> > > > > >>>>>>>>> schemas as we are (e.g. Avro), it seems like this approach
> > > > could
> > > > > >>>>>>>>> lead to a proliferation of "skinny" record types that just 
> > > > > >>>>>>>>> drop
> > > > > >>>>>>>>> various fields.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Especially if we take Matthias's idea to drop non-changes 
> > > > > >>>>>>>>>> even
> > > > > >>>>>>>>>> for stateless operations, this would be quite efficient 
> > > > > >>>>>>>>>> and is
> > > > > >>>>>>>>>> also a very straightforward optimization to understand once
> > > > you
> > > > > >>>>>>>>>> know that Streams provides emit-on-change.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> From the context that you provided, it seems like a 
> > > > > >>>>>>>>>> slightly
> > > > > >>>>>>>>>> different situation, though. Reading between the lines a
> > > > > >>> little,
> > > > > >>>>>>>>>> it sounds like: in contrast to the example above, in which 
> > > > > >>>>>>>>>> we
> > > > > >>> are
> > > > > >>>>>>>>>> filtering out extra _data_, you have some extra _metadata_
> > > > that
> > > > > >>>>>>>>>> you still wish to pass down with the data when there is a
> > > > > >>> "real"
> > > > > >>>>>>>>>> update, but you don't want the metadata itself to cause an
> > > > > >>>>>>>>>> update.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Despite my lack of clarity, yes you've got it right ;) This
> > > > > >>>>>>>>> particular processor is the first stop for this data after
> > > > > >>> coming
> > > > > >>>>>>>>> in from external users, who often simply post the same 
> > > > > >>>>>>>>> content
> > > > > >>> each
> > > > > >>>>>>>>> time and we're trying to shield downstream consumers from
> > > > > >>>>>>>>> unnecessary churn.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> It does seem handy to be able to plug in a custom
> > > > > >>> ChangeDetector
> > > > > >>>>>>>>>> for this purpose, but I worry about the API complexity. 
> > > > > >>>>>>>>>> Maybe
> > > > > >>> you
> > > > > >>>>>>>>>> can help think though how to provide the same benefit while
> > > > > >>>>>>>>>> limiting user-facing complexity.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Here's some extra context to consider:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> We currently don't make any extra requirements about the
> > > > nature
> > > > > >>>>>>>>>> of data that you can use in Streams. For example, you don't
> > > > > >>> have
> > > > > >>>>>>>>>> to implement hashCode and equals, or compareTo, etc. With 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> current proposal, we can do an airtight comparison based 
> > > > > >>>>>>>>>> only
> > > > > >>> on
> > > > > >>>>>>>>>> the serialized form of the values, and we actually don't 
> > > > > >>>>>>>>>> have
> > > > > >>> to
> > > > > >>>>>>>>>> deserialize the "prior" value at all for a large number of
> > > > > >>>>>>>>>> operations. Admitedly, if we extend the proposal to include
> > > > > >>> no-op
> > > > > >>>>>>>>>> detection for stateless operations, we'd probably need to 
> > > > > >>>>>>>>>> rely
> > > > > >>> on
> > > > > >>>>>>>>>> equals() for no-op checking, otherwise we'd wind up 
> > > > > >>>>>>>>>> requiring
> > > > > >>>>>>>>>> serdes for stateless operations as well. Actually, I'd
> > > > probably
> > > > > >>>>>>>>>> argue for doing exactly that:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 1. In stateful operations, drop if the serialized byte[]s 
> > > > > >>>>>>>>>> are
> > > > > >>> the
> > > > > >>>>>>>>>> same. After deserializing, also drop if the objects are 
> > > > > >>>>>>>>>> equal
> > > > > >>>>>>>>>> according to Object#equals().
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 2. In stateless operations, compare the "new" and "old" 
> > > > > >>>>>>>>>> values
> > > > > >>>>>>>>>> (if "old" is available) based on Object#equals().
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 3. As a final optimization, after serializing and before
> > > > > >>> sending
> > > > > >>>>>>>>>> repartition records, compare the serialized data and drop
> > > > > >>>>>>>>>> no-ops.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> This way, we still don't have to rely on the existence of 
> > > > > >>>>>>>>>> an
> > > > > >>>>>>>>>> equals() method, but if it is there, we can benefit from 
> > > > > >>>>>>>>>> it.
> > > > > >>>>>>>>>> Also, we don't require a serde in any new situations, but 
> > > > > >>>>>>>>>> we
> > > > > >>> can
> > > > > >>>>>>>>>> still leverage it when it is available.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> For clarity, in my example above, even if the employeeInfo 
> > > > > >>>>>>>>>> and
> > > > > >>>>>>>>>> employeePayroll and Result records all have serdes, we need
> > > > the
> > > > > >>>>>>>>>> "name" field (presumably String) and the "salary" field
> > > > > >>>>>>>>>> (presumable a Double) to have serdes as well in the naive
> > > > > >>>>>>>>>> implementation. But if we can leverage equals(), then the
> > > > > >>> "right
> > > > > >>>>>>>>>> thing" happens automatically.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> I still don't totally follow why the individual components
> > > > > >>> (name,
> > > > > >>>>>>>>> salary) would have to have serdes here. If Result has one, 
> > > > > >>>>>>>>> we
> > > > > >>>>>>>>> compare bytes, and if Result additionally has an equals()
> > > > method
> > > > > >>>>>>>>> (which presumably includes equals comparisons on the
> > > > constituent
> > > > > >>>>>>>>> fields), have we not covered our bases?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> This dovetails in with my primary UX concern; where would 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> ChangeDetector actually be registered? None of the 
> > > > > >>>>>>>>>> operators
> > > > in
> > > > > >>>>>>>>>> my example have names or topics or any other identifiable
> > > > > >>>>>>>>>> characteristic that could be passed to a ChangeDetector 
> > > > > >>>>>>>>>> class
> > > > > >>>>>>>>>> registered via config. You could say that we make
> > > > > >>> ChangeDetector
> > > > > >>>>>>>>>> an optional parameter to every operation in Streams, but 
> > > > > >>>>>>>>>> this
> > > > > >>>>>>>>>> seems to carry quite a bit of mental burden with it. People
> > > > > >>> will
> > > > > >>>>>>>>>> wonder what it's for and whether or not they should be 
> > > > > >>>>>>>>>> using
> > > > > >>> it.
> > > > > >>>>>>>>>> There would almost certainly be a misconception that it's
> > > > > >>>>>>>>>> preferable to implement it always, which would be 
> > > > > >>>>>>>>>> unfortunate.
> > > > > >>>>>>>>>> Plus, to actually implment metadata flowing through the
> > > > > >>> topology
> > > > > >>>>>>>>>> as in your use case, you'd have to do two things: 1. make 
> > > > > >>>>>>>>>> sure
> > > > > >>>>>>>>>> that all operations actually preserve the metadata 
> > > > > >>>>>>>>>> alongside
> > > > > >>> the
> > > > > >>>>>>>>>> data (e.g., don't accidentally add a mapValues like I did, 
> > > > > >>>>>>>>>> or
> > > > > >>> you
> > > > > >>>>>>>>>> drop the metadata). 2. implement a ChangeDetector for every
> > > > > >>>>>>>>>> single operation in the topology, or you don't get the 
> > > > > >>>>>>>>>> benefit
> > > > > >>> of
> > > > > >>>>>>>>>> dropping non-changes internally 2b. Alternatively, you 
> > > > > >>>>>>>>>> could
> > > > > >>> just
> > > > > >>>>>>>>>> add the ChangeDetector to one operation toward the end of 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> topology. This would not drop redundant computation
> > > > internally,
> > > > > >>>>>>>>>> but only drop redundant _outputs_. But this is just about 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> same as your current solution.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> I definitely see your point regarding configuration. I was
> > > > > >>>>>>>>> originally thinking about this when the deduplication was 
> > > > > >>>>>>>>> going
> > > > > >>> to
> > > > > >>>>>>>>> be opt-in, and it seemed very natural to say something like:
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> employeeInfo.join(employeePayroll, (info, payroll) -> new
> > > > > >>>>>>>>> Result(info.name(), payroll.salary()))
> > > > > >>>>>>>>> .suppress(duplicatesAccordingTo(someChangeDetector))
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Alternatively you can imagine a similar method being on
> > > > > >>>>>>>>> Materialized, though obviously this makes less sense if we
> > > > don't
> > > > > >>>>>>>>> want to require materialization. If we're now talking about
> > > > > >>>>>>>>> changing the default behavior and not having any 
> > > > > >>>>>>>>> configuration
> > > > > >>>>>>>>> options, it's harder to find a place for this.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> A final thought; if it really is a metadata question, can 
> > > > > >>>>>>>>>> we
> > > > > >>> just
> > > > > >>>>>>>>>> plan to finish up the support for headers in Streams? I.e.,
> > > > > >>> give
> > > > > >>>>>>>>>> you a way to control the way that headers flow through the
> > > > > >>>>>>>>>> topology? Then, we could treat headers the same way we 
> > > > > >>>>>>>>>> treat
> > > > > >>>>>>>>>> timestamps in the no-op checking... We completely ignore 
> > > > > >>>>>>>>>> them
> > > > > >>>>>>>>>> for the sake of comparison. Thus, neither the timestamp nor
> > > > the
> > > > > >>>>>>>>>> headers would get updated in internal state or in 
> > > > > >>>>>>>>>> downstream
> > > > > >>>>>>>>>> views as long as the value itself doesn't change. This 
> > > > > >>>>>>>>>> seems
> > > > to
> > > > > >>>>>>>>>> give us a way to support your use case without adding to 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> mental overhead of using Streams for simple things.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Agree headers could be a decent fit for this particular case
> > > > > >>>>>>>>> because it's mostly metadata, though to be honest we haven't
> > > > > >>> looked
> > > > > >>>>>>>>> at headers much (mostly because, and to your point, support
> > > > > >>> seems
> > > > > >>>>>>>>> to be lacking). I feel like there would be other cases where
> > > > > >>> this
> > > > > >>>>>>>>> feature could be valuable, but I admit I can't come up with
> > > > > >>>>>>>>> anything right this second. Perhaps yuzhihong had an 
> > > > > >>>>>>>>> example in
> > > > > >>>>>>>>> mind?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I.e., simple things should be easy, and complex things 
> > > > > >>>>>>>>>> should
> > > > > >>> be
> > > > > >>>>>>>>>> possible.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> What are your thoughts? Thanks, -John
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Mon, Feb 3, 2020, at 07:19, Thomas Becker wrote:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Hi John, Can you describe how you'd use filtering/mapping 
> > > > > >>>>>>>>>> to
> > > > > >>>>>>>>>> deduplicate records? To give some background on my 
> > > > > >>>>>>>>>> suggestion
> > > > > >>> we
> > > > > >>>>>>>>>> currently have a small stream processor that exists solely 
> > > > > >>>>>>>>>> to
> > > > > >>>>>>>>>> deduplicate, which we do using a process that I assume 
> > > > > >>>>>>>>>> would
> > > > be
> > > > > >>>>>>>>>> similar to what would be done here (with a store of keys 
> > > > > >>>>>>>>>> and
> > > > > >>> hash
> > > > > >>>>>>>>>> values). But the records we are deduplicating have some
> > > > > >>> metadata
> > > > > >>>>>>>>>> fields (such as timestamps of when the record was posted) 
> > > > > >>>>>>>>>> that
> > > > > >>> we
> > > > > >>>>>>>>>> don't consider semantically meaningful for downstream
> > > > > >>> consumers,
> > > > > >>>>>>>>>> and therefore we also suppress updates that only touch 
> > > > > >>>>>>>>>> those
> > > > > >>>>>>>>>> fields.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> -Tommy
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Fri, 2020-01-31 at 19:30 -0600, John Roesler wrote:
> > > > > >>> [EXTERNAL
> > > > > >>>>>>>>>> EMAIL] Attention: This email was sent from outside TiVo. DO
> > > > NOT
> > > > > >>>>>>>>>> CLICK any links or attachments unless you expected them.
> > > > > >>>>>>>>>> ________________________________
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Hi Thomas and yuzhihong, That’s an interesting idea. Can 
> > > > > >>>>>>>>>> you
> > > > > >>> help
> > > > > >>>>>>>>>> think of a use case that isn’t also served by filtering or
> > > > > >>>>>>>>>> mapping beforehand? Thanks for helping to design this 
> > > > > >>>>>>>>>> feature!
> > > > > >>>>>>>>>> -John On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com
> > > > > >>>>>>>>>> <mailto:yuzhih...@gmail.com> wrote: I think this is good
> > > > > >>> idea. On
> > > > > >>>>>>>>>> Jan 31, 2020, at 4:49 PM, Thomas Becker <
> > > > > >>> thomas.bec...@tivo.com
> > > > > >>>>>>>>>> <mailto:thomas.bec...@tivo.com>> wrote: How do folks feel
> > > > > >>> about
> > > > > >>>>>>>>>> allowing the mechanism by which no-ops are detected to be
> > > > > >>>>>>>>>> pluggable? Meaning use something like a hash by default, 
> > > > > >>>>>>>>>> but
> > > > > >>> you
> > > > > >>>>>>>>>> could optionally provide an implementation of something to 
> > > > > >>>>>>>>>> use
> > > > > >>>>>>>>>> instead, like a ChangeDetector. This could be useful for
> > > > > >>> example
> > > > > >>>>>>>>>> to ignore changes to certain fields, which may not be 
> > > > > >>>>>>>>>> relevant
> > > > > >>> to
> > > > > >>>>>>>>>> the operation being performed.
> > > > ________________________________
> > > > > >>>>>>>>>> From: John Roesler <vvcep...@apache.org
> > > > > >>>>>>>>>> <mailto:vvcep...@apache.org>> Sent: Friday, January 31, 
> > > > > >>>>>>>>>> 2020
> > > > > >>> 4:51
> > > > > >>>>>>>>>> PM To: dev@kafka.apache.org <mailto:dev@kafka.apache.org>
> > > > > >>>>>>>>>> <dev@kafka.apache.org <mailto:dev@kafka.apache.org>> 
> > > > > >>>>>>>>>> Subject:
> > > > > >>> Re:
> > > > > >>>>>>>>>> [KAFKA-557] Add emit on change support for Kafka Streams
> > > > > >>>>>>>>>> [EXTERNAL EMAIL] Attention: This email was sent from 
> > > > > >>>>>>>>>> outside
> > > > > >>>>>>>>>> TiVo. DO NOT CLICK any links or attachments unless you
> > > > expected
> > > > > >>>>>>>>>> them. ________________________________
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Hello all, Sorry for my silence. It seems like we are 
> > > > > >>>>>>>>>> getting
> > > > > >>>>>>>>>> close to consensus. Hopefully, we could move to a vote 
> > > > > >>>>>>>>>> soon!
> > > > > >>> All
> > > > > >>>>>>>>>> of the reasoning from Matthias and Bruno around timestamp 
> > > > > >>>>>>>>>> is
> > > > > >>>>>>>>>> compelling. I would be strongly in favor of stating a few
> > > > > >>> things
> > > > > >>>>>>>>>> very clearly in the KIP: 1. Streams will drop no-op updates
> > > > > >>> only
> > > > > >>>>>>>>>> for KTable operations. That is, we won't make any changes 
> > > > > >>>>>>>>>> to
> > > > > >>>>>>>>>> KStream aggregations at the moment. It does seem like we 
> > > > > >>>>>>>>>> can
> > > > > >>>>>>>>>> potentially revisit the time semantics of that operation in
> > > > the
> > > > > >>>>>>>>>> future, but we don't need to do it now. On the other hand, 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> proposed semantics for KTable timestamps (marking the
> > > > beginning
> > > > > >>>>>>>>>> of the validity of that record) makes sense to me. 2. 
> > > > > >>>>>>>>>> Streams
> > > > > >>>>>>>>>> will only drop no-op updates for _stateful_ KTable 
> > > > > >>>>>>>>>> operations.
> > > > > >>> We
> > > > > >>>>>>>>>> don't want to add a hard guarantee that Streams will 
> > > > > >>>>>>>>>> _never_
> > > > > >>>>>>>>>> emit a no-op table update because it would require adding
> > > > state
> > > > > >>>>>>>>>> to otherwise stateless operations. If someone is really
> > > > > >>> concerned
> > > > > >>>>>>>>>> about a particular stateless operation producing a lot of
> > > > no-op
> > > > > >>>>>>>>>> results, all they have to do is materialize it, and Streams
> > > > > >>> would
> > > > > >>>>>>>>>> automatically drop the no-ops. Additionally, I'm +1 on not
> > > > > >>> adding
> > > > > >>>>>>>>>> an opt-out at this time. Regarding the KIP itself, I would
> > > > > >>> clean
> > > > > >>>>>>>>>> it up a bit before calling for a vote. There is a lot of
> > > > > >>>>>>>>>> "discussion"-type language there, which is very natural to
> > > > > >>> read,
> > > > > >>>>>>>>>> but makes it a bit hard to see what _exactly_ the kip is
> > > > > >>>>>>>>>> proposing. Richard, would you mind just making the 
> > > > > >>>>>>>>>> "proposed
> > > > > >>>>>>>>>> behavior change" a simple and succinct list of bullet 
> > > > > >>>>>>>>>> points?
> > > > > >>>>>>>>>> I.e., please drop glue phrases like "there has been some
> > > > > >>>>>>>>>> discussion" or "possibly we could do X". For the final 
> > > > > >>>>>>>>>> version
> > > > > >>> of
> > > > > >>>>>>>>>> the KIP, it should just say, "Streams will do X, Streams 
> > > > > >>>>>>>>>> will
> > > > > >>> do
> > > > > >>>>>>>>>> Y". Feel free to add an elaboration section to explain more
> > > > > >>> about
> > > > > >>>>>>>>>> what X and Y mean, but we don't need to talk about
> > > > > >>> possibilities
> > > > > >>>>>>>>>> or alternatives except in the "rejected alternatives" 
> > > > > >>>>>>>>>> section.
> > > > > >>>>>>>>>> Accordingly, can you also move the options you presented in
> > > > the
> > > > > >>>>>>>>>> intro to the "rejected alternatives" section and only 
> > > > > >>>>>>>>>> mention
> > > > > >>> the
> > > > > >>>>>>>>>> final proposal itself? This just really helps reviewers to
> > > > know
> > > > > >>>>>>>>>> what they are voting for, and it helps everyone after the 
> > > > > >>>>>>>>>> fact
> > > > > >>>>>>>>>> when they are trying to get clarity on what exactly the
> > > > > >>> proposal
> > > > > >>>>>>>>>> is, versus all the things it could have been. Thanks, -John
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote: Hello to
> > > > all,
> > > > > >>>>>>>>>> I've finished making some initial modifications to the 
> > > > > >>>>>>>>>> KIP. I
> > > > > >>>>>>>>>> have decided to keep the implementation section in the KIP 
> > > > > >>>>>>>>>> for
> > > > > >>>>>>>>>> record-keeping purposes. For now, we should focus on only 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> proposed behavior changes instead. See if you have any
> > > > > >>> comments!
> > > > > >>>>>>>>>> Cheers, Richard On Sat, Jan 25, 2020 at 11:12 AM Richard Yu
> > > > > >>>>>>>>>> <yohan.richard...@gmail.com <mailto:
> > > > yohan.richard...@gmail.com
> > > > > >>>>>
> > > > > >>>>>>>>>> wrote: Hi all, Thanks for all the discussion! @John and 
> > > > > >>>>>>>>>> @Bruno
> > > > > >>> I
> > > > > >>>>>>>>>> will survey other possible systems and see what I can do. 
> > > > > >>>>>>>>>> Just
> > > > > >>> a
> > > > > >>>>>>>>>> question, by systems, I suppose you would mean the pros and
> > > > > >>> cons
> > > > > >>>>>>>>>> of different reporting strategies? I'm not completely 
> > > > > >>>>>>>>>> certain
> > > > > >>> on
> > > > > >>>>>>>>>> this point, so it would be great if you can clarify on 
> > > > > >>>>>>>>>> that.
> > > > So
> > > > > >>>>>>>>>> here's what I got from all the discussion so far: - Since 
> > > > > >>>>>>>>>> both
> > > > > >>>>>>>>>> Matthias and John seems to have come to a consensus on 
> > > > > >>>>>>>>>> this,
> > > > > >>> then
> > > > > >>>>>>>>>> we will go for an all-round behavorial change for KTables.
> > > > > >>> After
> > > > > >>>>>>>>>> some thought, I decided that for now, an opt-out config 
> > > > > >>>>>>>>>> will
> > > > > >>> not
> > > > > >>>>>>>>>> be added. As John have pointed out, no-op changes tend to
> > > > > >>>>>>>>>> explode further down the topology as they are forwarded to
> > > > more
> > > > > >>>>>>>>>> and more processor nodes downstream. - About using hash 
> > > > > >>>>>>>>>> codes,
> > > > > >>>>>>>>>> after some explanation from John, it looks like hash codes
> > > > > >>> might
> > > > > >>>>>>>>>> not be as ideal (for implementation). For now, we will omit
> > > > > >>> that
> > > > > >>>>>>>>>> detail, and save it for the PR. - @Bruno You do have valid
> > > > > >>>>>>>>>> concerns. Though, I am not completely certain if we want 
> > > > > >>>>>>>>>> to do
> > > > > >>>>>>>>>> emit-on-change only for materialized KTables. I will put it
> > > > > >>> down
> > > > > >>>>>>>>>> in the KIP regardless. I will do my best to address all 
> > > > > >>>>>>>>>> points
> > > > > >>>>>>>>>> raised so far on the discussion. Hope we could keep this
> > > > going!
> > > > > >>>>>>>>>> Best, Richard On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna
> > > > > >>>>>>>>>> <br...@confluent.io <mailto:br...@confluent.io>> wrote: 
> > > > > >>>>>>>>>> Thank
> > > > > >>> you
> > > > > >>>>>>>>>> Matthias for the use cases! Looking at both use cases, I 
> > > > > >>>>>>>>>> think
> > > > > >>>>>>>>>> you need to elaborate on them in the KIP, Richard. Emit 
> > > > > >>>>>>>>>> from
> > > > > >>>>>>>>>> plain KTable: I agree with Matthias that the lower 
> > > > > >>>>>>>>>> timestamp
> > > > > >>>>>>>>>> makes sense because it marks the start of the validity of 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> record. Idempotent records with a higher timestamp can be
> > > > > >>> safely
> > > > > >>>>>>>>>> ignored. A corner case that I discussed with Matthias 
> > > > > >>>>>>>>>> offline
> > > > > >>> is
> > > > > >>>>>>>>>> when we do not materialize a KTable due to optimization. 
> > > > > >>>>>>>>>> Then
> > > > > >>> we
> > > > > >>>>>>>>>> cannot avoid the idempotent records because we do not keep 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> first record with the lower timestamp to compare to. Emit 
> > > > > >>>>>>>>>> from
> > > > > >>>>>>>>>> KTable with aggregations: If we specify that an aggregation
> > > > > >>>>>>>>>> result should have the highest timestamp of the records 
> > > > > >>>>>>>>>> that
> > > > > >>>>>>>>>> participated in the aggregation, we cannot ignore any
> > > > > >>> idempotent
> > > > > >>>>>>>>>> records. Admittedly, the result of an aggregation usually
> > > > > >>>>>>>>>> changes, but there are aggregations where the result may 
> > > > > >>>>>>>>>> not
> > > > > >>>>>>>>>> change like min and max, or sum when the incoming records 
> > > > > >>>>>>>>>> have
> > > > > >>> a
> > > > > >>>>>>>>>> value of zero. In those cases, we could benefit of the 
> > > > > >>>>>>>>>> emit on
> > > > > >>>>>>>>>> change, but only if we define the semantics of the
> > > > aggregations
> > > > > >>>>>>>>>> to not use the highest timestamp of the participating 
> > > > > >>>>>>>>>> records
> > > > > >>> for
> > > > > >>>>>>>>>> the result. In Kafka Streams, we do not have min, max, and 
> > > > > >>>>>>>>>> sum
> > > > > >>> as
> > > > > >>>>>>>>>> explicit aggregations, but we need to provide an API to 
> > > > > >>>>>>>>>> define
> > > > > >>>>>>>>>> what timestamp should be used for the result of an 
> > > > > >>>>>>>>>> aggregation
> > > > > >>> if
> > > > > >>>>>>>>>> we want to go down this path. All of this does not block 
> > > > > >>>>>>>>>> this
> > > > > >>> KIP
> > > > > >>>>>>>>>> and I just wanted to put this aspects up for discussion. 
> > > > > >>>>>>>>>> The
> > > > > >>> KIP
> > > > > >>>>>>>>>> can limit itself to emit from materialized KTables. 
> > > > > >>>>>>>>>> However,
> > > > > >>> the
> > > > > >>>>>>>>>> limits should be explicitly stated in the KIP. Best, Bruno
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax
> > > > > >>>>>>>>>> <matth...@confluent.io <mailto:matth...@confluent.io>> 
> > > > > >>>>>>>>>> wrote:
> > > > > >>>>>>>>>> IMHO, the question about semantics depends on the use 
> > > > > >>>>>>>>>> case, in
> > > > > >>>>>>>>>> particular on the origin of a KTable. If there is a 
> > > > > >>>>>>>>>> changlog
> > > > > >>>>>>>>>> topic that one reads directly into a KTable, emit-on-change
> > > > > >>> does
> > > > > >>>>>>>>>> actually make sense, because the timestamp indicates _when_
> > > > the
> > > > > >>>>>>>>>> update was _effective_. For this case, it is semantically
> > > > sound
> > > > > >>>>>>>>>> to _not_ update the timestamp in the store, because the 
> > > > > >>>>>>>>>> second
> > > > > >>>>>>>>>> update is actually idempotent and advancing the timestamp 
> > > > > >>>>>>>>>> is
> > > > > >>> not
> > > > > >>>>>>>>>> ideal (one could even consider it to be wrong to advance 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> timestamp) because the "valid time" of the record pair did 
> > > > > >>>>>>>>>> not
> > > > > >>>>>>>>>> change. This reasoning also applies to KTable-KTable joins.
> > > > > >>>>>>>>>> However, if the KTable is the result of an aggregation, I
> > > > think
> > > > > >>>>>>>>>> emit-on-update is more natural, because the timestamp 
> > > > > >>>>>>>>>> reflects
> > > > > >>>>>>>>>> the _last_ time (ie, highest timestamp) of all input 
> > > > > >>>>>>>>>> records
> > > > > >>> the
> > > > > >>>>>>>>>> contributed to the result. Hence, updating the timestamp 
> > > > > >>>>>>>>>> and
> > > > > >>>>>>>>>> emitting a new record actually sounds correct to me. This
> > > > > >>> applies
> > > > > >>>>>>>>>> to windowed and non-windowed aggregations IMHO. However,
> > > > > >>>>>>>>>> considering the argument that the timestamp should not be
> > > > > >>> update
> > > > > >>>>>>>>>> in the first case in the store to begin with, both cases 
> > > > > >>>>>>>>>> are
> > > > > >>>>>>>>>> actually the same, and both can be modeled as 
> > > > > >>>>>>>>>> emit-on-change:
> > > > > >>> if
> > > > > >>>>>>>>>> a `table()` operator does not update the timestamp if the
> > > > value
> > > > > >>>>>>>>>> does not change, there is _no_ change and thus nothing is
> > > > > >>>>>>>>>> emitted. At the same time, if an aggregation operator does
> > > > > >>> update
> > > > > >>>>>>>>>> the timestamp (even if the value does not change) there 
> > > > > >>>>>>>>>> _is_ a
> > > > > >>>>>>>>>> change and we emit. Note that handling out-of-order data 
> > > > > >>>>>>>>>> for
> > > > > >>>>>>>>>> aggregations would also work seamlessly with this approach 
> > > > > >>>>>>>>>> --
> > > > > >>> for
> > > > > >>>>>>>>>> out-of-order records, the timestamp does never change, and
> > > > > >>> thus,
> > > > > >>>>>>>>>> we only emit if the result itself changes. Therefore, I 
> > > > > >>>>>>>>>> would
> > > > > >>>>>>>>>> argue that we might not even need any config, because the
> > > > > >>>>>>>>>> emit-on-change behavior is just correct and reduced the
> > > > > >>>>>>>>>> downstream load, while our current behavior is not ideal 
> > > > > >>>>>>>>>> (even
> > > > > >>> if
> > > > > >>>>>>>>>> it's also correct). Thoughts? -Matthias On 1/24/20 9:37 AM,
> > > > > >>> John
> > > > > >>>>>>>>>> Roesler wrote: Hi Bruno, Thanks for that idea. I hadn't
> > > > > >>>>>>>>>> considered that option before, and it does seem like that
> > > > would
> > > > > >>>>>>>>>> be the right place to put it if we think it might be
> > > > > >>> semantically
> > > > > >>>>>>>>>> important to control on a table-by-table basis. I had been
> > > > > >>>>>>>>>> thinking of it less semantically and more practically. In 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> context of a large topology, or more generally, a large
> > > > > >>> software
> > > > > >>>>>>>>>> system that contains many topologies and other event-driven
> > > > > >>>>>>>>>> systems, each no-op result becomes an input that is 
> > > > > >>>>>>>>>> destined
> > > > to
> > > > > >>>>>>>>>> itself become a no-op result, and so on, all the way 
> > > > > >>>>>>>>>> through
> > > > > >>> the
> > > > > >>>>>>>>>> system. Thus, a single pointless processing result becomes
> > > > > >>>>>>>>>> amplified into a large number of pointless computations, 
> > > > > >>>>>>>>>> cache
> > > > > >>>>>>>>>> perturbations, and network and disk I/O operations. If you
> > > > also
> > > > > >>>>>>>>>> consider operations with fan-out implications, like 
> > > > > >>>>>>>>>> branching
> > > > > >>> or
> > > > > >>>>>>>>>> foreign-key joins, the wasted resources are amplified not 
> > > > > >>>>>>>>>> just
> > > > > >>> in
> > > > > >>>>>>>>>> proportion to the size of the system, but the size of the
> > > > > >>> system
> > > > > >>>>>>>>>> times the average fan-out (to the power of the number of
> > > > > >>> fan-out
> > > > > >>>>>>>>>> operations on the path(s) through the system). In my time
> > > > > >>>>>>>>>> operating such systems, I've observed these effects to be 
> > > > > >>>>>>>>>> very
> > > > > >>>>>>>>>> real, and actually, the system and use case doesn't have 
> > > > > >>>>>>>>>> to be
> > > > > >>>>>>>>>> very large before the amplification poses an existential
> > > > threat
> > > > > >>>>>>>>>> to the system as a whole. This is the basis of my 
> > > > > >>>>>>>>>> advocating
> > > > > >>> for
> > > > > >>>>>>>>>> a simple behavior change, rather than an opt-in config of 
> > > > > >>>>>>>>>> any
> > > > > >>>>>>>>>> kind. It seems like Streams should "do the right thing" for
> > > > the
> > > > > >>>>>>>>>> majority use case. My theory (which may be wrong) is that 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> majority use case is more like "relational queries" than 
> > > > > >>>>>>>>>> "CEP
> > > > > >>>>>>>>>> queries". Even if you were doing some event-sensitive
> > > > > >>>>>>>>>> computation, wouldn't you do them as Stream operations 
> > > > > >>>>>>>>>> (where
> > > > > >>>>>>>>>> this feature is inapplicable anyway)? In keeping with the
> > > > > >>>>>>>>>> "practical" perspective, I suggested the opt-out config 
> > > > > >>>>>>>>>> only
> > > > in
> > > > > >>>>>>>>>> the (I think unlikely) event that filtering out pointless
> > > > > >>> updates
> > > > > >>>>>>>>>> actually harms performance. I'd also be perfectly fine 
> > > > > >>>>>>>>>> without
> > > > > >>>>>>>>>> the opt-out config. I really think that (because of the
> > > > > >>> timestamp
> > > > > >>>>>>>>>> semantics work already underway), we're already 
> > > > > >>>>>>>>>> pre-fetching
> > > > > >>> the
> > > > > >>>>>>>>>> prior result most of the time, so there would actually be 
> > > > > >>>>>>>>>> very
> > > > > >>>>>>>>>> little extra I/O involved in implementing emit-on-change.
> > > > > >>>>>>>>>> However, we should consider whether my experience is 
> > > > > >>>>>>>>>> likely to
> > > > > >>>>>>>>>> be general. Do you have some use case in mind for which 
> > > > > >>>>>>>>>> you'd
> > > > > >>>>>>>>>> actually want some KTable results to be emit-on-update for
> > > > > >>>>>>>>>> semantic reasons? Thanks, -John
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote: Hi
> > > > > >>> Richard,
> > > > > >>>>>>>>>> Thank you for the KIP. I agree with John that we should 
> > > > > >>>>>>>>>> focus
> > > > > >>> on
> > > > > >>>>>>>>>> the interface and behavior change in a KIP. We can discuss 
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>> implementation later. I am also +1 for the survey. I had a
> > > > > >>>>>>>>>> thought about this. Couldn't we consider emit-on-change to 
> > > > > >>>>>>>>>> be
> > > > > >>> one
> > > > > >>>>>>>>>> config of suppress (like `untilWindowCloses`)? What you
> > > > > >>>>>>>>>> basically propose is to suppress updates if they do not 
> > > > > >>>>>>>>>> change
> > > > > >>>>>>>>>> the result. Considering emit on change as a flavour of
> > > > suppress
> > > > > >>>>>>>>>> would be more flexible because it would specify the 
> > > > > >>>>>>>>>> behavior
> > > > > >>>>>>>>>> locally for a KTable instead of globally for all KTables.
> > > > > >>>>>>>>>> Additionally, specifying the behavior in one place instead 
> > > > > >>>>>>>>>> of
> > > > > >>>>>>>>>> multiple places feels more intuitive and consistent to me.
> > > > > >>> Best,
> > > > > >>>>>>>>>> Bruno On Fri, Jan 24, 2020 at 7:49 AM John Roesler
> > > > > >>>>>>>>>> <vvcep...@apache.org <mailto:vvcep...@apache.org>> wrote: 
> > > > > >>>>>>>>>> Hi
> > > > > >>>>>>>>>> Richard, Thanks for picking this up! I know of at least one
> > > > > >>> large
> > > > > >>>>>>>>>> community member for which this feature is absolutely
> > > > > >>> essential.
> > > > > >>>>>>>>>> If I understand your two options, it seems like the 
> > > > > >>>>>>>>>> proposal
> > > > is
> > > > > >>>>>>>>>> to implement it as a behavior change regardless, and the
> > > > > >>> question
> > > > > >>>>>>>>>> is whether to provide an opt-out config or not. Given that 
> > > > > >>>>>>>>>> any
> > > > > >>>>>>>>>> implementation of this feature would have some performance
> > > > > >>> impact
> > > > > >>>>>>>>>> under some workloads, and also that we don't know if anyone
> > > > > >>>>>>>>>> really depends on emit-on-update time semantics, it seems 
> > > > > >>>>>>>>>> like
> > > > > >>> we
> > > > > >>>>>>>>>> should propose to add an opt-out config. Can you update the
> > > > KIP
> > > > > >>>>>>>>>> to mention the exact config key and value(s) you'd propose?
> > > > > >>> Just
> > > > > >>>>>>>>>> to move the discussion forward, maybe something like: 
> > > > > >>>>>>>>>> emit.on
> > > > > >>> :=
> > > > > >>>>>>>>>> change|update with the new default being "change" Thanks 
> > > > > >>>>>>>>>> for
> > > > > >>>>>>>>>> pointing out the timestamp issue in particular. I agree 
> > > > > >>>>>>>>>> that
> > > > if
> > > > > >>>>>>>>>> we discard the latter update as a no-op, then we also have 
> > > > > >>>>>>>>>> to
> > > > > >>>>>>>>>> discard its timestamp (obviously, we don't forward the
> > > > > >>> timestamp
> > > > > >>>>>>>>>> update, as that's the whole point, but we also can't update
> > > > the
> > > > > >>>>>>>>>> timestamp in the store, as the store must remain consistent
> > > > > >>> with
> > > > > >>>>>>>>>> what has been emitted). I have to confess that I disagree 
> > > > > >>>>>>>>>> with
> > > > > >>>>>>>>>> your implementation proposal, but it's also not necessary 
> > > > > >>>>>>>>>> to
> > > > > >>>>>>>>>> discuss implementation in the KIP. Maybe it would be less
> > > > > >>>>>>>>>> controversial if you just drop that section for now, so 
> > > > > >>>>>>>>>> that
> > > > > >>> the
> > > > > >>>>>>>>>> KIP discussion can focus on the behavior change and config.
> > > > > >>> Just
> > > > > >>>>>>>>>> for reference, there is some research into this domain. For
> > > > > >>>>>>>>>> example, see the "Report" section (3.2.3) of the SECRET 
> > > > > >>>>>>>>>> paper:
> > > > > >>>>>>>>>>
> > > > > >>>>>>
> > > > https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2Fpeop
> > > > > > le.csail.mit.edu
> > > > > >>>>>> %2Ftatbul%2Fpublications%2Fmaxstream_vldb10.pdf&amp;data
> > > > > > =02%7C01%7CThomas.Becker%40tivo.com
> > > > > >>>>>> %7Ce0235483b1eb4f259c5c08d7a8d1c16b%7
> > > > > >
> > > > > >>>>>>
> > > > > >>>
> > > > Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637163491160859282&amp;sdata
> > > > > > =4dSGIS8jNPAPP7B48r9e%2BUgFh3WdmzVyXhyT63eP8dI%3D&amp;reserved=0
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > > It might help to round out the proposal if you take a brief
> > > > > >>> survey of
> > > > > >>>>>>>>>> the behaviors of other systems, along with pros and cons if
> > > > any
> > > > > >>>>>>>>>> are reported. Thanks, -John
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote: Hi
> > > > everybody!
> > > > > >>>>>>>>>> I'd like to propose a change that we probably should've 
> > > > > >>>>>>>>>> added
> > > > > >>> for
> > > > > >>>>>>>>>> a long time now. The key benefit of this KIP would be 
> > > > > >>>>>>>>>> reduced
> > > > > >>>>>>>>>> traffic in Kafka Streams since a lot of no-op results 
> > > > > >>>>>>>>>> would no
> > > > > >>>>>>>>>> longer be sent downstream. Here is the KIP for reference.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>
> > > > https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwi
> > > > > > ki.apache.org
> > > > > >>>>>> %2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-557%253A%2BAdd%2Bemit
> > > > > >
> > > > > >>>>>>
> > > > > >>>
> > > > %2Bon%2Bchange%2Bsupport%2Bfor%2BKafka%2BStreams&amp;data=02%7C01%7CThom
> > > > > > as.Becker%40tivo.com
> > > > > >>>>>> %7Ce0235483b1eb4f259c5c08d7a8d1c16b%7Cd05b7c6912014c
> > > > > >
> > > > > >>>>>>
> > > > > >>>
> > > > 0db45d7f1dcc227e4d%7C1%7C1%7C637163491160869277&amp;sdata=zYpCSFOsyN4%2B
> > > > > > 4rKRZBQ%2FZvcGQ4EINR9Qm6PLsB7EKrc%3D&amp;reserved=0
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > > Currently, I seek to formalize our approach for this KIP first
> > > > > >>> before
> > > > > >>>>>>>>>> we determine concrete API additions / configurations. Some
> > > > > >>>>>>>>>> configs might warrant adding, whiles others are not 
> > > > > >>>>>>>>>> necessary
> > > > > >>>>>>>>>> since adding them would only increase complexity of Kafka
> > > > > >>>>>>>>>> Streams. Cheers, Richard
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> ________________________________ This email and any
> > > > attachments
> > > > > >>>>>>>>>> may contain confidential and privileged material for the 
> > > > > >>>>>>>>>> sole
> > > > > >>> use
> > > > > >>>>>>>>>> of the intended recipient. Any review, copying, or
> > > > distribution
> > > > > >>>>>>>>>> of this email (or any attachments) by others is 
> > > > > >>>>>>>>>> prohibited. If
> > > > > >>>>>>>>>> you are not the intended recipient, please contact the 
> > > > > >>>>>>>>>> sender
> > > > > >>>>>>>>>> immediately and permanently delete this email and any
> > > > > >>>>>>>>>> attachments. No employee or agent of TiVo is authorized to
> > > > > >>>>>>>>>> conclude any binding agreement on behalf of TiVo by email.
> > > > > >>>>>>>>>> Binding agreements with TiVo may only be made by a signed
> > > > > >>> written
> > > > > >>>>>>>>>> agreement. -- *Tommy Becker* *Principal Engineer *
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> *Personalized Content Discovery*
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> *O* +1 919.460.4747 *tivo.com* <http://www.tivo.com/>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> This email and any attachments may contain confidential and
> > > > > >>>>>>>>>> privileged material for the sole use of the intended
> > > > recipient.
> > > > > >>>>>>>>>> Any review, copying, or distribution of this email (or any
> > > > > >>>>>>>>>> attachments) by others is prohibited. If you are not the
> > > > > >>> intended
> > > > > >>>>>>>>>> recipient, please contact the sender immediately and
> > > > > >>> permanently
> > > > > >>>>>>>>>> delete this email and any attachments. No employee or 
> > > > > >>>>>>>>>> agent of
> > > > > >>>>>>>>>> TiVo is authorized to conclude any binding agreement on 
> > > > > >>>>>>>>>> behalf
> > > > > >>> of
> > > > > >>>>>>>>>> TiVo by email. Binding agreements with TiVo may only be 
> > > > > >>>>>>>>>> made
> > > > > >>> by a
> > > > > >>>>>>>>>> signed written agreement.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> --
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> *Tommy Becker* /Principal Engineer /
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> /Personalized Content Discovery/
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> *O* +1 919.460.4747 *tivo.com* <http://www.tivo.com/>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>
> > > > ----------------------------------------------------------------------
> > > > > >>>>>>>
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > > >
> > > > >
> > > > >
> > > > > Attachments:
> > > > > * signature.asc
> > > >
> >

Reply via email to