Hey John,

Since this is a common question and I've seen many users asking about window semantics like this, could you file a JIRA ticket for creating a wiki page like Join Semantics (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics) to summarize the windowing operations like this too?

Guozhang

On Sat, Jan 18, 2020 at 3:22 PM John Roesler <vvcep...@apache.org> wrote:

> Glad it helped!
> -John
>
> On Sat, Jan 18, 2020, at 12:27, Viktor Markvardt wrote:
> > Hi John,
> >
> > Thank you for your assistance!
> > Your example helped me a lot, and I understand kafka-streams more clearly now.
> > Have a nice weekend :)
> >
> > Best regards,
> > Viktor Markvardt
> >
> > On Thu, Jan 16, 2020 at 19:29, John Roesler <vvcep...@apache.org> wrote:
> > >
> > > Hi Viktor,
> > >
> > > I’m starting to wonder what exactly “duplicate” means in this context. Can you elaborate?
> > >
> > > In case it helps, with your window definition, if I send a record with timestamp 20, it would actually belong to three different windows:
> > > [0,30)
> > > [10,40)
> > > [20,50)
> > >
> > > Because of this, you would (correctly) see three output records for that one input, but the outputs wouldn’t be “duplicates” properly, because they’d have different keys:
> > >
> > > Input:
> > > Key1: Val1 @ timestamp:20
> > >
> > > Output:
> > > Windowed<Window(0,30),Key1>: 1
> > > Windowed<Window(10,40),Key1>: 1
> > > Windowed<Window(20,50),Key1>: 1
> > >
> > > Any chance that explains your observation?
> > >
> > > Thanks,
> > > John
> > >
> > > On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> > > > Hi John,
> > > >
> > > > Thanks for answering my questions!
> > > > I observe behavior which I cannot understand.
> > > > The code is working, but when the delay between records is larger than the window duration, I receive duplicated records.
> > > > With the code below I received duplicated records in the output kstream.
> > > > The count of duplicate records is always 3. If I change duration/advanceBy, the count of duplicated records changes too.
> > > > Do you have any ideas why duplicated records are received in the output kstream?
> > > >
> > > > KStream<String, String> windowedStream = source
> > > >     .groupByKey()
> > > >     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
> > > >     .count()
> > > >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >     .toStream();
> > > >
> > > > Best regards,
> > > > Viktor Markvardt
> > > >
> > > > On Thu, Jan 16, 2020 at 04:59, John Roesler <vvcep...@apache.org> wrote:
> > > > >
> > > > > Hi Viktor,
> > > > >
> > > > > I’m not sure why you get two identical outputs in response to a single record. Regardless, since you say that you want to get a single, final result for the window and you expect multiple inputs to the windows, you need Suppression.
> > > > >
> > > > > My guess is that you just sent one record to try it out and didn’t see any output? This is expected. Just as the window boundaries are defined by the timestamps of the records, not by the current system time, suppression is governed by the timestamp of the records. I.e., a thirty-second window is not actually closed until you see a new record with a timestamp thirty seconds later.
> > > > >
> > > > > Maybe try just sending a sequence of updates with incrementing timestamps. If the first record has timestamp T, then you should see an output when you pass in a record with timestamp T+30.
> > > > >
> > > > > Important note: there is a built-in grace period that delays the output of final results after the window ends. For complicated reasons, the default is 24 hours! So you would actually not see an output until you send a record with timestamp T+30+(24 hours)! I strongly recommend you set the grace period on TimeWindows to zero for your testing. You can increase it later if you want to tolerate some late-arriving records.
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > > > > Hi,
> > > > > >
> > > > > > My name is Viktor. I'm currently working with Kafka Streams and have several questions about Kafka that I cannot find answers to in the official docs.
> > > > > >
> > > > > > 1. Why does the suppress functionality not work with Hopping windows? How can I make it work?
> > > > > >
> > > > > > Example of the code:
> > > > > >
> > > > > > KStream<String, String> finalStream = source
> > > > > >     .groupByKey()
> > > > > >     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> > > > > >     .reduce((aggValue, newValue) -> newValue,
> > > > > >         Materialized.with(Serdes.String(), Serdes.String()))
> > > > > >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > >     .toStream();
> > > > > >
> > > > > > finalStream.print(Printed.toSysOut());
> > > > > > finalStream.to(outputTopic);
> > > > > >
> > > > > > After I run the code above, the output stream is empty. There were no errors/exceptions.
> > > > > > NOTE: With a Tumbling Window the code works as expected.
> > > > > > Maybe I am simply using it incorrectly?
> > > > > >
> > > > > > 2. Why are there duplicates in the output stream with Hopping windows (without suppress)?
> > > > > > E.g., I send one record to the input kstream with a Hopping window (duration=30s, advanceBy=2s) but get two identical records (duplicates) in the output kstream.
> > > > > > Is that expected behavior? If so, how can I filter/switch off these duplicates?
> > > > > >
> > > > > > 3. Mainly I'm trying to solve this problem:
> > > > > > I have a kstream with events inside, and events can be repeated (duplicates).
> > > > > > In the output kstream I would like to receive only unique events for the last 24 hours (window duration) with a 1 hour window overlay (window advanceBy).
> > > > > > Could you recommend any code examples or docs, please?
> > > > > > I have already read the official docs and examples, but they were not enough to fully understand how I can achieve this.
> > > > > >
> > > > > > Best regards,
> > > > > > Viktor Markvardt
> > > > > >

--
-- Guozhang
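
The window assignment John describes in the thread (a record with timestamp 20 landing in [0,30), [10,40) and [20,50)) can be sketched as plain arithmetic. This is only an illustration, not the Kafka Streams API: the class name HoppingWindowSketch and the method windowStartsFor are hypothetical, and the sketch assumes that windows are aligned to multiples of the advance starting at 0 and that the timestamp, size and advance all use the same time unit.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustration only: plain arithmetic, not the Kafka Streams API.
 * Lists the hopping windows [start, start + size) that contain a timestamp.
 */
public class HoppingWindowSketch {

    /**
     * Returns the start of every window that contains the timestamp.
     * Assumes windows are aligned to multiples of `advance`, beginning at 0,
     * and that all three arguments share one time unit.
     */
    public static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        if (timestamp < 0 || size <= 0 || advance <= 0 || advance > size) {
            throw new IllegalArgumentException("need timestamp >= 0 and 0 < advance <= size");
        }
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp (never below 0).
        long start = (Math.max(0, timestamp - size + advance) / advance) * advance;
        // Every later aligned start up to the timestamp itself also covers it.
        while (start <= timestamp) {
            starts.add(start);
            start += advance;
        }
        return starts;
    }

    public static void main(String[] args) {
        // The case from the thread: size 30, advance 10, record at timestamp 20.
        for (long start : windowStartsFor(20, 30, 10)) {
            System.out.println("[" + start + "," + (start + 30) + ")");
        }
    }
}
```

Under these assumptions the main method prints the three windows from John's reply. Each window produces its own Windowed key, which is why one input record yields three output records with different keys rather than three duplicates.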