We would rather not expose the internal mechanism to the public.

If you are a detail-oriented engineer tracking major changes, you may
have noticed that we "changed" the definition of a late record while
fixing late record handling. Previously, a late record was defined as a
record whose event time is earlier than the "current" watermark. What
changed? We now use the "previous" watermark to determine late records,
because records earlier than the current watermark can still be valid
inputs for non-first stateful operators. If we had exposed a
current_watermark() function that returns the current watermark, and users
had built a side-output on top of it, that side-output would have broken
when we introduced the fix to late record filtering. Or even worse, we
might have decided not to fix the issue out of concern for existing
workloads, and given up on supporting multiple stateful operators.
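
To make the difference concrete, here is a minimal sketch in plain Scala.
It is only a model of the two definitions as worded above, not Spark's
internal code: Record, isLateByCurrent and isLateByPrevious are
hypothetical names, the timestamps are made-up epoch milliseconds, and the
exact comparison the engine uses may differ.

```scala
// Plain-Scala model of the two definitions; not Spark's internal code.
// All names (Record, isLateByCurrent, isLateByPrevious) are hypothetical.
object LateRecordSketch {
  // Event times and watermarks are epoch milliseconds.
  final case class Record(id: Int, eventTimeMs: Long)

  // Old definition: compare against the "current" watermark.
  def isLateByCurrent(r: Record, currentWatermarkMs: Long): Boolean =
    r.eventTimeMs < currentWatermarkMs

  // New definition: compare against the "previous" watermark.
  def isLateByPrevious(r: Record, previousWatermarkMs: Long): Boolean =
    r.eventTimeMs < previousWatermarkMs

  def main(args: Array[String]): Unit = {
    val previousWatermarkMs = 1000L
    val currentWatermarkMs = 2000L
    val record = Record(id = 1, eventTimeMs = 1500L)

    // A side-output keyed on the current watermark would call this record late...
    println(isLateByCurrent(record, currentWatermarkMs))   // true
    // ...while a filter based on the previous watermark still accepts it.
    println(isLateByPrevious(record, previousWatermarkMs)) // false
  }
}
```

A record that falls between the two watermarks is classified differently
by the two predicates, which is exactly where a user-built side-output
based on current_watermark() would disagree with the engine.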

The change is arguably not a breaking change, because we never guaranteed
that we won't process data that is earlier than the watermark. The
guarantee is one-way: a record is guaranteed to be processed if its event
time is later than the watermark. The opposite is not guaranteed, and we
actually documented this in the guide doc.

So the workaround I mentioned cannot be used for capturing dropped late
records; it does not work as expected. We would need to apply exactly the
same criteria (probably the same predicate) to capture them. We are aware
of the demand for a side-output of dropped late records, and I also agree
that having only the number of dropped records is far from ideal.
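
As a rough illustration of "the same predicate", the sketch below splits a
batch with a single shared function, so the dropped set and the processed
set cannot diverge. This is plain Scala under stated assumptions, not a
proposed Spark API: SharedPredicateSketch, isLate and split are hypothetical
names, and the engine's real predicate may differ.

```scala
// Hypothetical sketch: one predicate decides both what is dropped and what
// would go to a side-output. Not Spark code.
object SharedPredicateSketch {
  final case class Record(id: Int, eventTimeMs: Long)

  // Single source of truth for lateness (assumed form of the predicate).
  def isLate(r: Record, watermarkForLateEventsMs: Long): Boolean =
    r.eventTimeMs < watermarkForLateEventsMs

  // Returns (dropped, kept); both sides come from the same predicate.
  def split(batch: Seq[Record], watermarkMs: Long): (Seq[Record], Seq[Record]) =
    batch.partition(isLate(_, watermarkMs))

  def main(args: Array[String]): Unit = {
    val batch = Seq(Record(1, 500L), Record(2, 1500L))
    val (dropped, kept) = split(batch, watermarkMs = 1000L)
    println(dropped.map(_.id)) // List(1)
    println(kept.map(_.id))    // List(2)
  }
}
```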

Let's see whether we have an opportunity to prioritize this. If you have an
idea (a sketched design) for implementing this, that would be awesome!

On Tue, Oct 10, 2023 at 6:27 PM Bartosz Konieczny <bartkoniec...@gmail.com>
wrote:

> Thank you for the clarification, Jungtaek 🙏 Indeed, it doesn't sound like
> a feature in high demand among end users; I haven't seen it come up much on
> StackOverflow or the mailing lists. I was just curious about the reasons.
>
> Arbitrary stateful processing could indeed be a workaround! But IMHO it
> would be easier to expose the watermark value through a function like
> current_watermark() and let users do anything with the data. It also
> wouldn't require dealing with the state store overhead. The function
> could simplify implementing the *side output pattern*, where on-time data
> is processed differently from late data, e.g. writing late data to a
> dedicated space in the lake to facilitate backfilling for the batch
> pipelines.
>
> With the current_watermark function it could be expressed as a simple:
>
> streamDataset.foreachBatch((dataframe, batchVersion) => {
>   dataframe.cache()
>   dataframe.filter(current_watermark() > event_time_from_dataframe)
>     .writeTo("late_data")
>   dataframe.filter(current_watermark() <= event_time_from_dataframe)
>     .writeTo("on_time_data")
> })
>
> A little bit as you can do with Apache Flink in fact:
>
> https://github.com/immerok/recipes/blob/main/late-data-to-sink/src/main/java/com/immerok/cookbook/LateDataToSeparateSink.java#L81
>
> WDYT?
>
> Best,
> Bartosz.
>
> PS. Will be happy to contribute on that if the feature does make sense ;)
>
> On Tue, Oct 10, 2023 at 3:23 AM Jungtaek Lim <kabhwan.opensou...@gmail.com>
> wrote:
>
>> Technically speaking, "late data" represents the data which cannot be
>> processed due to the fact the engine threw out the state associated with
>> the data already.
>>
>> That said, the only reason watermarks exist in streaming is to handle
>> stateful operators. From the engine's point of view, there is no concept
>> of "late data" for a stateless query. It's something users have to handle
>> themselves with a "filter", without relying on the value of the watermark.
>> I guess someone may see some benefit in automatic tracking of the event
>> time trend and want to define late data based on the watermark even in a
>> stateless query, but personally I haven't heard such a request so far.
>>
>> As a workaround you can leverage flatMapGroupsWithState which provides
>> the value of watermark for you, but I'd agree it's too heavyweight just to
>> do this. If we see consistent demand on it, we could probably look into it
>> and maybe introduce a new SQL function (which works only on streaming -
>> that's probably a major blocker on introduction) on it.
>>
>> On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny <
>> bartkoniec...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I've been analyzing the watermark propagation added in the 3.5.0
>>> recently and had to return to the basics of watermarks. One question is
>>> still unanswered in my head.
>>>
>>> Why are watermarks reserved for stateful queries? Can't they also apply
>>> to simply filtering late data out?
>>>
>>> Is the reason only historical, as the initial design doc
>>> <https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/edit>
>>> mentions aggregated queries exclusively? Or are there any technical
>>> limitations that explain why jobs like the one below don't drop late
>>> data automatically?
>>>
>>>     import sparkSession.implicits._
>>>     implicit val sparkContext = sparkSession.sqlContext
>>>     val clicksStream = MemoryStream[Click]
>>>     val clicksWithWatermark = clicksStream.toDF
>>>       .withWatermark("clickTime", "10 minutes")
>>>     val query = clicksWithWatermark.writeStream
>>>       .format("console").option("truncate", false)
>>>       .start()
>>>
>>>     clicksStream.addData(Seq(
>>>       Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
>>>       Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
>>>       Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
>>>     ))
>>>
>>>
>>>     query.processAllAvailable()
>>>
>>>     clicksStream.addData(Seq(
>>>       Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
>>>       Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
>>>       Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
>>>       Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
>>>     ))
>>>     query.processAllAvailable()
>>>
>>> One quick implementation could be adding a new physical plan rule to the
>>> IncrementalExecution
>>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala>
>>> for the EventTimeWatermark node. That's a first thought, maybe too
>>> simplistic and hiding some pitfalls?
>>>
>>> Best,
>>> Bartosz.
>>> --
>>> freelance data engineer
>>> https://www.waitingforcode.com
>>> https://github.com/bartosz25/
>>> https://twitter.com/waitingforcode
>>>
>>>
>
> --
> Bartosz Konieczny
> freelance data engineer
> https://www.waitingforcode.com
> https://github.com/bartosz25/
> https://twitter.com/waitingforcode
>
>
