Sample code is included below.

Let's say Xyz is a POJO with a field called timestamp.
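
For concreteness, a minimal sketch of what such a POJO might look like. The field types are assumptions, not taken from the real class: timestamp is shown as java.sql.Timestamp because withWatermark and window expect a timestamp-typed column, and x is shown as a String only for illustration.

```java
import java.io.Serializable;
import java.sql.Timestamp;

// Hypothetical shape of Xyz. When used with Encoders.bean it would normally be
// declared public in its own file (Xyz.java), with a no-arg constructor and
// getter/setter pairs for every field.
class Xyz implements Serializable {
    private Timestamp timestamp; // event time of the message
    private String x;            // grouping key; the String type is an assumption

    public Xyz() {}

    public Timestamp getTimestamp() { return timestamp; }
    public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }

    public String getX() { return x; }
    public void setX(String x) { this.x = x; }
}
```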

Regarding the call withWatermark("timestamp", "20 seconds"):

I expected messages whose timestamp is 20 seconds old or older to be
dropped. What is the 20 seconds compared against? In my test nothing was
dropped, no matter how old the timestamp was. What did I miss?

Dataset<Xyz> xyz = lines
        .as(Encoders.STRING())
        .map((MapFunction<String, Xyz>) value -> mapper.readValue(value, Xyz.class),
                Encoders.bean(Xyz.class));

// Tumbling window of size 5 seconds on the timestamp column
Dataset<Row> aggregated = xyz.withWatermark("timestamp", "20 seconds")
        .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"), xyz.col("x"))
        .count();
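
To make the question about the 20 seconds concrete: as far as I can tell from the Structured Streaming guide, the delay is measured against the largest event time the engine has seen so far, not against the wall clock. Below is a plain-Java sketch of that reading. WatermarkSketch and its methods are hypothetical names, not Spark API, and the sketch ignores that Spark only advances the watermark between triggers and, for windowed aggregations, compares the window rather than the individual record.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Spark: models
// "watermark = max event time seen so far - delay".
class WatermarkSketch {
    private final Duration delay;
    private Instant maxEventTime = Instant.EPOCH;

    WatermarkSketch(Duration delay) {
        this.delay = delay;
    }

    // Record an event time; the maximum only ever moves forward.
    void observe(Instant eventTime) {
        if (eventTime.isAfter(maxEventTime)) {
            maxEventTime = eventTime;
        }
    }

    // Current watermark under this simplified model.
    Instant watermark() {
        return maxEventTime.minus(delay);
    }

    // A record counts as late only if it is older than the watermark.
    boolean isLate(Instant eventTime) {
        return eventTime.isBefore(watermark());
    }
}
```

Under this reading, a record is compared with the newest event time in the stream, so an old record would only count as late after a record at least 20 seconds newer had already been processed.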

Thanks
