Setting a new record for procrastination, I've just created this ticket:
https://issues.apache.org/jira/browse/KAFKA-10058

On Sat, Jan 18, 2020, at 22:03, John Roesler wrote:
> Good idea! I’ll make a note to do it when I’m at a computer. 
> 
> On Sat, Jan 18, 2020, at 21:51, Guozhang Wang wrote:
> > Hey John,
> > 
> > Since this is a common question and I've seen many users asking about
> > window semantics like this, could you file a JIRA ticket for creating a
> > wiki page like Join Semantics (
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)
> > to summarize the windowing operations like this too?
> > 
> > Guozhang
> > 
> > On Sat, Jan 18, 2020 at 3:22 PM John Roesler wrote:
> > 
> > > Glad it helped!
> > > -John
> > >
> > > On Sat, Jan 18, 2020, at 12:27, Viktor Markvardt wrote:
> > > > Hi John,
> > > >
> > > > Thank you for your assistance!
> > > > Your example helped me a lot, and I understand Kafka Streams more clearly now.
> > > > Have a nice weekend :)
> > > >
> > > > Best regards,
> > > > Viktor Markvardt
> > > >
> > > > On Thu, Jan 16, 2020 at 19:29, John Roesler wrote:
> > > >
> > > > > Hi Viktor,
> > > > >
> > > > > I’m starting to wonder what exactly “duplicate” means in this context. Can
> > > > > you elaborate?
> > > > >
> > > > > In case it helps, with your window definition, if I send a record with
> > > > > timestamp 20, it would actually belong to three different windows:
> > > > > [0,30)
> > > > > [10,40)
> > > > > [20,50)
> > > > >
> > > > > Because of this, you would (correctly) see three output records for that
> > > > > one input, but the outputs wouldn’t really be “duplicates”, because
> > > > > they’d have different keys (the same record key paired with a different window):
> > > > >
> > > > > Input:
> > > > > Key1: Val1 @ timestamp:20
> > > > >
> > > > > Output:
> > > > > Windowed<Window(0,30),Key1>: 1
> > > > > Windowed<Window(10,40),Key1>: 1
> > > > > Windowed<Window(20,50),Key1>: 1
> > > > >
> > > > > Any chance that explains your observation?
> > > > >
> > > > > Thanks,
> > > > > John
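John's three-window example can be reproduced with a small Java sketch. It assumes epoch-aligned hopping windows: each window covers [start, start + size) and starts at a multiple of the advance. Those windows match the [0,30), [10,40), [20,50) list above. `HoppingWindows` and `windowsFor` are hypothetical names used for illustration, not the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {
    // Returns every hopping window [start, end) that contains the timestamp.
    // Assumes windows start at multiples of advanceMs, counted from 0.
    public static List<long[]> windowsFor(long timestamp, long sizeMs, long advanceMs) {
        List<long[]> windows = new ArrayList<>();
        // Earliest aligned start whose window still contains the timestamp.
        long firstStart = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = firstStart; start <= timestamp; start += advanceMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Window size 30, advance 10, record timestamp 20 (John's example).
        for (long[] w : windowsFor(20, 30, 10)) {
            System.out.println("[" + w[0] + "," + w[1] + ")");
        }
    }
}
```

Run as-is, the sketch prints the same three windows as John's example. When the size is a multiple of the advance, a record whose timestamp is at least size - advance lands in size/advance windows. A 30-second window advancing by 10 seconds therefore gives three windows per record. That matches the "always 3" count, and it explains why the count changes when duration/advanceBy changes.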
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> > > > > > Hi John,
> > > > > >
> > > > > > Thanks for answering my questions!
> > > > > > I'm observing behavior that I can't understand.
> > > > > > The code works, but when the delay between records is larger than the
> > > > > > window duration, I receive duplicated records.
> > > > > > With the code below, I receive duplicated records in the output KStream.
> > > > > > The count of duplicate records is always 3. If I change
> > > > > > duration/advanceBy, the count of duplicated records changes too.
> > > > > > Do you have any idea why duplicated records appear in the output
> > > > > > KStream?
> > > > > >
> > > > > > // count() on a windowed stream returns KTable<Windowed<String>, Long>,
> > > > > > // so the resulting stream is keyed by Windowed<String>
> > > > > > KStream<Windowed<String>, Long> windowedStream = source
> > > > > >     .groupByKey()
> > > > > >     .windowedBy(TimeWindows.of(Duration.ofSeconds(30))
> > > > > >         .grace(Duration.ZERO)
> > > > > >         .advanceBy(Duration.ofSeconds(10)))
> > > > > >     .count()
> > > > > >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > >     .toStream();
> > > > > >
> > > > > >
> > > > > > Best regards,
> > > > > > Viktor Markvardt
> > > > > >
> > > > > > On Thu, Jan 16, 2020 at 04:59, John Roesler wrote:
> > > > > >
> > > > > > > Hi Viktor,
> > > > > > >
> > > > > > > I’m not sure why you get two identical outputs in response to a single
> > > > > > > record. Regardless, since you say that you want to get a single, final
> > > > > > > result for the window and you expect multiple inputs to the windows, you
> > > > > > > need suppression.
> > > > > > >
> > > > > > > My guess is that you just sent one record to try it out and didn’t see any
> > > > > > > output? This is expected. Window boundaries are defined by the timestamps
> > > > > > > of the records, not by the current system time, and suppression is
> > > > > > > governed by the record timestamps in the same way. That is, a thirty-second
> > > > > > > window is not actually closed until you see a new record whose timestamp is
> > > > > > > thirty seconds past the window’s start.
> > > > > > >
> > > > > > > Maybe try sending a sequence of updates with incrementing timestamps. If
> > > > > > > the first record has timestamp T, then you should see an output when you
> > > > > > > pass in a record with timestamp T+30.
> > > > > > >
> > > > > > > Important note: there is a built-in grace period that delays the output of
> > > > > > > final results after the window ends. For complicated reasons, the default
> > > > > > > is 24 hours! So you would actually not see an output until you send a
> > > > > > > record with timestamp T+30+(24 hours)! I strongly recommend you set the
> > > > > > > grace period on TimeWindows to zero for your testing. You can increase it
> > > > > > > later if you want to tolerate some late-arriving records.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > -John
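The stream-time behavior John describes can be modeled with a toy, single-key simulation of count() followed by suppress(untilWindowCloses(...)). In this behavior, windows close based on record timestamps rather than the wall clock, and only after the grace period. The sketch is not Kafka's implementation and rests on two assumptions. First, a window becomes final once stream time (the largest timestamp seen so far) reaches windowEnd + grace. Second, records that arrive for an already-closed window are dropped. `SuppressSimulation`, `process`, and `emitted` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SuppressSimulation {
    private final long sizeMs;
    private final long advanceMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;
    // Open windows for a single key: window start -> running count, ordered by start.
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();
    // Final results emitted so far, formatted as "[start,end): count".
    public final List<String> emitted = new ArrayList<>();

    public SuppressSimulation(long sizeMs, long advanceMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.advanceMs = advanceMs;
        this.graceMs = graceMs;
    }

    public void process(long timestamp) {
        // Stream time only moves forward and is driven by record timestamps.
        streamTime = Math.max(streamTime, timestamp);
        long firstStart = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = firstStart; start <= timestamp; start += advanceMs) {
            if (start + sizeMs + graceMs > streamTime) { // window still open: count the record
                openWindows.merge(start, 1L, Long::sum);
            }
        }
        // Emit each window whose end plus grace has been reached by stream time.
        while (!openWindows.isEmpty()) {
            Map.Entry<Long, Long> oldest = openWindows.firstEntry();
            long end = oldest.getKey() + sizeMs;
            if (end + graceMs > streamTime) {
                break;
            }
            emitted.add("[" + oldest.getKey() + "," + end + "): " + oldest.getValue());
            openWindows.pollFirstEntry();
        }
    }

    public static void main(String[] args) {
        // Size 30, advance 10, units in seconds; grace 0 versus 24 hours (86,400 s).
        SuppressSimulation noGrace = new SuppressSimulation(30, 10, 0);
        noGrace.process(0);
        noGrace.process(30); // stream time reaches the end of [0,30)
        System.out.println(noGrace.emitted); // [[0,30): 1]

        SuppressSimulation dayGrace = new SuppressSimulation(30, 10, 86_400);
        dayGrace.process(0);
        dayGrace.process(30);
        System.out.println(dayGrace.emitted); // []
    }
}
```

With grace 0, the record at T+30 (here T = 0) closes [0,30) and emits its final count. With a 24-hour grace, the same two records emit nothing, which is John's explanation for the empty output in Viktor's first message. Feeding records at 20 and then 100 emits three final results with count 1, one per window, the pattern John later describes for gaps longer than the window.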
> > > > > > >
> > > > > > > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > My name is Viktor. I'm currently working with Kafka Streams and have
> > > > > > > > several questions about Kafka that I can't find answers to in the
> > > > > > > > official docs.
> > > > > > > >
> > > > > > > > 1. Why doesn't suppress work with hopping windows? How can I make it
> > > > > > > > work?
> > > > > > > >
> > > > > > > > Example of the code:
> > > > > > > >
> > > > > > > > // reduce() on a windowed stream returns KTable<Windowed<String>, String>,
> > > > > > > > // so the resulting stream is keyed by Windowed<String>
> > > > > > > > KStream<Windowed<String>, String> finalStream = source
> > > > > > > >                 .groupByKey()
> > > > > > > >                 .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> > > > > > > >                 .reduce((aggValue, newValue) -> newValue,
> > > > > > > >                         Materialized.with(Serdes.String(), Serdes.String()))
> > > > > > > >                 .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > > > >                 .toStream();
> > > > > > > >
> > > > > > > > finalStream.print(Printed.toSysOut());
> > > > > > > > finalStream.to(outputTopic);
> > > > > > > >
> > > > > > > > After I run the code above, the output stream is empty. There were no
> > > > > > > > errors or exceptions.
> > > > > > > > NOTE: With a tumbling window, the code works as expected.
> > > > > > > > Maybe I'm simply using it incorrectly?
> > > > > > > >
> > > > > > > > 2. Why are there duplicates in the output stream with hopping windows
> > > > > > > > (without suppress)?
> > > > > > > > E.g., I send one record to the input KStream with a hopping window
> > > > > > > > (duration=30s, advanceBy=2s) but get two identical records (duplicates)
> > > > > > > > in the output KStream.
> > > > > > > > Is that expected behavior? If so, how can I filter out or switch off
> > > > > > > > these duplicates?
> > > > > > > >
> > > > > > > > 3. Mainly, I'm trying to solve this problem:
> > > > > > > > I have a KStream of events, and events can be repeated (duplicates).
> > > > > > > > In the output KStream, I would like to receive only unique events for
> > > > > > > > the last 24 hours (window duration), with windows advancing every hour
> > > > > > > > (window advanceBy).
> > > > > > > > Could you recommend any code examples or docs?
> > > > > > > > I have already read the official docs and examples, but they were not
> > > > > > > > enough for me to fully understand how to achieve this.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Viktor Markvardt
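Question 3 is not answered directly in this thread, but the window arithmetic in John's reply suggests why hopping windows fit it poorly. With a 24-hour size and a 1-hour advance, each event falls into 24 overlapping windows, so a windowed aggregation would report it up to 24 times. A different technique, named plainly here, is time-bounded deduplication by event id. The event is forwarded the first time its id is seen, and the id is remembered for 24 hours of stream time. The plain-Java sketch below assumes each event carries a unique id string; `TimeBoundedDeduplicator` and `accept` are hypothetical names. In a Kafka Streams application, this state would typically need to live in a state store inside a custom processor so that it survives restarts. That wiring is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

public class TimeBoundedDeduplicator {
    private final long retentionMs;
    private long streamTime = Long.MIN_VALUE;
    // Event id -> timestamp at which it was last forwarded.
    private final Map<String, Long> lastForwarded = new HashMap<>();

    public TimeBoundedDeduplicator(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns true if the event should be forwarded downstream.
    public boolean accept(String eventId, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        // Forget ids once stream time has moved retentionMs past their last forward.
        lastForwarded.values().removeIf(ts -> ts <= streamTime - retentionMs);
        if (lastForwarded.containsKey(eventId)) {
            return false; // already forwarded within the retention period
        }
        lastForwarded.put(eventId, timestamp);
        return true;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        TimeBoundedDeduplicator dedup = new TimeBoundedDeduplicator(24 * hour);
        System.out.println(dedup.accept("event-1", 0));         // true: first sighting
        System.out.println(dedup.accept("event-1", 5 * hour));  // false: repeat within 24h
        System.out.println(dedup.accept("event-1", 25 * hour)); // true: earlier forward expired
    }
}
```

Eviction scans the whole map on every call. That keeps the sketch short, but the cost grows linearly with the number of remembered ids, so a production version would more likely expire entries in time order.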
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > 
> > 
> > -- 
> > -- Guozhang
> >
>
