[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171469#comment-16171469 ]
Fabian Hueske commented on FLINK-7548:
--------------------------------------
At the moment, we expect a {{TableSource}} to assign timestamps and watermarks
itself.
The goal for this JIRA should be to move the timestamp and watermark generation
into the Scan operator.
So the Scan operator would receive a {{DataStream}} from the {{TableSource}}
that carries no ({{StreamRecord}}) timestamps or watermarks, and would generate
the timestamps and watermarks itself based on the records.
For this we need to provide information about the timestamp field and the
generation of watermarks.
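For illustration, the timestamp part of this information can be modeled as a transformation of each row that the Scan operator applies before generating watermarks. The minimal sketch below assumes a row is a map from field name to value; the class and method names are hypothetical (not Flink API) and mirror the three timestamp-field definitions listed in requirement 1.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch, not Flink API: a row is modeled as an ordered map
// from field name to value. Each method returns the row as the Scan operator
// would see it after the timestamp field has been defined.
final class RowtimeDefinitions {
    private RowtimeDefinitions() {}

    // Option 1: an existing LONG field is used as-is. Only its logical type
    // would change to ROWTIME_INDICATOR; the value stays a long.
    static Map<String, Object> existingLongField(Map<String, Object> row, String field) {
        if (!(row.get(field) instanceof Long)) {
            throw new IllegalArgumentException(field + " must be of type LONG");
        }
        return new LinkedHashMap<>(row);
    }

    // Option 2: an existing field is replaced by the result of a
    // single-argument UDF. The field keeps its name and its position.
    static <T> Map<String, Object> convertedField(
            Map<String, Object> row, String field, Class<T> inputType, Function<T, Long> udf) {
        T input = inputType.cast(row.get(field));
        Map<String, Object> out = new LinkedHashMap<>(row);
        out.put(field, udf.apply(input)); // re-putting an existing key keeps its order
        return out;
    }

    // Option 3: a new field, computed from one or more input fields, is
    // appended to the end of the row under the given name.
    static Map<String, Object> computedField(
            Map<String, Object> row, String newField, Function<Map<String, Object>, Long> udf) {
        if (row.containsKey(newField)) {
            throw new IllegalArgumentException(newField + " already exists");
        }
        Map<String, Object> out = new LinkedHashMap<>(row);
        out.put(newField, udf.apply(row));
        return out;
    }
}
```

For example, `convertedField(row, "seconds", String.class, s -> Long.parseLong(s) * 1000L)` turns a string field holding seconds into epoch milliseconds under the same name.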
I collected a few requirements from discussions I had:
1. the timestamp field can be defined as one of the following:
- An existing field of type {{LONG}}. The type of the existing field is
converted to {{ROWTIME_INDICATOR}} (this conversion is only on the logical
level, as timestamps are internally handled as LONG).
- An existing field that must be converted using a scalar UDF with a single
input parameter (of type of the existing field) that returns a {{LONG}}. The
input field is replaced by the result of the UDF. The name of the input field
is preserved and the logical type of the field becomes {{ROWTIME_INDICATOR}}.
- A new field that is computed using a scalar UDF with one or more input
parameters. The result of the UDF is of type {{LONG}} and appended as a new
field to the schema of the row. The name of the new field must be provided and
the logical type is {{ROWTIME_INDICATOR}}.
2. the watermarks are computed based on the (computed) timestamp field.
- There are some common watermark strategies that should be supported:
-- (periodic) ascending watermarks
-- (periodic) bounded out-of-order watermarks
- Custom watermark logic should be supported as well
-- periodic watermarks
-- punctuated watermarks
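The built-in and custom strategies above could be sketched as follows. The interfaces and class names are assumptions for illustration only, not an existing or agreed-upon API. The two built-in strategies follow the same rules as Flink's existing `AscendingTimestampExtractor` (watermark = max timestamp - 1) and `BoundedOutOfOrdernessTimestampExtractor` (watermark = max timestamp - bound), but here they only consume the already computed rowtime value.

```java
// Hypothetical interfaces, not Flink API.
// A periodic generator is fed every timestamp and polled for a watermark
// at regular intervals by the Scan operator.
interface PeriodicWatermarkGenerator {
    void onTimestamp(long timestamp);
    long getCurrentWatermark();
}

// A punctuated generator decides per record whether to emit a watermark;
// it returns null when no watermark should be emitted.
interface PunctuatedWatermarkGenerator {
    Long checkAndGetWatermark(long timestamp);
}

final class WatermarkStrategies {
    private WatermarkStrategies() {}

    // (Periodic) ascending: timestamps are expected to be non-decreasing,
    // so everything up to max - 1 is complete.
    static final class Ascending implements PeriodicWatermarkGenerator {
        private long maxTimestamp = Long.MIN_VALUE;

        public void onTimestamp(long ts) { maxTimestamp = Math.max(maxTimestamp, ts); }

        public long getCurrentWatermark() {
            return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
        }
    }

    // (Periodic) bounded out-of-order: records may be late by at most `bound` ms.
    static final class BoundedOutOfOrder implements PeriodicWatermarkGenerator {
        private final long bound;
        private long maxTimestamp;

        BoundedOutOfOrder(long bound) {
            this.bound = bound;
            this.maxTimestamp = Long.MIN_VALUE + bound; // avoids underflow before the first record
        }

        public void onTimestamp(long ts) { maxTimestamp = Math.max(maxTimestamp, ts); }

        public long getCurrentWatermark() { return maxTimestamp - bound; }
    }

    // Hypothetical custom punctuated logic: emit a watermark whenever a
    // record's timestamp falls exactly on a full second.
    static PunctuatedWatermarkGenerator onFullSeconds() {
        return ts -> ts % 1000 == 0 ? ts : null;
    }
}
```

The split into two interfaces reflects the periodic/punctuated distinction in the list; whether the final interface should look like this is exactly the open design question.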
Additional thoughts / future requirements / open questions:
- Preserve the type of time indicator fields. At the moment, a field of type
{{ROWTIME_INDICATOR}} is exposed to the user as {{TIMESTAMP}}. However, a time
field might initially be of type {{LONG}}, so the type exposed to the user
changes. We might want to add a time indicator that exposes itself as
{{LONG}}.
- Do we need support for time indicator fields in nested fields (with
replacement)?
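To make the type change concrete, here is a minimal sketch assuming the rowtime value is stored internally as epoch milliseconds in a long. `java.time.Instant` stands in for the SQL {{TIMESTAMP}} type, and the class and enum names are hypothetical.

```java
import java.time.Instant;

// Hypothetical sketch: the internal rowtime value is always a long
// (epoch milliseconds); only the type exposed to the user differs.
final class TimeIndicatorExposure {
    private TimeIndicatorExposure() {}

    enum ExposedType { TIMESTAMP, LONG }

    static Object expose(long internalMillis, ExposedType type) {
        switch (type) {
            case TIMESTAMP:
                // Current behavior: a LONG source field surfaces as a timestamp.
                return Instant.ofEpochMilli(internalMillis);
            case LONG:
                // Proposed alternative: keep the field's original LONG type.
                return internalMillis;
            default:
                throw new IllegalArgumentException("unknown type " + type);
        }
    }
}
```

A source field holding `1000L` would thus be seen as `1970-01-01T00:00:01Z` under the current behavior, and as `1000` with a LONG-exposing time indicator.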
What do you think of this, [~wheat9], [~jark], [~xccui]? Would that meet your
requirements?
Once we agree on the requirements (and scope) of this issue, we can continue to
define the interface.
> Support watermark generation for TableSource
> --------------------------------------------
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Jark Wu
>
> As discussed in FLINK-7446, currently a TableSource only supports defining a
> rowtime field, but does not support extracting watermarks from the rowtime field.
> We can provide a new interface called {{DefinedWatermark}}, which has two
> methods {{getRowtimeAttribute}} (can only be an existing field) and
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked
> deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in
> strategies, needs further discussion.
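The interface proposed in the issue description could look roughly like the sketch below. Only `DefinedWatermark`, `getRowtimeAttribute`, and `getWatermarkGenerator` come from the description; the `WatermarkGenerator` type, its single method, and the example source are assumptions, since the exact shape of the generator is still open.

```java
// Hypothetical generator type (an assumption, not Flink API): returns the
// current watermark after observing a rowtime value.
interface WatermarkGenerator {
    long onTimestamp(long timestamp);
}

// Sketch of the proposed interface. The rowtime attribute must name an
// existing field of the TableSource's schema.
interface DefinedWatermark {
    String getRowtimeAttribute();

    WatermarkGenerator getWatermarkGenerator();
}

// Hypothetical source whose records may arrive up to 5 seconds out of order.
final class ClickTableSource implements DefinedWatermark {
    private static final long MAX_DELAY_MS = 5000L;

    @Override
    public String getRowtimeAttribute() {
        return "clickTime";
    }

    @Override
    public WatermarkGenerator getWatermarkGenerator() {
        return new WatermarkGenerator() {
            // Start offset by the delay so the first watermark cannot underflow.
            private long maxTimestamp = Long.MIN_VALUE + MAX_DELAY_MS;

            @Override
            public long onTimestamp(long ts) {
                maxTimestamp = Math.max(maxTimestamp, ts);
                return maxTimestamp - MAX_DELAY_MS;
            }
        };
    }
}
```

With this split, the Scan operator, not the source, would call the generator, which is the direction discussed in the comment above.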