[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16103282#comment-16103282 ]
Xingcan Cui commented on FLINK-7245:
------------------------------------
Hi [~fhueske], it took me some time to understand how rowtime works in the
current Table/SQL API. To continue the work, I'd like to share some of my
understanding and a few questions, which may be rather *detailed*. Could you
help confirm or answer them?
Suppose there's a class {{Order(a:Long, b:String)}}.
# When registering a rowtime attribute with the API, e.g.,
{{tEnv.registerDataStream("OrderA", orderA, 'a.rowtime, 'b)}}, I think the
current logic is that field {{a}} is shaded (i.e., replaced) in the physical
schema and an extra indicator for the rowtime field is added to the logical
schema. I found the following snippet in {{StreamTableEnvironment.extractRowtime()}}.
{code:scala}
if (mappedIdx < 0) {
  throw new TableException(
    s"The rowtime attribute can only replace a valid field. " +
    s"${origName.getOrElse(name)} is not a field of type $streamType.")
}
{code}
However, when I tried
{{tEnv.registerDataStream("OrderA", orderA, 'a, 'b, 'c.rowtime)}}, the table
was also registered successfully, with an extra field {{c}} appended. I
understand that either allowing or rejecting the extra field could make sense,
but I am still unsure which behavior is intended.
# When translating a SQL query, the rowtime field is omitted by the initial
"{{Order}} to {{CRow}}" operator.
# The planner checks whether the rowtime field is used in the query. If it is,
a generated function in the following operator sets this special field using
the {{ctx.timestamp()}} method.
# The user must assign watermarks manually before registering the
datastream. Since the rowtime field will be treated as a regular field, should
we consider adding a configurable {{DefaultWatermarkAssigner}} for the case
where none is provided?
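As a rough illustration of the fourth point, a default assigner could follow
the common bounded-out-of-orderness strategy: it tracks the largest rowtime
seen so far and reports a watermark that lags it by a configurable bound. The
sketch below is plain Java without any Flink dependency; the class
{{DefaultWatermarkAssigner}} and its methods are hypothetical, similar in spirit
to Flink's {{BoundedOutOfOrdernessTimestampExtractor}} but not a proposed API.

{code:java}
import java.util.OptionalLong;

/**
 * Hypothetical sketch of a configurable default watermark assigner.
 * Not a Flink API: it only models the bounded-out-of-orderness strategy.
 */
final class DefaultWatermarkAssigner {
    private final long maxOutOfOrdernessMs; // configurable lateness bound
    private long maxSeenTimestamp = Long.MIN_VALUE;

    DefaultWatermarkAssigner(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Called for every record; remembers the largest rowtime seen so far. */
    long extractTimestamp(long rowtime) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, rowtime);
        return rowtime;
    }

    /** Called periodically; empty until at least one record has been seen. */
    OptionalLong currentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(maxSeenTimestamp - maxOutOfOrdernessMs);
    }

    public static void main(String[] args) {
        DefaultWatermarkAssigner assigner = new DefaultWatermarkAssigner(5);
        System.out.println(assigner.currentWatermark().isPresent()); // false
        for (long ts : new long[] {10, 7, 20, 15}) {
            assigner.extractTimestamp(ts);
        }
        System.out.println(assigner.currentWatermark().getAsLong()); // 15
    }
}
{code}

With a bound of 5 and rowtimes 10, 7, 20, 15, the reported watermark is 15
(the maximum rowtime 20 minus the bound), so a record with rowtime 16 arriving
afterwards would still not be considered late.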
Besides, I found a minor issue in the SQL.html document. It uses the name
"rowtime" for the field itself ({{tableEnv.registerDataStream("Orders", ds,
"user, product, amount, proctime.proctime, rowtime.rowtime")}}), so readers may
be unsure whether the SQL query should use the field name or the "rowtime"
keyword.
Thanks, Xingcan
> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Xingcan Cui
> Assignee: Xingcan Cui
>
> Currently, watermarks are applied and emitted immediately by the
> {{AbstractStreamOperator}}.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as late by
> downstream operators, since their timestamps must be less than or equal to
> the watermarks that triggered them.
> This issue aims to add another "working mode" to the current operators that
> supports holding back watermarks. In this mode, watermarks should be blocked
> and stored by the operators until all the corresponding newly generated
> results have been emitted.
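A minimal sketch of the proposed hold-back mode, assuming the operator knows
which results it has produced but not yet emitted: an incoming watermark is
stored, and only min(input watermark, earliest pending timestamp - 1) is
forwarded, so the full watermark is released once the pending results are out.
{{HoldBackOperator}}, {{registerPendingResult}}, {{emitResult}} and the
string-based output are hypothetical and do not correspond to Flink's
{{AbstractStreamOperator}} API.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical sketch (not Flink's AbstractStreamOperator): an operator that
 * holds back watermarks while results it has produced are still pending.
 */
final class HoldBackOperator {
    // Emitted elements in order; records and watermarks modeled as strings.
    private final List<String> output = new ArrayList<>();
    // Timestamps of results that are generated but not yet emitted.
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private long inputWatermark = Long.MIN_VALUE;
    private long emittedWatermark = Long.MIN_VALUE;

    /** A result with this timestamp has been produced but not emitted yet. */
    void registerPendingResult(long timestamp) {
        pending.add(timestamp);
    }

    /** Emits a pending result, then releases any watermark it was blocking. */
    void emitResult(long timestamp) {
        pending.remove(timestamp);
        output.add("record@" + timestamp);
        maybeEmitWatermark();
    }

    /** Stores the incoming watermark instead of forwarding it unconditionally. */
    void processWatermark(long watermark) {
        inputWatermark = Math.max(inputWatermark, watermark);
        maybeEmitWatermark();
    }

    // Forward min(input watermark, earliest pending timestamp - 1) if it advanced.
    private void maybeEmitWatermark() {
        long bound = pending.isEmpty()
                ? inputWatermark
                : Math.min(inputWatermark, pending.peek() - 1);
        if (bound > emittedWatermark) {
            emittedWatermark = bound;
            output.add("watermark@" + bound);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        HoldBackOperator op = new HoldBackOperator();
        op.registerPendingResult(8); // e.g., a join result at t=8 still in flight
        op.processWatermark(10);     // only watermark 7 may pass for now
        op.emitResult(8);            // the result goes out, then watermark 10
        System.out.println(op.output()); // [watermark@7, record@8, watermark@10]
    }
}
{code}

Without the hold-back, watermark 10 would be forwarded first and the result at
t=8 would reach downstream operators as a late record. A simpler variant is to
delay every watermark by a fixed offset, which avoids tracking pending results
but holds watermarks back even when nothing is pending.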