Technically speaking, "late data" is data that can no longer be processed
because the engine has already discarded the state associated with it.

That said, the only reason watermarks exist in streaming is to handle
stateful operators. From the engine's point of view, there is no concept
of "late data" for a stateless query. Users have to apply a "filter"
themselves, without relying on the value of the watermark. I guess someone
may see some benefit in automatic tracking of the event time trend and want
to define late data based on the watermark even in a stateless query, but
personally I haven't heard such a request so far.
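
To make "late data based on the watermark" concrete, here is a plain-Scala
sketch (no Spark involved) of what such filtering would do to the two batches
from the question. It assumes the watermark is the maximum event time seen in
earlier batches minus the delay, and that it never moves backwards; that
mirrors my understanding of the micro-batch behavior, but it is a simulation
and not engine code. The Click shape and the names WatermarkSketch and
keepOnTime are made up for illustration.

```scala
import java.sql.Timestamp

// Assumed shape of the Click class used in the question.
case class Click(id: Int, clickTime: Timestamp)

object WatermarkSketch {
  // For each batch, keep the rows whose event time is past the watermark
  // computed from the batches before it, then advance the watermark.
  def keepOnTime(batches: Seq[Seq[Click]], delayMs: Long): Seq[Seq[Click]] = {
    val (_, kept) = batches.foldLeft((0L, Vector.empty[Seq[Click]])) {
      case ((watermarkMs, acc), batch) =>
        val onTime = batch.filter(_.clickTime.getTime > watermarkMs)
        // The watermark only ever moves forward; empty batches leave it as is.
        val nextWatermarkMs =
          if (batch.isEmpty) watermarkMs
          else math.max(watermarkMs, batch.map(_.clickTime.getTime).max - delayMs)
        (nextWatermarkMs, acc :+ onTime)
    }
    kept
  }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq(
      Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
      Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
      Click(3, Timestamp.valueOf("2023-06-10 10:14:00")))
    val batch2 = Seq(
      Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
      Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
      Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
      Click(10, Timestamp.valueOf("2023-06-10 10:00:10")))
    val kept = keepOnTime(Seq(batch1, batch2), delayMs = 10 * 60 * 1000L)
    kept.zipWithIndex.foreach { case (rows, i) =>
      println(s"batch ${i + 1}: kept ids ${rows.map(_.id).mkString(", ")}")
    }
  }
}
```

After the first batch the simulated watermark sits at 10:04:00, so in the
second batch Click 10 (10:00:10) would be treated as late while Clicks 4, 5
and 6 pass. A stateless query in Spark emits all four.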

As a workaround you can leverage flatMapGroupsWithState, which provides the
value of the watermark for you, but I agree it's too heavyweight just for
this. If we see consistent demand for it, we could probably look into it and
maybe introduce a new SQL function for it (one that works only on streaming,
which is probably a major blocker for introducing it).
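
A minimal sketch of that workaround, under stated assumptions: the Click
shape is inferred from the question, and LateClickFilter, isOnTime and
dropLate are hypothetical names. The state type (Boolean) is only a
placeholder, since the function never reads or writes state and uses
GroupState purely to read the watermark via getCurrentWatermarkMs(). I have
not run this against 3.5.0, so treat it as an outline.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Assumed shape of the Click class used in the question.
case class Click(id: Int, clickTime: Timestamp)

object LateClickFilter {
  // Hypothetical helper: a click is on time if its event time is past the watermark.
  def isOnTime(click: Click, watermarkMs: Long): Boolean =
    click.clickTime.getTime > watermarkMs

  // Reads the current watermark from GroupState and drops older clicks.
  // No state is stored; GroupState[Boolean] is a placeholder type.
  def dropLate(
      id: Int,
      clicks: Iterator[Click],
      state: GroupState[Boolean]): Iterator[Click] = {
    val watermarkMs = state.getCurrentWatermarkMs()
    clicks.filter(isOnTime(_, watermarkMs))
  }

  // withWatermark must come before flatMapGroupsWithState, otherwise
  // getCurrentWatermarkMs() is not allowed in a streaming query.
  def apply(spark: SparkSession, clicks: Dataset[Click]): Dataset[Click] = {
    import spark.implicits._
    clicks
      .withWatermark("clickTime", "10 minutes")
      .groupByKey(_.id)
      .flatMapGroupsWithState(
        OutputMode.Append(), GroupStateTimeout.NoTimeout())(dropLate _)
  }
}
```

The grouping key here is arbitrary. The groupByKey is what makes this
heavyweight: it adds a shuffle and a stateful operator to a query that is
otherwise stateless.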

On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny <bartkoniec...@gmail.com>
wrote:

> Hi,
>
> I've recently been analyzing the watermark propagation added in 3.5.0
> and had to return to the basics of watermarks. One question is still
> unanswered in my head.
>
> Why are watermarks reserved for stateful queries? Can't they also be used
> just to filter out late data?
>
> Is the reason only historical, as the initial design doc
> <https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/edit>
> mentions aggregated queries exclusively? Or are there technical
> limitations that explain why jobs like the one below don't drop late data
> automatically?
>
>     import java.sql.Timestamp
>     import org.apache.spark.sql.execution.streaming.MemoryStream
>     import sparkSession.implicits._
>     // MemoryStream needs an implicit SQLContext in scope
>     implicit val sqlContext = sparkSession.sqlContext
>     val clicksStream = MemoryStream[Click]
>     val clicksWithWatermark = clicksStream.toDF
>       .withWatermark("clickTime", "10 minutes")
>     val query = clicksWithWatermark.writeStream
>       .format("console")
>       .option("truncate", false)
>       .start()
>
>     clicksStream.addData(Seq(
>       Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
>       Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
>       Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
>     ))
>
>
>     query.processAllAvailable()
>
>     clicksStream.addData(Seq(
>       Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
>       Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
>       Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
>       Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
>     ))
>     query.processAllAvailable()
>
> One quick implementation could be adding a new physical plan rule to the
> IncrementalExecution
> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala>
> for the EventTimeWatermark node. That's a first thought, maybe too
> simplistic and hiding some pitfalls?
>
> Best,
> Bartosz.
> --
> freelance data engineer
> https://www.waitingforcode.com
> https://github.com/bartosz25/
> https://twitter.com/waitingforcode
>
>
