Slight correction/clarification: We now take the "previous" watermark to
determine the late record, because records that are late only with respect
to the "current" watermark are valid inputs for non-first stateful
operators. Dropping records based on the same criteria as state eviction
would drop valid records from previous (upstream) stateful operators.
Please look back at which criteria we use for evicting states, since
evicted states could become outputs of the operator.
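
To illustrate the difference, here is a minimal sketch in plain Scala. It is
not Spark's internal code: the Watermarks case class, the method names, and
the strict "earlier than" comparison are assumptions made for illustration,
following the wording used in this thread.

```scala
// Hypothetical model for illustration only; not a Spark API.
// Both watermarks are expressed as epoch milliseconds.
final case class Watermarks(previousMs: Long, currentMs: Long)

object LateRecordSketch {
  // Old definition: a record is late if its event time is earlier
  // than the "current" watermark.
  def isLateOld(eventTimeMs: Long, wm: Watermarks): Boolean =
    eventTimeMs < wm.currentMs

  // New definition: a record is late only if its event time is earlier
  // than the "previous" watermark.
  def isLateNew(eventTimeMs: Long, wm: Watermarks): Boolean =
    eventTimeMs < wm.previousMs
}
```

For example, with a previous watermark of 1000 ms and a current watermark of
2000 ms, a record with event time 1500 ms emitted by an upstream stateful
operator is late under the old definition but accepted under the new one.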

On Tue, Oct 10, 2023 at 8:10 PM Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> We wouldn't like to expose the internal mechanism to the public.
>
> As you are a very detail-oriented engineer tracking major changes, you
> might have noticed that we "changed" the definition of a late record while
> fixing late record filtering. Previously, a late record was defined as a
> record whose event time is earlier than the "current" watermark. How has
> it changed? We now take the "previous" watermark to determine the late
> record, because records that are late only with respect to the "current"
> watermark are valid inputs for non-first stateful operators. If we had
> exposed a function current_watermark() which provides the current
> watermark, and users had somehow built a side-output based on it, that
> would have been broken when we introduced the fix on late record
> filtering. Or even worse, we might have decided not to fix the issue,
> worrying too much about existing workloads, and given up on multiple
> stateful operators.
>
> The change is arguably not a breaking change, because we never guaranteed
> that we won't process data which is earlier than the watermark. The
> guarantee is one-way: we guarantee that a record is processed if its
> event time is later than the watermark. The opposite way is not
> guaranteed, and we actually documented this in the guide doc.
>
> So the workaround I mentioned cannot be used for capturing dropped late
> records - that does not work as expected. We will need to apply exactly the
> same criteria (probably the same predicate) on capturing them. We are aware
> of the demand for side-output of dropped late records, and I also agree
> that just having numbers of dropped records is never ideal.
>
> Let's see whether we have an opportunity to prioritize this. If you have
> an idea (sketched design) for implementing this, that would be awesome!
>
> On Tue, Oct 10, 2023 at 6:27 PM Bartosz Konieczny <bartkoniec...@gmail.com>
> wrote:
>
>> Thank you for the clarification, Jungtaek 🙏 Indeed, it doesn't sound
>> like a feature in high demand among end users; I haven't seen it much
>> on StackOverflow or the mailing lists. I was just curious about the
>> reasons.
>>
>> Using the arbitrary stateful processing could be indeed a workaround! But
>> IMHO it would be easier to expose this watermark value from a function like
>> a current_watermark() and let the users do anything with the data. And
>> it wouldn't require having the state store overhead to deal with. The
>> function could simplify implementing the *side output pattern* where we
>> could process the on-time data differently from the late data, e.g. write
>> late data to a dedicated space in the lake and facilitate the backfilling
>> for the batch pipelines?
>>
>> With the current_watermark function it could be expressed as a simple:
>>
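>> // current_watermark() is the function proposed in this thread.
>> streamDataset.writeStream.foreachBatch(
>>   (dataframe: DataFrame, batchVersion: Long) => {
>>     dataframe.cache()
>>     dataframe.filter(current_watermark() > event_time_from_dataframe)
>>       .writeTo("late_data").append()
>>     dataframe.filter(current_watermark() <= event_time_from_dataframe)
>>       .writeTo("on_time_data").append()
>>     dataframe.unpersist()
>>   })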
>>
>> A little bit as you can do with Apache Flink in fact:
>>
>> https://github.com/immerok/recipes/blob/main/late-data-to-sink/src/main/java/com/immerok/cookbook/LateDataToSeparateSink.java#L81
>>
>> WDYT?
>>
>> Best,
>> Bartosz.
>>
>> PS. Will be happy to contribute on that if the feature does make sense ;)
>>
>> On Tue, Oct 10, 2023 at 3:23 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Technically speaking, "late data" represents the data which cannot be
>>> processed because the engine has already thrown out the state associated
>>> with the data.
>>>
>>> That said, the only reason the watermark exists for streaming is to
>>> handle stateful operators. From the engine's point of view, there is no
>>> concept of "late data" for a stateless query. Users have to apply a
>>> "filter" by themselves for that, without relying on the value of the
>>> watermark. I guess someone may see some benefit in automatic tracking of
>>> the event time trend and want to define late data based on the watermark
>>> even in a stateless query, but personally I haven't heard such a request
>>> so far.
>>>
>>> As a workaround you can leverage flatMapGroupsWithState which provides
>>> the value of watermark for you, but I'd agree it's too heavyweight just to
>>> do this. If we see consistent demand on it, we could probably look into it
>>> and maybe introduce a new SQL function (which works only on streaming -
>>> that's probably a major blocker on introduction) on it.
>>>
>>> On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny <
>>> bartkoniec...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I've been analyzing the watermark propagation added in the 3.5.0
>>>> recently and had to return to the basics of watermarks. One question is
>>>> still unanswered in my head.
>>>>
>>>> Why are watermarks reserved for stateful queries? Can't they be used
>>>> just for filtering late data out?
>>>>
>>>> Is the reason only historical, as the initial design doc
>>>> <https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/edit>
>>>> mentions aggregation queries exclusively? Or are there technical
>>>> limitations that explain why jobs like the one below don't drop late
>>>> data automatically?
>>>>
>>>>     import sparkSession.implicits._
>>>>     implicit val sparkContext = sparkSession.sqlContext
>>>>     val clicksStream = MemoryStream[Click]
>>>>     val clicksWithWatermark = clicksStream.toDF
>>>>       .withWatermark("clickTime", "10 minutes")
>>>>     val query = clicksWithWatermark.writeStream
>>>>       .format("console").option("truncate", false)
>>>>       .start()
>>>>
>>>>     clicksStream.addData(Seq(
>>>>       Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
>>>>       Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
>>>>       Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
>>>>     ))
>>>>
>>>>
>>>>     query.processAllAvailable()
>>>>
>>>>     clicksStream.addData(Seq(
>>>>       Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
>>>>       Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
>>>>       Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
>>>>       Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
>>>>     ))
>>>>     query.processAllAvailable()
>>>>
>>>> One quick implementation could be adding a new physical plan rule to
>>>> the IncrementalExecution
>>>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala>
>>>> for the EventTimeWatermark node. That's a first thought, maybe too
>>>> simplistic and hiding some pitfalls?
>>>>
>>>> Best,
>>>> Bartosz.
>>>> --
>>>> freelance data engineer
>>>> https://www.waitingforcode.com
>>>> https://github.com/bartosz25/
>>>> https://twitter.com/waitingforcode
>>>>
>>>>
>>
>> --
>> Bartosz Konieczny
>> freelance data engineer
>> https://www.waitingforcode.com
>> https://github.com/bartosz25/
>> https://twitter.com/waitingforcode
>>
>>
