[
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121855#comment-16121855
]
Xingcan Cui commented on FLINK-7245:
------------------------------------
Hi [~fhueske], I've got some new ideas about the rowtime/watermark.
Currently, in an operator with two inputs, the watermarks from the two streams
are merged in advance and only the lower one is kept. For example, given
two streams {{S1}} and {{S2}}, if {{S1}} is generated in real time while {{S2}}
lags two hours behind, the watermarks from {{S1}} are effectively
discarded since they are always higher than those of {{S2}}. Maybe we could
make watermarks from different streams distinguishable and apply/emit them all.
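To make the current behaviour concrete, here is a minimal, self-contained sketch of min-based watermark merging. It does not use Flink's actual classes: {{MinWatermarkMerge}} and its method names are made up for illustration, and watermarks are modelled as plain {{long}} timestamps in milliseconds.

```java
// Hypothetical stand-in for a two-input operator; not Flink's real implementation.
public class MinWatermarkMerge {
    private long watermark1 = Long.MIN_VALUE; // latest watermark seen on input 1 (S1)
    private long watermark2 = Long.MIN_VALUE; // latest watermark seen on input 2 (S2)
    private long combined = Long.MIN_VALUE;   // watermark the operator actually applies

    /** Records a watermark from input 1 and returns the combined watermark. */
    public long onWatermark1(long timestamp) {
        watermark1 = Math.max(watermark1, timestamp);
        return advance();
    }

    /** Records a watermark from input 2 and returns the combined watermark. */
    public long onWatermark2(long timestamp) {
        watermark2 = Math.max(watermark2, timestamp);
        return advance();
    }

    // The combined watermark is the minimum over both inputs and never moves backwards.
    private long advance() {
        combined = Math.max(combined, Math.min(watermark1, watermark2));
        return combined;
    }

    public static void main(String[] args) {
        long twoHours = 2L * 60 * 60 * 1000;
        long now = 10_000_000L; // arbitrary "current" event time for the example

        MinWatermarkMerge op = new MinWatermarkMerge();
        op.onWatermark1(now);                        // S1 is real-time
        long afterS2 = op.onWatermark2(now - twoHours); // S2 lags two hours
        long afterS1 = op.onWatermark1(now + 1_000);    // S1 advances again

        // Both values equal S2's watermark: S1's progress is not visible.
        System.out.println(afterS2 + " " + afterS1);
    }
}
```

However far {{S1}} advances, the combined watermark only follows {{S2}}, which is the information loss described above.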
Specifically, I think we could add an extra field to the {{Watermark}} class
that indicates which rowtime field of a row the watermark corresponds to. For a
single operator, at most {{n}} rowtime fields (where {{n}} is the number of
inputs) would be active (which may be deduced from the query conditions), and
only the watermarks corresponding to those fields would be *held
back/applied* in the operator. All watermarks would still be emitted to
downstream operators. As a consequence, all the timestamps (which are stored in
rows) and watermarks are preserved, and users can operate on different rowtime
fields at different levels of a nested query.
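A rough sketch of this idea, under stated assumptions: {{TaggedWatermark}}, its {{rowtimeField}} member and the {{Operator}} class below are hypothetical names, not existing Flink API. The sketch only shows the "apply for active fields, forward everything" part and leaves out the holding-back logic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TaggedWatermarkSketch {

    /** Hypothetical watermark that names the rowtime field it belongs to. */
    static final class TaggedWatermark {
        final String rowtimeField;
        final long timestamp;

        TaggedWatermark(String rowtimeField, long timestamp) {
            this.rowtimeField = rowtimeField;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical operator that applies only watermarks of its active rowtime fields. */
    static final class Operator {
        private final Set<String> activeFields;                            // e.g. deduced from the query
        private final Map<String, Long> applied = new HashMap<>();         // per-field applied watermark
        private final List<TaggedWatermark> emitted = new ArrayList<>();   // stands in for the output

        Operator(Set<String> activeFields) {
            this.activeFields = activeFields;
        }

        void processWatermark(TaggedWatermark mark) {
            if (activeFields.contains(mark.rowtimeField)) {
                // Only active fields drive this operator's event-time progress.
                applied.merge(mark.rowtimeField, mark.timestamp, Long::max);
            }
            // Every watermark is forwarded so downstream operators can still use it.
            emitted.add(mark);
        }

        long appliedWatermark(String rowtimeField) {
            return applied.getOrDefault(rowtimeField, Long.MIN_VALUE);
        }

        int emittedCount() {
            return emitted.size();
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator(Collections.singleton("S2.rowtime"));
        op.processWatermark(new TaggedWatermark("S1.rowtime", 10_000L));
        op.processWatermark(new TaggedWatermark("S2.rowtime", 2_000L));
        System.out.println(op.appliedWatermark("S2.rowtime") + " " + op.emittedCount());
    }
}
```

Keying the applied watermark by field name is just one possible choice; a field index would work equally well.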
As a running example, consider the following query.
{code:sql}
SELECT COUNT(S1.a) OVER (
  PARTITION BY S1.key
  ORDER BY S2.rowtime
  RANGE BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM (
  SELECT * FROM S1, S2
  WHERE S1.key = S2.key
    AND S1.rowtime >= S2.rowtime - 10 SECONDS
    AND S1.rowtime < S2.rowtime + 10 SECONDS)
{code}
We could replace {{ORDER BY S2.rowtime}} with {{ORDER BY S1.rowtime}}, since
the timestamps and watermarks of both streams are preserved.
What do you think?
> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Xingcan Cui
> Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the
> {{AbstractStreamOperator}} instantly.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some results (with timestamp fields) whose computation is triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as late by
> the downstream operators, since their timestamps must be less than or equal to
> the watermarks that triggered them.
> This issue aims to add another "working mode", which supports holding back
> watermarks, to the existing operators. These watermarks should be blocked and
> stored by the operators until all the corresponding newly generated results are
> emitted.