Hi Dongjin,

Ah, I think I may have been confused. I 100% agree that we need a materialized variant for suppress(). Then, you could do:

...suppress(..., Materialized.as("final-count"))
If that's your proposal, then we are on the same page.

I was under the impression that you wanted to expand the scope of the KIP to additionally allow querying the internal buffer, not just the result. Can you clarify whether you are proposing to allow querying the state of the internal buffer, the result, or both?

Thanks,
John

On Thu, Feb 20, 2020, at 08:41, Dongjin Lee wrote:
> Hi John,
> Thanks for your kind explanation with an example.
>
> > But it feels like you're saying you're trying to do something different than just query the windowed key and get back the current count?
>
> Yes. For example, what if we need to retrieve all keys (or a range of keys) whose window is closed? In this example, let's imagine we need to retrieve only (key=A, window=10), not (key=A, window=20).
>
> Of course, the value accompanying a flushed key is exactly the same as the one in the upstream KTable. However, if our intention is not to look up a specific key but to retrieve a group of keys we can't name in advance, we are stuck, since we can't know beforehand which keys have been flushed out.
>
> One workaround would be materializing it with `suppressed.filter((key, value) -> true, Materialized.as("final-count"))`. But I think providing a materialized variant of the suppress method is better than this workaround.
>
> Thanks,
> Dongjin
>
> On Thu, Feb 20, 2020 at 1:26 AM John Roesler <vvcep...@apache.org> wrote:
>
> > Thanks for the response, Dongjin,
> >
> > I'm sorry, but I'm still not following. It seems like the view you would get on the "current state of the buffer" would always be equivalent to the view of the upstream table.
> >
> > Let me try an example, and maybe you can point out the flaw in my reasoning.
> >
> > Let's say we're doing 10 ms windows with a grace period of zero.
> > Let's also say we're computing a windowed count, and that we have a "final results" suppression after the count.
> > Let's materialize the count as "Count" and the suppressed result as "Final Count".
> >
> > Suppose we get an input event:
> > (time=10, key=A, value=...)
> >
> > Then, Count will look like:
> >
> > | window | key | value |
> > | 10     | A   | 1     |
> >
> > The (internal) suppression buffer will contain:
> >
> > | window | key | value |
> > | 10     | A   | 1     |
> >
> > The record is still buffered because the window isn't closed yet.
> > Final Count is an empty table:
> >
> > | window | key | value |
> >
> > ---------------
> >
> > Now, we get a second event:
> > (time=15, key=A, value=...)
> >
> > Then, Count will look like:
> >
> > | window | key | value |
> > | 10     | A   | 2     |
> >
> > The (internal) suppression buffer will contain:
> >
> > | window | key | value |
> > | 10     | A   | 2     |
> >
> > The record is still buffered because the window isn't closed yet.
> > Final Count is an empty table:
> >
> > | window | key | value |
> >
> > ---------------
> >
> > Finally, we get a third event:
> > (time=20, key=A, value=...)
> >
> > Then, Count will look like:
> >
> > | window | key | value |
> > | 10     | A   | 2     |
> > | 20     | A   | 1     |
> >
> > The (internal) suppression buffer will contain:
> >
> > | window | key | value |
> > | 20     | A   | 1     |
> >
> > Note that window 10 has been flushed out, because it's now closed.
> > And window 20 is buffered because it isn't closed yet.
> > Final Count is now:
> >
> > | window | key | value |
> > | 10     | A   | 2     |
> >
> > ---------------
> >
> > Reading your email, I can't figure out what value there is in querying the internal suppression buffer, since it only contains exactly the same value as the upstream table, for each key that is still buffered. But it feels like you're saying you're trying to do something different than just query the windowed key and get back the current count?
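To make John's worked example concrete, here is a minimal Java sketch that replays the three events and prints the contents of Count, the internal suppression buffer, and Final Count. It is a toy model, not the Kafka Streams implementation: it assumes tumbling 10 ms windows, a grace period of zero, a single partition, and that a window [start, start + 10) closes once stream time reaches start + 10 + grace. The class name SuppressionSim and the "windowStart/key" string keys are made up for illustration.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Toy model (hypothetical, not Kafka Streams code) of a windowed count
 * followed by a "final results" suppression. Assumes tumbling windows,
 * one partition, and that a window closes when stream time >= end + grace.
 */
public class SuppressionSim {
    static final long WINDOW_SIZE_MS = 10;
    static final long GRACE_MS = 0;

    // "Count": the upstream windowed count, keyed by "windowStart/key".
    final Map<String, Long> count = new LinkedHashMap<>();
    // Internal suppression buffer: latest count for windows that are still open.
    final Map<String, Long> buffer = new LinkedHashMap<>();
    // "Final Count": results emitted by the suppression once a window closes.
    final Map<String, Long> finalCount = new LinkedHashMap<>();
    long streamTime = Long.MIN_VALUE;

    static boolean isClosed(long windowStart, long streamTime) {
        return windowStart + WINDOW_SIZE_MS + GRACE_MS <= streamTime;
    }

    void process(long timestamp, String key) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
        if (isClosed(windowStart, streamTime)) {
            return; // late record for an already-closed window: the count drops it
        }
        String windowedKey = windowStart + "/" + key;
        long updated = count.merge(windowedKey, 1L, Long::sum);
        buffer.put(windowedKey, updated); // suppression holds the count's latest update
        flushClosedWindows();
    }

    private void flushClosedWindows() {
        Iterator<Map.Entry<String, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            long windowStart = Long.parseLong(entry.getKey().split("/")[0]);
            if (isClosed(windowStart, streamTime)) {
                finalCount.put(entry.getKey(), entry.getValue()); // emit the final result
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        SuppressionSim sim = new SuppressionSim();
        sim.process(10, "A");
        sim.process(15, "A");
        sim.process(20, "A");
        System.out.println("Count:       " + sim.count);
        System.out.println("Buffer:      " + sim.buffer);
        System.out.println("Final Count: " + sim.finalCount);
    }
}
```

Running main should print Count as {10/A=2, 20/A=1}, the buffer as {20/A=1}, and Final Count as {10/A=2}, matching the last set of tables. Because the model copies each updated count straight into the buffer, every buffered value equals the corresponding Count value, which is John's point; what only the Final Count side gives you is the set of windows that have already closed.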
> >
> > Thanks,
> > -John
> >
> >
> > On Wed, Feb 19, 2020, at 09:49, Dongjin Lee wrote:
> > > Hi John,
> > >
> > > 'The intermediate state of the suppression' in the KIP does not mean the state of the upstream KTable; sure, the state of the upstream KTable can be queried by materializing the operator immediately before the suppress, as you showed. What I meant in the KIP was the final state of the buffer, which has not been emitted yet. (I agree, the current description may be confusing; it would be better to change it to 'the current state of the suppression' or 'the results of the suppression', as the Jira issue <https://issues.apache.org/jira/browse/KAFKA-8403> states.)
> > >
> > > For a bit more about the motivation, here is one of my experiences: I had to build a monitoring application that collects signals from IoT devices (say, on a semiconductor production line). If the number of signals collected within a time window is much lower than expected, there may be a problem, such as a network hiccup in the system. We wanted to build it as a dashboard, but could not because of the lack of a materialization feature. It was precisely the case of querying only the final results of a windowed aggregation, as the Jira issue <https://issues.apache.org/jira/browse/KAFKA-8403> states. We ended up implementing it as an email alerting system like this one <https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/> and had to collect the problematic keys and windows by hand.
> > >
> > > I think these kinds of use cases would be quite common. Should they be described in more detail in the KIP?
> > >
> > > Thanks,
> > > Dongjin
> > >
> > > On Sat, Feb 15, 2020 at 4:43 AM John Roesler <vvcep...@apache.org> wrote:
> > >
> > > > Hi Dongjin,
> > > >
> > > > Thanks for the KIP!
> > > >
> > > > Can you explain more about why the internal data structures of suppression should be queryable? The motivation just says that users might want to do it, which seems like it could justify literally anything :)
> > > >
> > > > One design point of Suppression is that if you wanted to query the "final state", you can Materialize the suppress itself (which is why it needs the variant); if you wanted to query the "intermediate state", you can materialize the operator immediately before the suppress.
> > > >
> > > > Example:
> > > >
> > > > ...count(Materialized.as("intermediate"))
> > > >     .suppress(untilWindowCloses(unbounded()), Materialized.as("final"))
> > > >
> > > > I'm not sure what use case would require actually fetching from the internal buffers.
> > > >
> > > > Thanks,
> > > > John
> > > >
> > > >
> > > > On Fri, Feb 14, 2020, at 07:55, Dongjin Lee wrote:
> > > > > Hi devs,
> > > > >
> > > > > I'd like to reboot the discussion on KIP-508, which aims to support a Materialized variant of KTable#suppress. It was initially submitted several months ago but was closed due to inactivity.
> > > > >
> > > > > - KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
> > > > > - Jira: https://issues.apache.org/jira/browse/KAFKA-8403
> > > > >
> > > > > All kinds of feedback will be greatly appreciated.
> > > > >
> > > > > Best,
> > > > > Dongjin
> > > > >
> > > > > --
> > > > > *Dongjin Lee*
> > > > >
> > > > > *A hitchhiker in the mathematical world.*
> > > > > *github: github.com/dongjinleekr <https://github.com/dongjinleekr>
> > > > > linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
> > > > > speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*
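Dongjin's monitoring use case in this thread (finding closed windows whose signal count fell well below what was expected, without knowing the affected keys in advance) boils down to a range scan over final results only. The sketch below models that with a plain TreeMap standing in for a queryable "final-count" store. It assumes the store holds only closed windows; the class and method names, device keys, and counts are all hypothetical, and it is not a Kafka Streams store API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical in-memory stand-in for a queryable "final-count" store.
 * Only closed windows are ever written here, mirroring what a materialized
 * suppression result would expose. Not a Kafka Streams API.
 */
public class ClosedWindowQuery {
    // key -> (windowStart -> final count); both levels sorted for range scans
    final NavigableMap<String, NavigableMap<Long, Long>> finalCounts = new TreeMap<>();

    void recordFinal(String key, long windowStart, long count) {
        finalCounts.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, count);
    }

    /** Returns "key@windowStart" for each closed window starting in [from, to) whose count is below expected. */
    List<String> underfilledWindows(long from, long to, long expected) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, NavigableMap<Long, Long>> perKey : finalCounts.entrySet()) {
            // subMap is a view of the windows whose start lies in [from, to)
            for (Map.Entry<Long, Long> window : perKey.getValue().subMap(from, true, to, false).entrySet()) {
                if (window.getValue() < expected) {
                    result.add(perKey.getKey() + "@" + window.getKey());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ClosedWindowQuery store = new ClosedWindowQuery();
        // Made-up final counts for two hypothetical devices.
        store.recordFinal("deviceA", 0, 100);
        store.recordFinal("deviceA", 10, 42);
        store.recordFinal("deviceB", 10, 98);
        store.recordFinal("deviceB", 20, 7);
        System.out.println(store.underfilledWindows(0, 30, 90));
    }
}
```

With these made-up counts and an expected count of 90, main should print [deviceA@10, deviceB@20]. Because only closed windows ever reach this store, a window that is still open, and therefore naturally has a low partial count, cannot raise a false alarm; scanning the upstream Count table directly would not have that property.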