Hi John,
Can you describe how you'd use filtering/mapping to deduplicate records? To 
give some background on my suggestion, we currently have a small stream 
processor that exists solely to deduplicate, which we do using a process that I 
assume would be similar to what would be done here (with a store of keys and 
hash values). But the records we are deduplicating have some metadata fields 
(such as timestamps of when the record was posted) that we don't consider 
semantically meaningful for downstream consumers, and therefore we also 
suppress updates that only touch those fields.
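
A minimal plain-Java sketch of the kind of deduplicating processor described 
above. It is not Tommy's actual code and not a Kafka Streams `Processor`. It 
assumes a hypothetical `Post` record whose `body` is semantically meaningful 
and whose `postedAtMs` is metadata; only `body` feeds the per-key fingerprint, 
so an update that touches only the metadata is suppressed. A SHA-256 digest 
stands in for the stored "hash value".

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical record shape: "body" matters downstream, "postedAtMs" is metadata.
final class Post {
    final String id;
    final String body;
    final long postedAtMs;

    Post(String id, String body, long postedAtMs) {
        this.id = id;
        this.body = body;
        this.postedAtMs = postedAtMs;
    }
}

// Keeps one fingerprint per key and suppresses records whose meaningful
// fields produce the same fingerprint as the last forwarded record.
final class Deduplicator {
    private final Map<String, byte[]> fingerprints = new HashMap<>();

    /** Returns true if the record should be forwarded downstream. */
    boolean shouldForward(Post post) {
        byte[] digest = fingerprint(post);
        byte[] previous = fingerprints.get(post.id);
        if (previous != null && Arrays.equals(previous, digest)) {
            return false; // nothing meaningful changed (possibly only postedAtMs)
        }
        fingerprints.put(post.id, digest);
        return true;
    }

    // Only the fields that matter downstream go into the digest.
    private static byte[] fingerprint(Post post) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            return md.digest(post.body.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // Every Java SE platform is required to support SHA-256.
            throw new IllegalStateException(e);
        }
    }
}
```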

-Tommy


On Fri, 2020-01-31 at 19:30 -0600, John Roesler wrote:

[EXTERNAL EMAIL] Attention: This email was sent from outside TiVo. DO NOT CLICK 
any links or attachments unless you expected them.

________________________________



Hi Thomas and yuzhihong,


That’s an interesting idea. Can you help think of a use case that isn’t also 
served by filtering or mapping beforehand?


Thanks for helping to design this feature!

-John


On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:

I think this is a good idea.


On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com> wrote:


How do folks feel about allowing the mechanism by which no-ops are detected to 
be pluggable? That is, use something like a hash by default, but let users 
optionally provide their own implementation of something like a 
ChangeDetector instead. This could be useful, for example, to ignore changes to 
certain fields that may not be relevant to the operation being performed.
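
One possible shape for the pluggable detector, offered as a sketch: only the 
`ChangeDetector` name comes from the message above, while the `isChange` 
signature, the `byEquality()` default and the `Listing` type are hypothetical. 
The default here compares with `equals()` rather than a hash, a substitution 
made for illustration.

```java
import java.util.Objects;

// Hypothetical interface; not part of Kafka Streams.
@FunctionalInterface
interface ChangeDetector<V> {
    /** Returns true if newValue is a real change relative to oldValue and should be emitted. */
    boolean isChange(V oldValue, V newValue);

    /** Default detector: any difference according to equals() counts as a change. */
    static <V> ChangeDetector<V> byEquality() {
        return (oldValue, newValue) -> !Objects.equals(oldValue, newValue);
    }
}

// Hypothetical value type with one metadata field.
final class Listing {
    final String title;
    final long priceCents;
    final long postedAtMs; // metadata, irrelevant to downstream operations

    Listing(String title, long priceCents, long postedAtMs) {
        this.title = title;
        this.priceCents = priceCents;
        this.postedAtMs = postedAtMs;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Listing)) return false;
        Listing other = (Listing) o;
        return title.equals(other.title)
                && priceCents == other.priceCents
                && postedAtMs == other.postedAtMs;
    }

    @Override
    public int hashCode() {
        return Objects.hash(title, priceCents, postedAtMs);
    }

    /** Custom detector that ignores changes to postedAtMs. */
    static final ChangeDetector<Listing> IGNORING_POSTED_AT = (oldValue, newValue) -> {
        if (oldValue == null || newValue == null) {
            return oldValue != newValue; // first insert or tombstone
        }
        return !oldValue.title.equals(newValue.title)
                || oldValue.priceCents != newValue.priceCents;
    };
}
```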

________________________________

From: John Roesler <vvcep...@apache.org>
Sent: Friday, January 31, 2020 4:51 PM
To: dev@kafka.apache.org
Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams





Hello all,


Sorry for my silence. It seems like we are getting close to consensus.
Hopefully, we could move to a vote soon!


All of the reasoning from Matthias and Bruno around timestamps is compelling. I
would be strongly in favor of stating a few things very clearly in the KIP:

1. Streams will drop no-op updates only for KTable operations.

  That is, we won't make any changes to KStream aggregations at the moment. It
  does seem like we can potentially revisit the time semantics of that operation
  in the future, but we don't need to do it now.

  On the other hand, the proposed semantics for KTable timestamps (marking the
  beginning of the validity of that record) make sense to me.

2. Streams will only drop no-op updates for _stateful_ KTable operations.

  We don't want to add a hard guarantee that Streams will _never_ emit a no-op
  table update because it would require adding state to otherwise stateless
  operations. If someone is really concerned about a particular stateless
  operation producing a lot of no-op results, all they have to do is
  materialize it, and Streams would automatically drop the no-ops.
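
Point 2 can be illustrated with a plain-Java simulation (not the Streams API). 
A stateless `mapValues(v -> v / 10)` would forward `4` for both inputs 41 and 
42 because it has no memory of its last output; a node that keeps its previous 
result per key, roughly what materializing provides, can recognize the second 
one as a no-op. `MaterializedMapValues` and its method are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

// Simulates a materialized mapValues node: it remembers the previous result
// per key, so it can tell when a new input maps to the same output.
final class MaterializedMapValues<K, V, R> {
    private final Function<V, R> mapper;
    private final Map<K, R> store = new HashMap<>();

    MaterializedMapValues(Function<V, R> mapper) {
        this.mapper = mapper;
    }

    /** Returns the result to forward, or empty if it equals the stored result. */
    Optional<R> process(K key, V value) {
        R result = Objects.requireNonNull(mapper.apply(value), "mapper must not return null");
        R previous = store.put(key, result);
        if (result.equals(previous)) {
            return Optional.empty(); // no-op: same output as last time
        }
        return Optional.of(result);
    }
}
```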


Additionally, I'm +1 on not adding an opt-out at this time.


Regarding the KIP itself, I would clean it up a bit before calling for a vote.
There is a lot of "discussion"-type language there, which is very natural to
read, but makes it a bit hard to see what _exactly_ the KIP is proposing.


Richard, would you mind just making the "proposed behavior change" a simple and
succinct list of bullet points? I.e., please drop glue phrases like "there has
been some discussion" or "possibly we could do X". For the final version of the
KIP, it should just say, "Streams will do X, Streams will do Y". Feel free to
add an elaboration section to explain more about what X and Y mean, but we don't
need to talk about possibilities or alternatives except in the "rejected
alternatives" section.


Accordingly, can you also move the options you presented in the intro to the
"rejected alternatives" section and only mention the final proposal itself?


This just really helps reviewers to know what they are voting for, and it helps
everyone after the fact when they are trying to get clarity on what exactly the
proposal is, versus all the things it could have been.


Thanks,

-John



On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:

Hello to all,


I've finished making some initial modifications to the KIP.
I have decided to keep the implementation section in the KIP for
record-keeping purposes.


For now, we should focus on only the proposed behavior changes instead.


See if you have any comments!


Cheers,

Richard


On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com> wrote:


Hi all,


Thanks for all the discussion!


@John and @Bruno I will survey other possible systems and see what I can
do.
Just a question: by systems, I suppose you mean the pros and cons of
different reporting strategies?

I'm not completely certain on this point, so it would be great if you could
clarify that.


So here's what I got from all the discussion so far:


  - Since both Matthias and John seem to have come to a consensus on
  this, we will go for an all-round behavioral change for KTables. After
  some thought, I decided that, for now, an opt-out config will not be added.
  As John has pointed out, no-op changes tend to explode further down the
  topology as they are forwarded to more and more processor nodes downstream.
  - About using hash codes: after some explanation from John, it looks
  like hash codes might not be ideal (for implementation). For now, we
  will omit that detail and save it for the PR.
  - @Bruno You do have valid concerns, though I am not completely
  certain that we want to do emit-on-change only for materialized KTables. I
  will put it down in the KIP regardless.
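
The thread does not record John's reasoning about hash codes. One general 
caveat, offered as an illustration rather than a summary of his explanation, is 
that equal hash codes do not imply equal values: `String.hashCode()` is 
specified so that "Aa" and "BB" both hash to 2112, so a detector that compares 
only hash codes would treat that real change as a no-op. The class name is 
hypothetical.

```java
import java.util.Objects;

final class HashComparison {
    /** Unsafe: declares "unchanged" whenever the hash codes match. */
    static boolean unchangedByHashOnly(Object oldValue, Object newValue) {
        return Objects.hashCode(oldValue) == Objects.hashCode(newValue);
    }

    /** Safe: the hash is only a cheap pre-check; equals() makes the decision. */
    static boolean unchanged(Object oldValue, Object newValue) {
        return Objects.hashCode(oldValue) == Objects.hashCode(newValue)
                && Objects.equals(oldValue, newValue);
    }
}
```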


I will do my best to address all points raised so far in the discussion.
Hope we can keep this going!


Best,

Richard


On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io> wrote:


Thank you Matthias for the use cases!


Looking at both use cases, I think you need to elaborate on them in
the KIP, Richard.


Emit from plain KTable:
I agree with Matthias that the lower timestamp makes sense because it
marks the start of the validity of the record. Idempotent records with
a higher timestamp can be safely ignored. A corner case that I
discussed with Matthias offline is when we do not materialize a KTable
due to optimization. Then we cannot avoid the idempotent records
because we do not keep the first record with the lower timestamp to
compare to.
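
A sketch of the plain-KTable rule under stated assumptions, using plain Java 
and hypothetical names: the store keeps each value together with the timestamp 
at which it became valid, and a repeated value with a higher timestamp is 
dropped without overwriting the stored (lower) timestamp. The check needs the 
stored entry, which is exactly what is missing in the corner case above where 
the KTable is not materialized.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// A key-value store whose entries remember when the current value became valid.
final class TimestampedTable<K, V> {
    static final class Versioned<T> {
        final T value;
        final long timestamp; // start of validity of this value

        Versioned(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, Versioned<V>> store = new HashMap<>();

    /** Applies an update; returns the record to emit, or empty for an idempotent update. */
    Optional<Versioned<V>> put(K key, V value, long timestamp) {
        Versioned<V> current = store.get(key);
        if (current != null && Objects.equals(current.value, value)) {
            // Same value again: keep the lower timestamp in the store and emit nothing.
            return Optional.empty();
        }
        Versioned<V> updated = new Versioned<>(value, timestamp);
        store.put(key, updated);
        return Optional.of(updated);
    }

    Versioned<V> get(K key) {
        return store.get(key);
    }
}
```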


Emit from KTable with aggregations:
If we specify that an aggregation result should have the highest
timestamp of the records that participated in the aggregation, we
cannot ignore any idempotent records. Admittedly, the result of an
aggregation usually changes, but there are aggregations where the
result may not change, like min and max, or sum when the incoming
records have a value of zero. In those cases, we could benefit from the
emit on change, but only if we define the semantics of the
aggregations to not use the highest timestamp of the participating
records for the result. In Kafka Streams, we do not have min, max, and
sum as explicit aggregations, but we need to provide an API to define
what timestamp should be used for the result of an aggregation if we
want to go down this path.
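
If Streams did offer a way to choose the result timestamp, it might look 
something like the hypothetical `ResultTimestampPolicy` below; nothing like it 
exists in Kafka Streams, and all names are illustrative. With a max 
aggregation, a smaller input under `HIGHEST_INPUT` still advances the 
timestamp and must be emitted, whereas `FIRST_CHANGE` keeps the old timestamp, 
so the same input becomes a droppable no-op.

```java
// Hypothetical knob for choosing an aggregation result's timestamp.
enum ResultTimestampPolicy {
    HIGHEST_INPUT, // result carries the highest timestamp of the contributing records
    FIRST_CHANGE   // result keeps the timestamp at which its value last changed
}

// A running max that emits only when its (value, timestamp) pair changes.
final class MaxAggregate {
    private final ResultTimestampPolicy policy;
    private boolean initialized;
    private long max;
    private long timestamp;

    MaxAggregate(ResultTimestampPolicy policy) {
        this.policy = policy;
    }

    /** Folds in a record; returns true if the result changed and should be emitted. */
    boolean add(long value, long recordTs) {
        if (!initialized) {
            initialized = true;
            max = value;
            timestamp = recordTs;
            return true;
        }
        long newMax = Math.max(max, value);
        long newTs;
        if (policy == ResultTimestampPolicy.HIGHEST_INPUT) {
            newTs = Math.max(timestamp, recordTs);
        } else {
            newTs = (newMax == max) ? timestamp : recordTs; // move only on a value change
        }
        boolean changed = newMax != max || newTs != timestamp;
        max = newMax;
        timestamp = newTs;
        return changed;
    }

    long max() { return max; }

    long timestamp() { return timestamp; }
}
```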


All of this does not block this KIP, and I just wanted to put these
aspects up for discussion. The KIP can limit itself to emit from
materialized KTables. However, the limits should be explicitly stated
in the KIP.


Best,

Bruno




On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io> wrote:


IMHO, the question about semantics depends on the use case, in
particular on the origin of a KTable.


If there is a changelog topic that one reads directly into a KTable,
emit-on-change does actually make sense, because the timestamp indicates
_when_ the update was _effective_. For this case, it is semantically
sound to _not_ update the timestamp in the store, because the second
update is actually idempotent and advancing the timestamp is not ideal
(one could even consider it to be wrong to advance the timestamp)
because the "valid time" of the record pair did not change.


This reasoning also applies to KTable-KTable joins.


However, if the KTable is the result of an aggregation, I think
emit-on-update is more natural, because the timestamp reflects the
_last_ time (i.e., highest timestamp) of all input records that contributed
to the result. Hence, updating the timestamp and emitting a new record
actually sounds correct to me. This applies to windowed and non-windowed
aggregations IMHO.


However, considering the argument that in the first case the timestamp
should not be updated in the store to begin with, both cases are
actually the same, and both can be modeled as emit-on-change: if a
`table()` operator does not update the timestamp if the value does not
change, there is _no_ change and thus nothing is emitted. At the same
time, if an aggregation operator does update the timestamp (even if the
value does not change), there _is_ a change and we emit.


Note that handling out-of-order data for aggregations would also work
seamlessly with this approach -- for out-of-order records, the timestamp
never changes, and thus, we only emit if the result itself changes.
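
The argument above can be checked with a small simulation, assuming the 
aggregation result's timestamp is the maximum of its input timestamps and the 
emit rule is "forward only if the (value, timestamp) pair changed". A 
zero-valued record arriving in order still advances the timestamp and is 
emitted, while an out-of-order one changes nothing and is dropped. Plain Java; 
the class name is hypothetical.

```java
// A running sum whose timestamp is the highest input timestamp seen so far,
// emitted only when the (sum, timestamp) pair actually changes.
final class SumAggregate {
    private boolean initialized;
    private long sum;
    private long timestamp;

    /** Folds in a record; returns true if the result should be emitted. */
    boolean add(long value, long recordTs) {
        long newSum = initialized ? sum + value : value;
        // Out-of-order records (recordTs < timestamp) never move the timestamp.
        long newTs = initialized ? Math.max(timestamp, recordTs) : recordTs;
        boolean changed = !initialized || newSum != sum || newTs != timestamp;
        initialized = true;
        sum = newSum;
        timestamp = newTs;
        return changed;
    }

    long sum() { return sum; }

    long timestamp() { return timestamp; }
}
```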


Therefore, I would argue that we might not even need any config, because
the emit-on-change behavior is just correct and reduces the downstream
load, while our current behavior is not ideal (even if it's also
correct).


Thoughts?


-Matthias


On 1/24/20 9:37 AM, John Roesler wrote:

Hi Bruno,


Thanks for that idea. I hadn't considered that
option before, and it does seem like that would be
the right place to put it if we think it might be
semantically important to control on a
table-by-table basis.


I had been thinking of it less semantically and
more practically. In the context of a large
topology, or more generally, a large software
system that contains many topologies and other
event-driven systems, each no-op result becomes an
input that is destined to itself become a no-op
result, and so on, all the way through the system.
Thus, a single pointless processing result becomes
amplified into a large number of pointless
computations, cache perturbations, and network
and disk I/O operations. If you also consider
operations with fan-out implications, like
branching or foreign-key joins, the wasted
resources are amplified not just in proportion to
the size of the system, but the size of the system
times the average fan-out (to the power of the
number of fan-out operations on the path(s)
through the system).
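
As a rough model of the amplification described above (a reading of the 
prose, not a measured result): if a no-op passes through N downstream 
operators and its path contains k fan-out operations with average fan-out f, 
the wasted work per no-op scales roughly as W below. The numbers in the worked 
line are hypothetical.

```latex
W \;\approx\; N \cdot \bar{f}^{\,k}
\qquad \text{e.g. } N = 20,\ \bar{f} = 3,\ k = 2:\quad W \approx 20 \cdot 3^{2} = 180
```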


In my time operating such systems, I've observed
these effects to be very real, and actually, the
system and use case don't have to be very large
before the amplification poses an existential
threat to the system as a whole.


This is the basis of my advocating for a simple
behavior change, rather than an opt-in config of
any kind. It seems like Streams should "do the
right thing" for the majority use case. My theory
(which may be wrong) is that the majority use case
is more like "relational queries" than "CEP
queries". Even if you were doing some
event-sensitive computation, wouldn't you do it
as Stream operations (where this feature is
inapplicable anyway)?


In keeping with the "practical" perspective, I
suggested the opt-out config only in the (I think
unlikely) event that filtering out pointless
updates actually harms performance. I'd also be
perfectly fine without the opt-out config. I
really think that (because of the timestamp
semantics work already underway), we're already
pre-fetching the prior result most of the time, so
there would actually be very little extra I/O
involved in implementing emit-on-change.


However, we should consider whether my experience
is likely to be general. Do you have some use
case in mind for which you'd actually want some
KTable results to be emit-on-update for semantic
reasons?


Thanks,

-John



On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:

Hi Richard,


Thank you for the KIP.


I agree with John that we should focus on the interface and behavior
change in a KIP. We can discuss the implementation later.


I am also +1 for the survey.


I had a thought about this. Couldn't we consider emit-on-change to be
one config of suppress (like `untilWindowCloses`)? What you basically
propose is to suppress updates if they do not change the result.
Considering emit on change as a flavour of suppress would be more
flexible because it would specify the behavior locally for a KTable
instead of globally for all KTables. Additionally, specifying the
behavior in one place instead of multiple places feels more intuitive
and consistent to me.


Best,

Bruno


On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:


Hi Richard,


Thanks for picking this up! I know of at least one large community member
for which this feature is absolutely essential.


If I understand your two options, it seems like the proposal is to implement
it as a behavior change regardless, and the question is whether to provide
an opt-out config or not.


Given that any implementation of this feature would have some performance
impact under some workloads, and also that we don't know if anyone really
depends on emit-on-update time semantics, it seems like we should propose
to add an opt-out config. Can you update the KIP to mention the exact
config key and value(s) you'd propose?


Just to move the discussion forward, maybe something like:

   emit.on := change|update

with the new default being "change"
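
For concreteness, a sketch of how the proposed `emit.on := change|update` key 
might be parsed, with the suggested default of "change". The key was only 
floated in this message and is not an actual Kafka Streams configuration, so 
`EmitOn` and `fromConfig` are hypothetical names.

```java
import java.util.Locale;
import java.util.Map;

// Models the proposed "emit.on" key; not a real Kafka Streams config.
enum EmitOn {
    CHANGE,
    UPDATE;

    static final String CONFIG_KEY = "emit.on";

    /** Parses the hypothetical key, falling back to the proposed default "change". */
    static EmitOn fromConfig(Map<String, String> config) {
        String raw = config.getOrDefault(CONFIG_KEY, "change");
        switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "change":
                return CHANGE;
            case "update":
                return UPDATE;
            default:
                throw new IllegalArgumentException(
                        "Invalid value for " + CONFIG_KEY + ": " + raw + " (expected change|update)");
        }
    }
}
```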


Thanks for pointing out the timestamp issue in particular. I agree that if
we discard the latter update as a no-op, then we also have to discard its
timestamp (obviously, we don't forward the timestamp update, as that's
the whole point, but we also can't update the timestamp in the store, as
the store must remain consistent with what has been emitted).


I have to confess that I disagree with your implementation proposal, but
it's also not necessary to discuss implementation in the KIP. Maybe it would
be less controversial if you just drop that section for now, so that the KIP
discussion can focus on the behavior change and config.


Just for reference, there is some research into this domain. For example,
see the "Report" section (3.2.3) of the SECRET paper:

http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf


It might help to round out the proposal if you take a brief survey of the
behaviors of other systems, along with pros and cons if any are reported.


Thanks,

-John



On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:

Hi everybody!


I'd like to propose a change that we probably should have added a long
time ago.


The key benefit of this KIP would be reduced traffic in Kafka Streams since
a lot of no-op results would no longer be sent downstream.
Here is the KIP for reference.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams


Currently, I seek to formalize our approach for this KIP first before we
determine concrete API additions / configurations.
Some configs might warrant adding, while others are not necessary since
adding them would only increase the complexity of Kafka Streams.


Cheers,

Richard








________________________________


This email and any attachments may contain confidential and privileged material 
for the sole use of the intended recipient. Any review, copying, or 
distribution of this email (or any attachments) by others is prohibited. If you 
are not the intended recipient, please contact the sender immediately and 
permanently delete this email and any attachments. No employee or agent of TiVo 
is authorized to conclude any binding agreement on behalf of TiVo by email. 
Binding agreements with TiVo may only be made by a signed written agreement.


--
Tommy Becker
Principal Engineer
Personalized Content Discovery
O +1 919.460.4747
tivo.com
