Fabian Hueske commented on FLINK-7245:

Hi [~xccui], thanks for the pointer to your code. I had a look at it and left 
some comments.

I think the idea of supporting multiple watermarks for different fields is 
interesting. However, this is not something that can be changed easily. 
Currently, the Table API is built on top of the DataStream API, which only 
supports a single watermark.

There would be two ways to add support for multiple watermarks:

1. Extend the DataStream API. This would touch many sensitive parts of Flink's 
core API and might introduce regressions or bugs. Also, the DataStream API 
itself would not benefit from that because it hides timestamps and watermarks 
as much as possible from users.
2. Move the Table API off the DataStream API. This would mean writing (or 
copying) a lot of code. In the long run, this is probably what will happen 
anyway in order to allow for more low-level optimizations. However, I don't 
see this happening in the near future.

The query you've shown would also work with our current design, which would 
delay the (single) watermark until it is behind both time attributes. It would 
add additional latency for the attribute that is ahead, but would be 
semantically correct. If we are clever about the optimization, we could even 
detect which attribute is used in later operations and only delay for those 
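
To illustrate the idea of keeping the single watermark behind both time attributes, here is a minimal sketch in plain Java. It is not Flink code: the class `MinWatermarkTracker` and its `update` method are hypothetical names, and the sketch assumes that a progress value is known separately for each time attribute. The single forwarded watermark is simply the minimum of those values.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/**
 * Hypothetical helper (not part of Flink): tracks one progress value per
 * time attribute and exposes their minimum as the single watermark.
 */
class MinWatermarkTracker {
    private final long[] perAttribute;
    private long lastEmitted = Long.MIN_VALUE;

    MinWatermarkTracker(int numAttributes) {
        if (numAttributes <= 0) {
            throw new IllegalArgumentException("need at least one time attribute");
        }
        perAttribute = new long[numAttributes];
        Arrays.fill(perAttribute, Long.MIN_VALUE);
    }

    /**
     * Records progress for one attribute. Returns the new combined watermark
     * if it advanced, or an empty value if nothing should be forwarded.
     */
    OptionalLong update(int attribute, long watermark) {
        // Progress per attribute never moves backwards.
        perAttribute[attribute] = Math.max(perAttribute[attribute], watermark);
        // The combined watermark has to stay behind every attribute.
        long min = Arrays.stream(perAttribute).min().getAsLong();
        if (min > lastEmitted) {
            lastEmitted = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

The attribute that is ahead pays for this with extra latency, because the combined watermark only advances once the slower attribute has caught up.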

I would propose the following: Let's put a hold on this issue (a generic 
watermark delaying operator) for now and start working on the event-time 
join. FLINK-7337 will be merged soon and in [my 
 I described a lightweight method to delay watermarks without touching the 
public DataStream API. If we want to add an event-time window join for 1.4.0, 
we need to hurry up a bit.

What do you think?

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
> Currently, watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as delayed by 
> the downstream operators since their timestamps must be less than or equal to 
> the corresponding triggers. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to the current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding newly generated results 
> are emitted.
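
The "holding back" mode described in the issue can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not Flink's implementation: `HoldBackSketch`, `buffer`, and the `Pending` record are hypothetical, results are modelled as a timestamp plus the watermark value that releases them, and emitted elements are collected as strings instead of going through a real operator output.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for an operator that holds watermarks back. */
class HoldBackSketch {
    /** A buffered result: its own timestamp and the watermark that releases it. */
    static final class Pending {
        final long timestamp;
        final long releaseAt;

        Pending(long timestamp, long releaseAt) {
            this.timestamp = timestamp;
            this.releaseAt = releaseAt;
        }
    }

    private final List<Pending> pending = new ArrayList<>();
    private long lastForwarded = Long.MIN_VALUE;
    /** Everything sent downstream, in order (stands in for the operator output). */
    final List<String> emitted = new ArrayList<>();

    void buffer(long timestamp, long releaseAt) {
        pending.add(new Pending(timestamp, releaseAt));
    }

    void processWatermark(long mark) {
        long held = mark;
        Iterator<Pending> it = pending.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (p.releaseAt <= mark) {
                // Results triggered by this watermark go out first.
                emitted.add("record@" + p.timestamp);
                it.remove();
            } else {
                // Stay behind results that are still buffered.
                held = Math.min(held, p.timestamp - 1);
            }
        }
        // Forward the (possibly held-back) watermark, never moving backwards.
        if (held > lastForwarded) {
            lastForwarded = held;
            emitted.add("watermark@" + held);
        }
    }
}
```

Because the forwarded watermark never passes the timestamp of a result that is still buffered, downstream operators do not see that result as late when it is finally emitted.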
