@vvcephei I have not reviewed the latest changes on this PR yet, but here are two meta-level thoughts I'd like to share:

1. Serdes: here's my reasoning on whether we need to require serdes in the suppression buffer config. We have the following scenarios:

a) The KTable-to-be-suppressed (I will just call it KTable from now on) has 
user-specified serdes. In this case we do not need to require serdes again for 
suppression.

b) The KTable is materialized and users do not specify serdes during materialization. In this case we will try to use the default serdes from the config (or the inherited ones in the future, though inheritance is not guaranteed to always be correct anyway), so if the default serdes are incorrect, we will hit the exception even before the suppression operator. So we do not need to require serdes here either.

c) The KTable is NOT materialized and users do not specify serdes. Today this case is not possible, but in the future it may arise due to optimizations, e.g. a KTable generated by `KTable#filter / mapValues`. In this case, if we do not require users to specify serdes and the defaults are not correct, it will indeed throw unexpected exceptions. But users can still work around this by providing the `Materialized` object on those operators (a sketch of this workaround follows the summary below); plus, in the future we can further optimize to "push the suppression ahead", which I will talk about later in this comment.

So in sum, I think it is not necessary to always require users to provide serdes in the buffer config.
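
To make case c) concrete, here is a minimal sketch of the workaround, assuming the standard Streams DSL classes are imported; the topic name, serdes, and predicate are just placeholders, and `suppress()` is the operator under discussion in this PR, so its config is elided:

```
StreamsBuilder builder = new StreamsBuilder();

// Cases a) / b): the source KTable has serdes, either specified here or the configured defaults.
KTable<String, Long> table1 = builder.table("input-topic",
    Consumed.with(Serdes.String(), Serdes.Long()));

// Case c): a filter-generated KTable that may not be materialized after optimizations.
// The workaround is to pin serdes via Materialized on the operator, so the suppression
// buffer does not have to fall back to (possibly wrong) default serdes.
KTable<String, Long> table2 = table1.filter(
    (key, value) -> value != null && value > 0,
    Materialized.with(Serdes.String(), Serdes.Long()));

// The suppression operator itself; its buffer config is still being designed, so it is
// left out here on purpose.
// KTable<String, Long> suppressed = table2.suppress(/* buffer config */);
```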

2. Changelogs:

About whether or not we should add new changelog topics for the suppression 
buffer itself, I think it depends on how we will implement the commit behavior. 
My suggestion is the following:

a) For size / interval based intermediate suppression, we will still honor the commit operation and always "flush" the suppression buffer, i.e. the intermediate suppression is best-effort and yields to commits. In that case, we do not need to book-keep "which records have been buffered but not emitted" in a changelog either; we can simply assume none have been emitted since the last commit.

b) For final result suppression of window stores, we cannot yield to commits because that would violate the intended guarantee. BUT since we will not emit any records before the grace deadline anyway, the "state" of the buffer does not need to be book-kept either: if we are beyond the grace deadline, every record should be flushed; otherwise, none should be. (A sketch of both flush policies follows the note below.)

Note that the above approach works for both EOS and non-EOS: for non-EOS, if 
there is a crash in between commits, we may emit a single record multiple times 
but that is fine for non-EOS; for EOS, if there is a crash in between commits 
we need to restore the whole state from the beginning anyways as of now (until 
we have consistent checkpoints someday), so this is also fine.
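
To illustrate 2.a) and 2.b), here is a minimal sketch of the two flush policies; class and method names are made up (this is not the PR's actual code), and the point is only that neither policy needs per-record bookkeeping in a changelog:

```
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.ToLongFunction;

// Hypothetical buffer holding the latest update per key; names are illustrative only.
class SuppressionBuffer<K, V> {
    private final Map<K, V> latestByKey = new LinkedHashMap<>();

    void put(final K key, final V value) {
        latestByKey.put(key, value);
    }

    // 2.a) intermediate (size / interval) suppression: best-effort, yields to commits.
    // On every commit we simply flush everything, so nothing needs to be logged.
    void onCommit(final BiConsumer<K, V> forward) {
        latestByKey.forEach(forward);
        latestByKey.clear();
    }

    // 2.b) final result suppression: ignore commits; emit only once the grace deadline
    // for the record's window has passed.  Whether a record has been emitted is fully
    // determined by stream time, so again nothing needs to be logged.
    void onStreamTimeAdvance(final long streamTime,
                             final ToLongFunction<K> graceDeadline,
                             final BiConsumer<K, V> forward) {
        final Iterator<Map.Entry<K, V>> it = latestByKey.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<K, V> entry = it.next();
            if (graceDeadline.applyAsLong(entry.getKey()) <= streamTime) {
                forward.accept(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
    }
}
```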

One caveat though is that 2.b) above relies on the current stream time to determine, upon restart, whether or not the window store data have already been emitted downstream; but stream time today is not deterministically defined even with KIP-353 merged in, so if we re-process, it may produce different behavior. I think this is acceptable for now and can be fixed in the future: for example, we can include the "current stream time" in the commit message when adding consistent checkpoints, to make stream time deterministic at those checkpoints.

So in sum, I think we can accept not having changelogs for the buffer itself, as long as we still respect commits; final result suppression is the exception, which we can implement as a special case. In addition, the changelog of the KTable-before-suppression can be suppressed along with the buffer as well: we only write to the change logger when the buffer emits. More details in the next section.


3. Future roadmap:

We have discussed how to re-design KIP-63 after this is done, and one thing to consider is how to maintain the suppression effect on the state stores' changelog topics as well. This also has implications for our memory management story. Here's my thinking, following the above proposal on changelogs:

a) Say we remove the buffer on top of the state stores; the saved memory can be given to 1) the added buffer "behind" the state stores, and 2) enlarging the state store's own write buffer (e.g. RocksDB).

b) We can optimize the topology to "implicitly" add suppression where necessary, in addition to user-requested suppressions, to reduce the traffic to the original KTable's store changelog topics. More concretely, think about the following example:

```
table2 = table1.filter();
```

* with logical materialization, only table1 will be materialized.
* with logical materialization plus implicit suppression, the above will become:

```
table2 = table1.suppress().filter();
```

in which case table1's store changelog will be suppressed as well: we only 
write to the change logger when we emit.

Now if users explicitly call suppression:

```
table2 = table1.filter().suppress();
```

It will also be re-written to

```
table2 = table1.suppress().filter();
```

in which case table1's changelog will still be suppressed based on the `suppress()` config, and the suppressed changelog stream will then be filtered into table2, which can still be logically materialized.
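
Here is a rough sketch of what "only write to the change logger when we emit" could look like; all class and method names are made up for illustration and are not the actual Streams store/changelog code:

```
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical wrapper: updates are buffered, and the changelog write is deferred until
// the buffer decides to emit, so changelog traffic is suppressed together with the
// downstream updates.
class SuppressedChangeLoggingStore<K, V> {
    private final Map<K, V> pending = new HashMap<>();
    private final BiConsumer<K, V> changeLogger; // writes to the store's changelog topic
    private final BiConsumer<K, V> downstream;   // forwards to the next processor node

    SuppressedChangeLoggingStore(final BiConsumer<K, V> changeLogger,
                                 final BiConsumer<K, V> downstream) {
        this.changeLogger = changeLogger;
        this.downstream = downstream;
    }

    void put(final K key, final V value) {
        pending.put(key, value);              // no changelog write yet
    }

    void emit() {
        pending.forEach((key, value) -> {
            changeLogger.accept(key, value);  // log only when we emit
            downstream.accept(key, value);
        });
        pending.clear();
    }
}
```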

c) Finally, about memory management: we can deprecate the current `max.cache.bytes` and replace it with `total.memory.bytes`, which controls the total amount of memory that Streams will use, similar to what we described in https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams. Note this total will cover both user-requested and implicitly added suppression buffers. In other words, each buffer's own config becomes a soft limit that is only fulfilled best-effort, since it yields to both the commit interval AND the total usable bytes.
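
For illustration, configuration under this proposal might look like the sketch below, assuming the usual `Properties` / `StreamsConfig` imports; `total.memory.bytes` is just the name floated above, not an existing config, and the application id / bootstrap servers are placeholders:

```
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-demo");     // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

// Today: a single cache size bounds the caches sitting on top of the stores.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);

// Proposed (not an existing config): one global bound covering the caches plus all
// suppression buffers, whether user-requested or implicitly added; each buffer's own
// size config then becomes a best-effort soft limit under this total.
props.put("total.memory.bytes", 64 * 1024 * 1024L);
```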

This is just a rough sketch of my thoughts and may need more detailed implementation discussion. Let me know WDYT.
