Hi Viktor,

I’m not sure why you get two identical outputs in response to a single record. 
Regardless, since you say that you want a single, final result per window and 
you expect multiple inputs to each window, you need suppression.
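To make the difference concrete, here's a toy model in plain Java (not Kafka 
code). It assumes one key, 30-second tumbling windows aligned to the epoch, and 
a "latest value wins" reducer like yours. Without suppression, every input 
produces an updated result for its window (ignoring record caching, which may 
coalesce some updates); with final-result suppression you get one result per 
window once it has closed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Kafka Streams code: one key, tumbling windows, "latest value wins".
final class EagerVsFinal {
    static final long WINDOW_SIZE_MS = 30_000L;

    // Start of the epoch-aligned tumbling window containing ts (assumes ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // Without suppression: each input emits an updated result for its window.
    static List<String> eagerUpdates(long[] timestamps, String[] values) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            out.add(windowStart(timestamps[i]) + "=" + values[i]);
        }
        return out;
    }

    // With final-result suppression: only the last value per window is emitted,
    // assuming every window has eventually closed.
    static List<String> finalResults(long[] timestamps, String[] values) {
        Map<Long, String> latest = new LinkedHashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            latest.put(windowStart(timestamps[i]), values[i]); // overwrite = reduce
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, String> e : latest.entrySet()) {
            out.add(e.getKey() + "=" + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 5_000L, 12_000L, 31_000L};
        String[] vs = {"a", "b", "c", "d"};
        System.out.println(eagerUpdates(ts, vs)); // [0=a, 0=b, 0=c, 30000=d]
        System.out.println(finalResults(ts, vs)); // [0=c, 30000=d]
    }
}
```

Four inputs yield four updates without suppression, but only two final results 
(one per window) with it.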

My guess is that you sent just one record to try it out and didn’t see any 
output? That is expected. Just as the window boundaries are defined by the 
timestamps of the records, not by the current system time, suppression is also 
governed by the record timestamps. I.e., a thirty-second window is not actually 
closed until you see a new record with a timestamp at least thirty seconds after 
the window's start.
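A simplified model of that "stream time" idea, again plain Java and only an 
approximation of what Kafka Streams does internally: stream time is the highest 
record timestamp observed so far, it only advances when a record arrives, and a 
window ending at `end` counts as closed once stream time reaches `end + grace`. 
The wall clock is never consulted.

```java
// Simplified model of stream time; an approximation, not Kafka Streams' actual code.
final class StreamTimeSketch {
    private long streamTime = Long.MIN_VALUE; // no records observed yet

    // Stream time is the max timestamp seen; out-of-order records never move it back.
    void observe(long recordTimestampMs) {
        streamTime = Math.max(streamTime, recordTimestampMs);
    }

    // A window [start, end) is closed once stream time reaches end + grace.
    boolean isClosed(long windowEndMs, long graceMs) {
        return streamTime >= windowEndMs + graceMs;
    }

    public static void main(String[] args) {
        long t = 0L;                  // timestamp T of the first record
        long windowEnd = t + 30_000L; // a 30-second window starting at T
        StreamTimeSketch clock = new StreamTimeSketch();

        clock.observe(t);
        System.out.println(clock.isClosed(windowEnd, 0L)); // false: only T seen

        clock.observe(t + 10_000L);
        System.out.println(clock.isClosed(windowEnd, 0L)); // false

        clock.observe(t + 30_000L);
        System.out.println(clock.isClosed(windowEnd, 0L)); // true: record at T+30s
    }
}
```

However long you wait between sends, the window stays open until a record with 
a late enough timestamp shows up.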

Maybe try sending a sequence of updates with increasing timestamps. If the 
first record has timestamp T, then you should see an output when you pass in a 
record with timestamp T + 30 seconds.
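With hopping windows, a record at T falls into several overlapping windows, and 
each closes at its own end, so some output can appear before T + 30 seconds. The 
sketch below assumes epoch-aligned windows (starts are multiples of the advance), 
a 30 s size, a 10 s advance, zero grace, and a hypothetical T of 35 s. It lists 
the windows containing T and the stream time at which each would be emitted; 
all of them end at or before T + 30 seconds.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of hopping-window membership; not Kafka Streams code.
final class HoppingWindowsSketch {
    // Starts of all windows [start, start + sizeMs) containing ts, ascending.
    // Assumes ts >= 0, 0 < advanceMs <= sizeMs, and epoch-aligned window starts.
    static List<Long> windowStartsFor(long ts, long sizeMs, long advanceMs) {
        // Smallest multiple of advanceMs greater than ts - sizeMs, clamped at 0.
        long first = Math.max(0L, ts - sizeMs + advanceMs) / advanceMs * advanceMs;
        List<Long> starts = new ArrayList<>();
        for (long s = first; s <= ts; s += advanceMs) {
            starts.add(s);
        }
        return starts;
    }

    // Stream time at which a window's final result is released: window end + grace.
    static long emitAt(long start, long sizeMs, long graceMs) {
        return start + sizeMs + graceMs;
    }

    public static void main(String[] args) {
        long t = 35_000L;       // hypothetical timestamp T of the first record
        long size = 30_000L;
        long advance = 10_000L;
        for (long start : windowStartsFor(t, size, advance)) {
            System.out.println("[" + start + ", " + (start + size) + ") emits at "
                    + emitAt(start, size, 0L));
        }
    }
}
```

This prints three windows, `[10000, 40000)`, `[20000, 50000)` and 
`[30000, 60000)`, emitted at stream times 40000, 50000 and 60000 ms, all no 
later than T + 30 s = 65000 ms.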

Important note: there is a built-in grace period that delays the output of 
final results after the window ends. For complicated reasons, the default is 24 
hours! So you would actually not see an output until you send a record with 
timestamp T + 30 seconds + 24 hours! I strongly recommend setting the grace 
period on TimeWindows to zero for your testing. You can increase it later if you 
want to tolerate some late-arriving records.
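In the 2.x API, zero grace on your windows looks like 
`TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)).grace(Duration.ZERO)`. 
The arithmetic behind the delay is just window end plus grace; here it is as a 
plain-Java sketch (hypothetical helper, with T taken as offset zero and the 
24-hour default from above).

```java
import java.time.Duration;

// Plain-Java arithmetic sketch; not a Kafka Streams API.
final class GracePeriodSketch {
    // Earliest record timestamp (as an offset from T) that closes a window ending
    // at windowEnd, given the grace period.
    static Duration firstClosingTimestamp(Duration windowEnd, Duration grace) {
        return windowEnd.plus(grace);
    }

    public static void main(String[] args) {
        Duration windowEnd = Duration.ofSeconds(30); // window [T, T + 30s)
        System.out.println(firstClosingTimestamp(windowEnd, Duration.ofHours(24)).getSeconds()); // 86430
        System.out.println(firstClosingTimestamp(windowEnd, Duration.ZERO).getSeconds());        // 30
    }
}
```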

Thanks,
-John

On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> Hi,
> 
> My name is Viktor. I'm currently working with Kafka Streams and have
> several questions about Kafka that I cannot find answers to in the official
> docs.
> 
> 1. Why does the suppress functionality not work with hopping windows? How
> can I make it work?
> 
> Example of the code:
> 
> KStream<Windowed<String>, String> finalStream = source
>         .groupByKey()
>         .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
>         .reduce((aggValue, newValue) -> newValue,
>                 Materialized.with(Serdes.String(), Serdes.String()))
>         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>         .toStream();
> 
> finalStream.print(Printed.toSysOut());
> finalStream.to(outputTopic);
> 
> After I run the code above, the output stream is empty. There were no
> errors or exceptions.
> NOTE: With a tumbling window, the code works as expected.
> Maybe I am simply using it incorrectly?
> 
> 2. Why are there duplicates in the output stream with hopping windows
> (without suppress)?
> E.g., I send one record to the input KStream with a hopping window
> (duration=30s, advanceBy=2s) but get two identical records (duplicates) in
> the output KStream.
> Is that expected behavior? If so, how can I filter out or switch off these
> duplicates?
> 
> 3. Mainly, I'm trying to solve this problem:
> I have a KStream of events, and events can be repeated (duplicates).
> In the output KStream I would like to receive only unique events for the
> last 24 hours (window duration), with windows advancing every hour (window
> advanceBy).
> Could you recommend any code examples or docs, please?
> I have already read the official docs and examples, but they were not enough
> for me to fully understand how I can achieve this.
> 
> Best regards,
> Viktor Markvardt
>
