[
https://issues.apache.org/jira/browse/FLINK-15220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Paul Lin updated FLINK-15220:
-----------------------------
Description:
KafkaTableSource supports all startup modes in DataStream API except
`startFromTimestamp`, but `startFromTimestamp` is a common and valid use case
in Table/SQL API as well.
The proposed changes are as follows:
h3. Table Descriptor
A new method should be added to Kafka table descriptor:
```
new Kafka().startFromTimestamp(long millisFromEpoch)
```
The parameter is milliseconds since the epoch, to stay aligned with
FlinkKafkaConsumerBase#setStartFromTimestamp(long startupOffsetsTimestamp).
Since Kafka 0.8/0.9, which do not support timestamps, will likely be
deprecated, we can assume by default that users run a Kafka version that
supports timestamps, and throw an exception during the property validation
phase if a user tries to use the timestamp startup mode with one of the
deprecated Kafka versions.
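A minimal sketch of the validation check described above, not the actual Flink validator. The class name `StartupModeValidation`, the method `validate`, and the use of the `connector.version` key to identify the Kafka version are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Flink: rejects the timestamp startup mode
// for Kafka versions that cannot look up offsets by timestamp.
final class StartupModeValidation {

    // Kafka versions named in the proposal as lacking timestamp support.
    static final Set<String> VERSIONS_WITHOUT_TIMESTAMP =
            new HashSet<>(Arrays.asList("0.8", "0.9"));

    // Assumption: the Kafka version is carried in 'connector.version'.
    static void validate(Map<String, String> properties) {
        String mode = properties.get("connector.startup-mode");
        String version = properties.get("connector.version");
        if ("timestamp".equals(mode) && VERSIONS_WITHOUT_TIMESTAMP.contains(version)) {
            throw new IllegalArgumentException(
                    "Startup mode 'timestamp' is not supported for Kafka version " + version);
        }
    }
}
```

Failing in the validation phase means the error surfaces when the table is declared rather than when the job starts consuming.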
h3. YAML & DDL
YAML and DDL use string-based properties to describe tables, and the proposed
keys are as follows:
```
'connector.startup-mode' = 'timestamp',
'connector.startup-timestamp-millis' = '1576145410000',
'connector.startup-timestamp' = '2019-12-12 10:11:23.123'
```
The timestamp must be given either as milliseconds since the epoch or in the
form "yyyy-MM-dd HH:mm:ss[.SSS]". If both are provided, the milliseconds value
is used.
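The precedence rule above could be sketched as follows. This is an illustration under assumptions, not the proposed implementation: the class `StartupTimestampResolver` and its `resolve` method are hypothetical, and the formatted timestamp is interpreted as UTC because the proposal does not state a time zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Hypothetical helper, not part of Flink: resolves the startup timestamp
// from the two proposed property keys.
final class StartupTimestampResolver {

    // The bracketed section makes the millisecond fraction optional.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSS]");

    static long resolve(Map<String, String> properties) {
        // Milliseconds from epoch take precedence when both keys are present.
        String millis = properties.get("connector.startup-timestamp-millis");
        if (millis != null) {
            return Long.parseLong(millis);
        }
        String text = properties.get("connector.startup-timestamp");
        if (text == null) {
            throw new IllegalArgumentException(
                    "Startup mode 'timestamp' requires a startup timestamp");
        }
        // Assumption: the formatted value is in UTC; the proposal leaves this open.
        return LocalDateTime.parse(text, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }
}
```

With the three properties shown above, `resolve` would return 1576145410000 because the milliseconds key wins. With only `connector.startup-timestamp` set, the result depends on the assumed time zone.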
was:KafkaTableSource supports all startup modes in DataStream API except
`startFromTimestamp`, but `startFromTimestamp` is a common and valid use case
in Table/SQL API as well.
> Add startFromTimestamp in KafkaTableSource
> ------------------------------------------
>
> Key: FLINK-15220
> URL: https://issues.apache.org/jira/browse/FLINK-15220
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.10.0
> Reporter: Paul Lin
> Assignee: Paul Lin
> Priority: Major
>
> KafkaTableSource supports all startup modes in DataStream API except
> `startFromTimestamp`, but `startFromTimestamp` is a common and valid use case
> in Table/SQL API as well.
>
> The proposed changes are as follows:
> h3. Table Descriptor
> A new method should be added to Kafka table descriptor:
> ```
> new Kafka().startFromTimestamp(long millisFromEpoch)
> ```
> The parameter is milliseconds since the epoch, to stay aligned with
> FlinkKafkaConsumerBase#setStartFromTimestamp(long startupOffsetsTimestamp).
> Since Kafka 0.8/0.9, which do not support timestamps, will likely be
> deprecated, we can assume by default that users run a Kafka version that
> supports timestamps, and throw an exception during the property validation
> phase if a user tries to use the timestamp startup mode with one of the
> deprecated Kafka versions.
> h3. YAML & DDL
> YAML and DDL use string-based properties to describe tables, and the proposed
> keys are as follows:
> ```
> 'connector.startup-mode' = 'timestamp',
> 'connector.startup-timestamp-millis' = '1576145410000',
> 'connector.startup-timestamp' = '2019-12-12 10:11:23.123'
> ```
> The timestamp must be given either as milliseconds since the epoch or in the
> form "yyyy-MM-dd HH:mm:ss[.SSS]". If both are provided, the milliseconds value
> is used.