sample code: Let's say Xyz is POJO with a field called timestamp,
regarding code withWatermark("timestamp", "20 seconds") I expect the msg with timestamp 20 seconds or older will be dropped, what does 20 seconds compare to? based on my test nothing was dropped no matter how old the timestamp is, what did i miss? Dataset<Xyz> xyz = lines .as(Encoders.STRING()) .map((MapFunction<String, Xyz>) value -> mapper.readValue(value, Xyz.class), Encoders.bean(Xyz.class)); Dataset<Row> aggregated = xyz.withWatermark("timestamp", "20 seconds") .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"), xyz.col("x") //tumbling window of size 5 seconds (timestamp) ).count(); Thanks