Hi

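The 20 seconds is not compared against the current (wall-clock) time. Spark
tracks the maximum event time it has seen so far, and the watermark is that
maximum minus 20 seconds. The watermark decides when a window's state can be
cleared. For a late message to be dropped, it has to arrive after you have
received a message with event time >= (end time of the late message's
window) + 20 seconds.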
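To make this concrete, here is a toy model in plain Java (no Spark dependency) of how the watermark decides that a record is too late. It is a simplified sketch, not Spark's code. The class and method names are hypothetical. The watermark is advanced after every record, whereas Spark advances it only between micro-batches. Output mode is also ignored, although Spark only uses the watermark to drop old window state in append or update mode, not in complete mode.

```java
/**
 * Hypothetical, simplified model of an event-time watermark applied to a
 * tumbling-window aggregation. This is NOT Spark's implementation.
 */
public class WatermarkModel {
    private final long delayMs;  // e.g. 20 s, as in withWatermark("timestamp", "20 seconds")
    private final long windowMs; // e.g. 5 s, as in window(col("timestamp"), "5 seconds")
    private long maxEventTimeMs = Long.MIN_VALUE; // largest event time seen so far

    public WatermarkModel(long delayMs, long windowMs) {
        this.delayMs = delayMs;
        this.windowMs = windowMs;
    }

    /** Watermark = max event time seen so far minus the allowed delay. */
    public long watermarkMs() {
        return maxEventTimeMs == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTimeMs - delayMs;
    }

    /** Exclusive end of the tumbling window that contains eventTimeMs. */
    public long windowEndMs(long eventTimeMs) {
        return Math.floorDiv(eventTimeMs, windowMs) * windowMs + windowMs;
    }

    /**
     * Returns true if the record is accepted, false if it is dropped as too late.
     * A record is dropped when the end of its window is <= the current watermark.
     * Simplification: the watermark advances after every record in this model.
     */
    public boolean offer(long eventTimeMs) {
        boolean accepted = windowEndMs(eventTimeMs) > watermarkMs();
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
        return accepted;
    }

    public static void main(String[] args) {
        WatermarkModel model = new WatermarkModel(20_000, 5_000);
        // Event times in milliseconds, in arrival order.
        long[] events = {10_000, 100_000, 10_000, 81_000, 79_000, 125_000, 103_000};
        for (long t : events) {
            boolean accepted = model.offer(t);
            System.out.printf("event %6d ms -> %-8s (watermark now %d ms)%n",
                    t, accepted ? "accepted" : "dropped", model.watermarkMs());
        }
    }
}
```

In this run the first record at 10,000 ms is accepted even though it is "old", because nothing newer has been seen yet. The same timestamp is dropped once the 100,000 ms record has pushed the watermark to 80,000 ms. A record at 79,000 ms (window [75,000, 80,000)) is dropped, while one at 81,000 ms (window [80,000, 85,000)) is still accepted.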

I wrote a post on this recently:
http://vishnuviswanath.com/spark_structured_streaming.html#watermark

Thanks,
Vishnu

On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao <fifistorm...@gmail.com> wrote:

> sample code:
>
> Let's say Xyz is POJO with a field called timestamp,
>
> regarding code withWatermark("timestamp", "20 seconds")
>
> I expect messages with a timestamp 20 seconds or older to be dropped. What
> is the 20 seconds compared to? Based on my test, nothing was dropped no
> matter how old the timestamp was. What did I miss?
>
> Dataset<Xyz> xyz = lines
>         .as(Encoders.STRING())
>         .map((MapFunction<String, Xyz>) value -> mapper.readValue(value, Xyz.class),
>                 Encoders.bean(Xyz.class));
>
> // tumbling window of size 5 seconds on the event-time column "timestamp"
> Dataset<Row> aggregated = xyz.withWatermark("timestamp", "20 seconds")
>         .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"), xyz.col("x"))
>         .count();
>
> Thanks
>
>
