[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171469#comment-16171469 ]

Fabian Hueske commented on FLINK-7548:
--------------------------------------

At the moment, we expect a {{TableSource}} to assign timestamps and watermarks 
itself.
The goal for this JIRA should be to move the timestamp and watermark generation 
into the Scan operator.
So the Scan operator would receive a {{DataStream}} from the {{TableSource}} 
that carries no ({{StreamRecord}}) timestamps and watermarks, and would assign 
the timestamps and generate the watermarks itself.
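A rough sketch of what this could look like, assuming rows are modeled as {{Object[]}}, a bounded out-of-order watermark strategy, and a row-count trigger standing in for the periodic timer. {{ScanSketch}} and {{Event}} are hypothetical names for illustration, not Flink classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical Scan operator: rows arrive from the TableSource without
// timestamps or watermarks, and the operator assigns both itself.
final class ScanSketch {
    // An output event: either a row with its assigned timestamp or a watermark.
    static final class Event {
        final Object[] row; // null for a watermark
        final long time;    // record timestamp or watermark value
        Event(Object[] row, long time) { this.row = row; this.time = time; }
        boolean isWatermark() { return row == null; }
    }

    private final ToLongFunction<Object[]> rowtime; // extracts the timestamp of a row
    private final long outOfOrderBound;             // assumed strategy: bounded out-of-order
    private final int emitEvery;                    // stand-in for a periodic timer
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastWatermark = Long.MIN_VALUE;
    private int seen = 0;

    ScanSketch(ToLongFunction<Object[]> rowtime, long outOfOrderBound, int emitEvery) {
        this.rowtime = rowtime;
        this.outOfOrderBound = outOfOrderBound;
        this.emitEvery = emitEvery;
    }

    List<Event> process(List<Object[]> input) {
        List<Event> out = new ArrayList<>();
        for (Object[] row : input) {
            long ts = rowtime.applyAsLong(row);
            maxTimestamp = Math.max(maxTimestamp, ts);
            out.add(new Event(row, ts));
            if (++seen % emitEvery == 0) {
                long wm = maxTimestamp - outOfOrderBound;
                if (wm > lastWatermark) { // watermarks must never move backwards
                    lastWatermark = wm;
                    out.add(new Event(null, wm));
                }
            }
        }
        return out;
    }
}
```

In this sketch a late row (one whose timestamp is already behind the last watermark) is still forwarded; how late rows are handled is left to downstream operators.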

For this, we need a way to specify the timestamp field and how the watermarks 
are generated.
I collected a few requirements from discussions I had:

1. The timestamp field can be defined in one of the following ways:
  - An existing field of type {{LONG}}. The type of the existing field is 
converted to {{ROWTIME_INDICATOR}} (this conversion is only on the logical 
level, as timestamps are internally handled as LONG).
  - An existing field that must be converted using a scalar UDF with a single 
input parameter (of type of the existing field) that returns a {{LONG}}. The 
input field is replaced by the result of the UDF. The name of the input field 
is preserved and the logical type of the field becomes {{ROWTIME_INDICATOR}}.
  - A new field that is computed using a scalar UDF with one or more input 
parameters. The result of the UDF is of type {{LONG}} and appended as a new 
field to the schema of the row. The name of the new field must be provided and 
the logical type is {{ROWTIME_INDICATOR}}.
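To make the three options concrete, here is a minimal sketch. It assumes rows are plain {{Object[]}} arrays and a schema is a list of field names; {{RowtimeDefinition}} and its factory methods are hypothetical names, not part of the Flink Table API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical model of the three ways to define the rowtime field.
// Rows are Object[] and a schema is a list of field names; not a Flink API.
final class RowtimeDefinition {
    private final int[] inputs;                 // indexes of the fields passed to the UDF
    private final ToLongFunction<Object[]> udf; // computes the LONG timestamp
    private final String newField;              // null => replace inputs[0] in place

    private RowtimeDefinition(int[] inputs, ToLongFunction<Object[]> udf, String newField) {
        this.inputs = inputs;
        this.udf = udf;
        this.newField = newField;
    }

    // Option 1: an existing LONG field; only its logical type changes.
    static RowtimeDefinition existingLongField(int index) {
        return new RowtimeDefinition(new int[] {index}, args -> (Long) args[0], null);
    }

    // Option 2: an existing field converted by a single-argument UDF;
    // the result replaces the field and keeps its name.
    static RowtimeDefinition convertedField(int index, ToLongFunction<Object> udf) {
        return new RowtimeDefinition(new int[] {index}, args -> udf.applyAsLong(args[0]), null);
    }

    // Option 3: a new field computed from one or more inputs and appended
    // to the row under the given name.
    static RowtimeDefinition computedField(String name, ToLongFunction<Object[]> udf, int... inputs) {
        return new RowtimeDefinition(inputs, udf, name);
    }

    // Returns a copy of the row with the rowtime value in place.
    Object[] apply(Object[] row) {
        Object[] args = new Object[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
            args[i] = row[inputs[i]];
        }
        long ts = udf.applyAsLong(args);
        if (newField == null) {
            Object[] out = row.clone();
            out[inputs[0]] = ts; // same position, same name
            return out;
        }
        Object[] out = Arrays.copyOf(row, row.length + 1);
        out[row.length] = ts; // appended as a new field
        return out;
    }

    // Field names after the rowtime field has been applied.
    List<String> schema(List<String> fields) {
        List<String> out = new ArrayList<>(fields);
        if (newField != null) {
            out.add(newField);
        }
        return out;
    }
}
```

For example, {{convertedField(0, v -> Long.parseLong((String) v) * 1000L)}} would turn a field holding epoch seconds as a string into a millisecond timestamp under the same name.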

2. The watermarks are computed based on the (computed) timestamp field.
  - There are some common watermark strategies that should be supported:
     -- (periodic) ascending watermarks
     -- (periodic) bounded out-of-order watermarks
  - Custom watermark logic should be supported as well
     -- periodic watermarks
     -- punctuated watermarks
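The built-in strategies and the two kinds of custom logic could map to interfaces roughly like the following. This is a sketch under assumptions: {{PeriodicWatermarkGenerator}}, {{PunctuatedWatermarkGenerator}} and the example classes are hypothetical, and the exact arithmetic (e.g. staying one millisecond behind for ascending timestamps) is a design choice, not a settled part of this proposal:

```java
import java.util.OptionalLong;

// Hypothetical periodic generator: sees every timestamp, polled at an interval.
interface PeriodicWatermarkGenerator {
    void onTimestamp(long timestamp);
    long currentWatermark();
}

// Hypothetical punctuated generator: may emit a watermark for any single row.
interface PunctuatedWatermarkGenerator {
    OptionalLong checkForWatermark(Object[] row, long timestamp);
}

// Built-in strategy: timestamps are assumed to arrive in ascending order.
final class AscendingWatermarks implements PeriodicWatermarkGenerator {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onTimestamp(long ts) { maxTimestamp = Math.max(maxTimestamp, ts); }

    @Override
    public long currentWatermark() {
        // Rows with the same timestamp may still follow, so stay 1 ms behind.
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }
}

// Built-in strategy: rows are out of order by at most `bound` milliseconds.
final class BoundedOutOfOrderWatermarks implements PeriodicWatermarkGenerator {
    private final long bound;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrderWatermarks(long bound) { this.bound = bound; }

    @Override
    public void onTimestamp(long ts) { maxTimestamp = Math.max(maxTimestamp, ts); }

    @Override
    public long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - bound;
    }
}

// Example of custom punctuated logic: rows whose marker field is true carry a watermark.
final class MarkerWatermarks implements PunctuatedWatermarkGenerator {
    private final int markerField;

    MarkerWatermarks(int markerField) { this.markerField = markerField; }

    @Override
    public OptionalLong checkForWatermark(Object[] row, long ts) {
        return Boolean.TRUE.equals(row[markerField]) ? OptionalLong.of(ts) : OptionalLong.empty();
    }
}
```

A periodic generator would be polled by the Scan operator at a fixed interval, while a punctuated generator is consulted for every row.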
 
Additional thoughts / future requirements / open questions: 
- preserve the type of time indicator fields. At the moment, a field of type 
{{ROWTIME_INDICATOR}} is exposed to the user as {{TIMESTAMP}}. However, a time 
field might initially be of type {{LONG}}, so the type exposed to the user 
changes. We might want to add a time indicator that exposes itself as 
{{LONG}}.
- do we need to support time indicator fields in nested fields (with 
replacement)?

What do you think of this, [~wheat9], [~jark], [~xccui]? Would that meet your 
requirements?

Once we agree on the requirements (and scope) of this issue, we can continue to 
define the interface.

> Support watermark generation for TableSource
> --------------------------------------------
>
>                 Key: FLINK-7548
>                 URL: https://issues.apache.org/jira/browse/FLINK-7548
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>
> As discussed in FLINK-7446, the TableSource currently only supports defining a 
> rowtime field, but does not support extracting watermarks from the rowtime field. 
> We can provide a new interface called {{DefinedWatermark}}, which has two 
> methods {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in 
> strategies, needs further discussion.
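A minimal sketch of the {{DefinedWatermark}} interface proposed in the description above. The return type of {{getWatermarkGenerator}} is still open, so the single-method {{WatermarkGenerator}} here, as well as {{ClickSource}} and {{RowtimeValidation}}, are hypothetical:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical generator type; the real signature is still to be discussed.
interface WatermarkGenerator {
    // Receives the rowtime of each row and returns the current watermark.
    long onRowtime(long rowtime);
}

// Shape of the proposed interface.
interface DefinedWatermark {
    String getRowtimeAttribute();              // must name an existing field
    WatermarkGenerator getWatermarkGenerator();
}

// Hypothetical check that the rowtime attribute is an existing field.
final class RowtimeValidation {
    static int rowtimeIndex(DefinedWatermark source, List<String> fields) {
        int index = fields.indexOf(source.getRowtimeAttribute());
        if (index < 0) {
            throw new IllegalArgumentException(
                "Rowtime attribute must be an existing field: " + source.getRowtimeAttribute());
        }
        return index;
    }
}

// Example source: rowtime is the existing field "ts"; watermarks lag 5 s behind.
final class ClickSource implements DefinedWatermark {
    static final List<String> FIELDS = Arrays.asList("user", "url", "ts");

    @Override
    public String getRowtimeAttribute() {
        return "ts";
    }

    @Override
    public WatermarkGenerator getWatermarkGenerator() {
        return new WatermarkGenerator() {
            private long maxRowtime = Long.MIN_VALUE;

            @Override
            public long onRowtime(long rowtime) {
                maxRowtime = Math.max(maxRowtime, rowtime);
                return maxRowtime - 5_000L;
            }
        };
    }
}
```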



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
