@vvcephei I have not reviewed the latest changes on this PR yet, but here are a few meta-level thoughts I'd like to share:
1. Serdes: here's my reasoning on whether we need to enforce serdes. We have the following scenarios:

   a) The KTable-to-be-suppressed (I will just call it "the KTable" from now on) has user-specified serdes. In this case we do not need to require serdes again for the suppression.

   b) The KTable is materialized but users do not specify serdes during materialization. In this case we will try to use the default serdes from the config (or, in the future, the inherited ones, though those are not guaranteed to always be correct anyway). If the default serde is incorrect, we would hit the exception even before the suppression, so we do not need to require serdes here either.

   c) The KTable is NOT materialized and users do not specify serdes. Today this case is not possible, but in the future it may arise due to optimizations, e.g. a KTable generated by `KTable#filter / mapValues`. In this case, if we do not require users to specify serdes and the default ones are incorrect, we will indeed see unexpected exceptions. But users can still work around this by providing the `Materialized` object in those operators; plus, in the future we can optimize further by "pushing the suppression ahead", which I will talk about later in this comment.

   In sum, I think it is not necessary to always force users to provide serdes in the buffer config.

2. Changelogs: Whether or not we should add new changelog topics for the suppression buffer itself depends on how we implement the commit behavior. My suggestion is the following:

   a) For size / interval based intermediate suppression, we should still honor the commit operation and always "flush" the suppression buffer, i.e. intermediate suppression remains best-effort and yields to commits. In this case we do not need to bookkeep "which records have been buffered but not emitted" in a changelog either; we can simply assume none have been emitted until the commit.
   b) For final-result suppression of window stores, we cannot yield to commits because that would violate the intended guarantees. BUT since we will not emit any records before the grace deadline anyway, the "state" of the buffer does not need to be bookkept either: if we are past the grace deadline, every record should be flushed; otherwise, none should be.

   Note that the above approach works for both EOS and non-EOS: for non-EOS, if there is a crash in between commits we may emit a single record multiple times, but that is fine for non-EOS; for EOS, if there is a crash in between commits we need to restore the whole state from the beginning anyway as of today (until we have consistent checkpoints someday), so this is also fine.

   One caveat: 2.b) above relies on the current stream time to determine, upon restart, whether or not the window store data has been emitted downstream; but stream time today is not deterministically defined even with KIP-353 merged in, so if we re-process we may observe different behavior. I think this is acceptable for now and can be fixed in the future: for example, we can include the "current stream time" in the commit message when adding consistent checkpoints, making stream time deterministic at those checkpoints.

   In sum, I think we can accept not having changelogs for the buffer itself, as long as we still respect commits except for final-result suppression, which we can implement as a special case. In addition, the KTable-before-suppression's changelog can be suppressed along with the buffer as well: we can write to the change logger only when the buffer emits. More details in the next section.

3. Future roadmap: We have discussed how to re-design KIP-63 after this is done, and one thing to consider is how to maintain the suppression effect on the state stores' changelog topics as well. Together, this also has some implications for our memory management story.
   Here's my thinking following the above proposal on changelogs:

   a) If we remove the buffer on top of the state stores, the saved memory can be given to 1) the added buffer "behind" the state stores, and 2) enlarging the state store's own write buffer (e.g. RocksDB's).

   b) We can optimize the topology to "implicitly" add suppression where beneficial, in addition to user-requested suppressions, to reduce the traffic to the original KTable's store changelog topics. More concretely, consider the following example:

   ```
   table2 = table1.filter();
   ```

   * With logical materialization, only table1 will be materialized.
   * With logical materialization plus implicit suppression, the above will become:

   ```
   table2 = table1.suppress().filter();
   ```

   in which case table1's store changelog will be suppressed as well: we only write to the change logger when we emit. Now if users explicitly call suppression:

   ```
   table2 = table1.filter().suppress();
   ```

   it will also be rewritten to

   ```
   table2 = table1.suppress().filter();
   ```

   in which case table1's changelog will still be suppressed based on the `suppress()` config, and the suppressed changelog stream will then be filtered into table2, which can still be logically materialized.

   c) Finally, about memory management: we can deprecate the current `max.cache.bytes` and replace it with `total.memory.bytes`, which controls the total amount of memory that Streams will use, similar to what we described in https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams. Note this total will cover both user-requested and implicitly added suppression buffers. In other words, each buffer's own size config will have soft-limit semantics that are only fulfilled best-effort, since it yields both to the commit interval AND to the total usable bytes.

   This is just a rough sketch and may need more detailed implementation discussions. Let me know WDYT.
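To make the two flush policies in 2.a) and 2.b) concrete, here is a minimal, self-contained sketch (class and method names are hypothetical, not the actual PR implementation): intermediate suppression flushes the whole buffer on commit, so nothing needs to be bookkept in a changelog, while final-result suppression flushes purely by comparing stream time against each record's grace deadline, so its state is recomputable after a restart.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two suppression flush policies described above.
public class SuppressionBufferSketch {
    record Buffered(String key, long windowEndTime) {}

    // Intermediate suppression: best-effort, yields to commits.
    // A commit simply flushes everything, so no changelog bookkeeping is needed.
    static List<Buffered> flushOnCommit(List<Buffered> buffer) {
        List<Buffered> emitted = new ArrayList<>(buffer);
        buffer.clear();
        return emitted;
    }

    // Final-result suppression: emit only records whose grace deadline
    // (window end + grace period) has passed according to stream time.
    // After a restart, re-running this against restored stream time
    // reproduces the same emit/hold decision.
    static List<Buffered> flushFinal(List<Buffered> buffer, long streamTime, long gracePeriod) {
        List<Buffered> emitted = new ArrayList<>();
        buffer.removeIf(r -> {
            if (r.windowEndTime() + gracePeriod <= streamTime) {
                emitted.add(r);
                return true;
            }
            return false;
        });
        return emitted;
    }
}
```

Note how `flushFinal` is a pure function of the buffer contents, stream time, and grace period, which is exactly why the buffer state itself would not need a changelog, modulo the stream-time determinism caveat above.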
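And a rough sketch of the `total.memory.bytes` idea in 3.c) (all names here are hypothetical): every suppression buffer, user-requested or implicitly added, reserves from one global budget, and a buffer that cannot reserve should early-emit instead of buffering, which is what makes each buffer's own size config a soft limit.

```java
// Hypothetical sketch of a single global memory budget shared by all
// suppression buffers, replacing the per-cache `max.cache.bytes`.
public class MemoryBudgetSketch {
    static long totalMemoryBytes = 1000;  // the proposed `total.memory.bytes`
    static long usedBytes = 0;

    // Returns true if the caller may buffer `bytes` more within the global
    // budget; otherwise the caller should early-emit (evict) instead.
    static boolean tryReserve(long bytes) {
        if (usedBytes + bytes > totalMemoryBytes) {
            return false;
        }
        usedBytes += bytes;
        return true;
    }

    // Called when a buffer emits records and frees its share of the budget.
    static void release(long bytes) {
        usedBytes -= bytes;
    }
}
```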
[ Full content available at: https://github.com/apache/kafka/pull/5567 ]
