Hi Jeyhun,

Thanks for working on this FLIP.

In general, I think it's a good idea to generalize the concept of Watermark
to not only representing the advancing of event time, but general
indicators / events / signals that need to be passed along the data
streams. So +1 for working towards this direction.

However, my major concern after reading this FLIP is that the current
design might be too complicated. It tries to take all possible kinds of
events (timestamp watermark, end-of-data, end-of-partition, internal
watermark, and arbitrary user-defined watermarks) into consideration, which
complicates the design when it comes to serialization and propagation.

IMHO, this feature, the ability to send custom events across operators
along the data streams, has the potential to become a major differentiator
of DataStream API V2 compared to V1. For such a feature, I don't think
it's feasible to design everything properly at the very beginning without
enough user feedback. I'd suggest starting with a smaller scope, and
building the feature incrementally as new demands and feedback arise.

To be specific, I'd suggest that we:
1. Only focus on user-facing events, i.e., Watermarks that are either
generated or handled by user code (process functions and connectors).
Refactoring existing internal events does not bring any benefit to users,
and may even destabilize existing mechanisms. We could do that
incrementally after the generalized watermark mechanism becomes stable.
2. Start with a limited set of supported data types and propagation
strategies. We can add support for arbitrary types and strategies later, if
proven necessary. By that time, we should be able to better understand the
use cases based on real feedback.
3. Try to minimize the set of concepts and APIs that users need to
understand and work with, and make them simple and easy to understand. I'm
not saying we should not discuss designs of internal implementations in
this FLIP. It's just that the FLIP would be easier to understand if it
first presented how users should understand and use the feature, and then
the key internal designs needed to achieve that.

# Some detailed suggestions

## Use cases

Concrete use cases are usually helpful for designing such a general
mechanism. You may examine the design by trying to use it to fulfill the
demands from the use cases. In case you are looking for such use cases in
addition to event-time watermarks, here are some inputs.
- In FLIP-309/327/328 [1-3], we proposed to detect data freshness at the
source, and use that information for various improvements. In DataStream
API V1, such information is carried by RecordAttributes, which is quite
similar to the generalized watermark except that we do not allow defining
arbitrary custom attributes.
- In Flink CDC, there are two phases: scanning the table at a certain time
point, and consuming the binlog from that time point. In the first phase,
there's only +I but no +U/-U/-D in the changelog, and downstream operators
can do many optimizations based on that information. We haven't brought
those optimizations to the community, because that requires the runtime
layer to understand the concept of table / SQL changelogs. If we can send
custom events across operators, without requiring the runtime to
understand those events, the problem would be solved.
- In processing-time temporal join, the join operator does not wait for the
build side to complete before consuming the probe side data. This is
because the build side is continuously updated and the join operator does
not know when the initial build is finished. As a result, initial data
from the probe side that should be joined with initial data from the build
side are missed. If we can send a signal from the build side source to the
join operator, notifying it about the completion of the initial build, the
problem would be solved. Similar to the previous case, such information
should not need to be understood by the runtime layer.

## Watermark Definition

The FLIP defines the new generalized Watermark as "indicators in data
streams", which is a bit too general.

I think we should at least include the following information:
- non-data events / records / indicators
- flow along the data streams
- can be generated / handled by process functions, connectors, and the
framework
- may need to be aligned across parallel data streams

## Types of Watermarks

Requiring users to always implement serialization for custom watermarks
might be a bit too heavy. Alternatively, we may consider only supporting
primitive types for Watermarks, i.e., BOOLEAN, LONG, etc. If complex types
prove necessary in the future, we can introduce STRING or BYTES so that
users can do custom serde by themselves.

Another benefit of using primitive types is that it simplifies the
alignment semantics. Currently in this FLIP, users are required to
implement a WatermarkCombiner, which is not trivial. If we only support
limited types, we can provide built-in combiners for users (and maybe only
built-in ones), e.g., ALL / ANY for BOOLEAN, GREATEST / LEAST for LONG,
etc. Combiners for STRING and BYTES are a bit more complicated, which is
why I don't recommend supporting them in the first version.
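To illustrate how small the built-in combiners could be, here is a minimal
Java sketch. `BooleanCombiner` and `LongCombiner` are hypothetical names
for illustration only, not existing Flink classes, and the sketch assumes
each combiner simply reduces the latest values from all input channels.

```java
import java.util.List;

/** Hypothetical built-in combiners for BOOLEAN watermarks (not a Flink API). */
enum BooleanCombiner {
    /** True only if every channel reported true. */
    ALL {
        @Override
        boolean combine(List<Boolean> values) {
            return values.stream().allMatch(Boolean::booleanValue);
        }
    },
    /** True if at least one channel reported true. */
    ANY {
        @Override
        boolean combine(List<Boolean> values) {
            return values.stream().anyMatch(Boolean::booleanValue);
        }
    };

    abstract boolean combine(List<Boolean> values);
}

/** Hypothetical built-in combiners for LONG watermarks (not a Flink API). */
enum LongCombiner {
    /** Smallest value across channels, e.g. the earliest event-time timestamp. */
    LEAST {
        @Override
        long combine(List<Long> values) {
            // Requires at least one value; throws NoSuchElementException otherwise.
            return values.stream().mapToLong(Long::longValue).min().orElseThrow();
        }
    },
    /** Largest value across channels. */
    GREATEST {
        @Override
        long combine(List<Long> values) {
            return values.stream().mapToLong(Long::longValue).max().orElseThrow();
        }
    };

    abstract long combine(List<Long> values);
}
```

E.g., `LongCombiner.LEAST.combine(List.of(5L, 3L, 9L))` returns 3. Users
would only pick a combiner by name instead of implementing one.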

The primitive types should already cover the existing use cases:
- event-time watermark: LONG, LEAST (meaning we should return the earliest
timestamp among all channels)
- is-processing-backlog (data freshness): BOOLEAN, ALL (meaning we should
return true only if the values received from all channels are all true)
- insert-only (changelog that only contains +I): BOOLEAN, ALL
- initial-build completed: BOOLEAN, ALL
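As a sketch of what declaring these could look like, assuming a declaration
consists only of an identifier, a primitive type and a built-in combiner
(`WatermarkType`, `BuiltInCombiner` and `WatermarkDeclaration` are all
hypothetical names, not Flink APIs). Binding each combiner to its type also
lets mismatches be rejected at declaration time.

```java
import java.util.List;

/** Hypothetical primitive value types a generalized watermark may carry. */
enum WatermarkType { BOOLEAN, LONG }

/** Hypothetical built-in combiners, each bound to the one type it can combine. */
enum BuiltInCombiner {
    ALL(WatermarkType.BOOLEAN),
    ANY(WatermarkType.BOOLEAN),
    LEAST(WatermarkType.LONG),
    GREATEST(WatermarkType.LONG);

    final WatermarkType type;

    BuiltInCombiner(WatermarkType type) {
        this.type = type;
    }
}

/** Hypothetical declaration: an identifier, a value type and a built-in combiner. */
final class WatermarkDeclaration {
    final String id;
    final WatermarkType type;
    final BuiltInCombiner combiner;

    WatermarkDeclaration(String id, WatermarkType type, BuiltInCombiner combiner) {
        // Reject mismatches such as LEAST on a BOOLEAN watermark up front.
        if (combiner.type != type) {
            throw new IllegalArgumentException(combiner + " cannot combine " + type + " values");
        }
        this.id = id;
        this.type = type;
        this.combiner = combiner;
    }
}

/** The four use cases listed above, expressed as declarations. */
final class ExistingUseCases {
    static final List<WatermarkDeclaration> DECLARATIONS = List.of(
            new WatermarkDeclaration("event-time", WatermarkType.LONG, BuiltInCombiner.LEAST),
            new WatermarkDeclaration("is-processing-backlog", WatermarkType.BOOLEAN, BuiltInCombiner.ALL),
            new WatermarkDeclaration("insert-only", WatermarkType.BOOLEAN, BuiltInCombiner.ALL),
            new WatermarkDeclaration("initial-build-completed", WatermarkType.BOOLEAN, BuiltInCombiner.ALL));
}
```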

## Blocking Alignment?

In addition to providing built-in combiners, another thing that might be
declared is whether we should block the channel for alignment.

I think this is not needed for any of the use cases listed above.

The only two examples that I can think of are:
- checkpoint barrier
- schema evolution in Flink CDC, where no data of the new schema is allowed
to be processed before data of the old schema from all channels are drained.

Neither of them is user-facing, and both already have existing solutions.
So maybe we can exclude this from the first version.
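For the use cases above, non-blocking alignment should be enough: the
framework only remembers the latest value per input channel and re-runs the
combiner whenever one changes, while records keep flowing on every channel.
A minimal sketch, assuming a hypothetical `NonBlockingAligner` (not a Flink
class) that emits nothing until every channel has reported once, and
forwards a combined value only when it changes:

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BinaryOperator;

/**
 * Hypothetical non-blocking aligner for one watermark across N input channels.
 * It only tracks the latest value per channel; no channel is ever blocked.
 */
final class NonBlockingAligner<T> {
    private final Object[] latest;            // latest value per channel, null = not seen yet
    private final BinaryOperator<T> combiner; // e.g. Long::min for LEAST, Boolean::logicalAnd for ALL
    private T lastEmitted;                    // last combined value forwarded downstream

    NonBlockingAligner(int numChannels, BinaryOperator<T> combiner) {
        this.latest = new Object[numChannels];
        this.combiner = combiner;
    }

    /** Records a value from one channel; returns the combined value if it changed. */
    @SuppressWarnings("unchecked")
    Optional<T> onWatermark(int channel, T value) {
        latest[channel] = Objects.requireNonNull(value);
        if (Arrays.stream(latest).anyMatch(Objects::isNull)) {
            return Optional.empty(); // some channel has not reported yet
        }
        T combined = (T) latest[0];
        for (int i = 1; i < latest.length; i++) {
            combined = combiner.apply(combined, (T) latest[i]);
        }
        if (combined.equals(lastEmitted)) {
            return Optional.empty(); // nothing new to forward
        }
        lastEmitted = combined;
        return Optional.of(combined);
    }
}
```

With `new NonBlockingAligner<Long>(2, Long::min)`, receiving 10 on channel 0
and then 7 on channel 1 forwards 7, and a later 12 on channel 0 forwards
nothing because the minimum is still 7. A blocking variant would instead
stop reading a channel once its watermark arrives, as checkpoint barrier
alignment does.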

## Declaration and Handling

It is a bit confusing that a process function (as shown in TLDR/Example)
has three methods: onWatermark(), watermarkPolicy() and
declareWatermarks(). What are the differences between onWatermark() and
watermarkPolicy()?

IIUC, a process function should only do three things regarding watermarks:
- Declare watermarks that it may generate
- Emit watermarks
- Handle received watermarks

Given that watermarks are emitted via WatermarkManager.emitWatermark(),
there should be only two methods on ProcessFunction, for declaring and
handling respectively. The current onWatermark() and watermarkPolicy() can
be combined by making onWatermark() return PEEK / POP to indicate whether
the current watermark should be further handled by the framework.
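A minimal sketch of that two-method shape, using the temporal join case as
an example. All names here (`WatermarkHandlingResult`, `GenericWatermark`,
`WatermarkAwareFunction`, `ProbeSideGate`) are hypothetical and only
illustrate the suggestion; they are not the FLIP's or Flink's API.

```java
import java.util.Set;

/** Hypothetical result of handling a watermark. */
enum WatermarkHandlingResult {
    /** The function only looked at it; the framework should still handle it. */
    PEEK,
    /** The function fully handled it; the framework should not. */
    POP
}

/** Hypothetical stand-in for a generalized watermark: an identifier and a primitive value. */
final class GenericWatermark {
    final String id;
    final Object value;

    GenericWatermark(String id, Object value) {
        this.id = id;
        this.value = value;
    }
}

/** Hypothetical process function shape: one method to declare, one to handle. */
interface WatermarkAwareFunction {
    /** Identifiers of the watermarks this function may emit. */
    Set<String> declareWatermarks();

    /** Handles a received watermark and tells the framework whether to keep handling it. */
    WatermarkHandlingResult onWatermark(GenericWatermark watermark);
}

/** Example: a join-like function waiting for the "initial build completed" signal. */
final class ProbeSideGate implements WatermarkAwareFunction {
    boolean buildSideReady = false;

    @Override
    public Set<String> declareWatermarks() {
        return Set.of(); // emits no custom watermarks of its own
    }

    @Override
    public WatermarkHandlingResult onWatermark(GenericWatermark watermark) {
        if ("initial-build-completed".equals(watermark.id)) {
            buildSideReady = Boolean.TRUE.equals(watermark.value);
            return WatermarkHandlingResult.POP; // fully handled here
        }
        return WatermarkHandlingResult.PEEK; // e.g. leave event time to the framework
    }
}
```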

Moreover, supporting custom watermarks means there's no guarantee that all
operators and the framework know how to handle the watermark. The default
behavior should be defined when declaring the watermark. E.g., a changelog
stream that contains only +I records going through an AGG operator may
result in a changelog stream that contains -U/+U. Therefore, if the
operators and framework don't know how to handle an insert-only watermark,
the safest option is to stop propagating the watermark.
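To make the declared default concrete, a minimal sketch, assuming each
declaration carries a hypothetical `DefaultHandling` (PROPAGATE or STOP)
and each operator exposes the set of watermark identifiers it understands.
None of these names exist in Flink.

```java
import java.util.Set;

/** Hypothetical default for operators that do not understand a watermark. */
enum DefaultHandling { PROPAGATE, STOP }

/** Hypothetical declared watermark carrying its default behavior. */
final class DeclaredWatermark {
    final String id;
    final DefaultHandling defaultHandling;

    DeclaredWatermark(String id, DefaultHandling defaultHandling) {
        this.id = id;
        this.defaultHandling = defaultHandling;
    }
}

/** What the framework does with a watermark arriving at an operator. */
enum FallbackAction { DELEGATE_TO_OPERATOR, FORWARD, DROP }

final class UnknownWatermarkPolicy {
    /** Operators that understand the watermark decide themselves; others follow the declared default. */
    static FallbackAction decide(DeclaredWatermark watermark, Set<String> understoodIds) {
        if (understoodIds.contains(watermark.id)) {
            return FallbackAction.DELEGATE_TO_OPERATOR;
        }
        return watermark.defaultHandling == DefaultHandling.PROPAGATE
                ? FallbackAction.FORWARD
                : FallbackAction.DROP;
    }
}
```

Declaring insert-only with STOP means an AGG operator that doesn't know
about it drops it, so downstream never wrongly assumes an insert-only
stream.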

Best,

Xintong


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517

[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data

[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag



On Wed, Jul 3, 2024 at 6:08 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi devs,
>
> I'd like to start a discussion about FLIP-467: Introduce Generalized
> Watermarks [1] .
> This is another sub-FLIP of DataStream API V2 [2].
>
>
> After this FLIP one can declare generalized (custom) watermarks and define
> their custom propagation and alignment process. This FLIP opens new
> prospects to simplify "signal"ing mechanisms inside the Flink runtime and
> at the same time reveals new use-cases.
>
> You can find more details in the FLIP [1].
> Looking forward to hearing your comments, thanks!
>
>
> Best regards,
> Jeyhun
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-467%3A+Introduce+Generalized+Watermarks
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
>
