[
https://issues.apache.org/jira/browse/FLINK-15220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-15220:
-----------------------------------
Labels: pull-request-available (was: )
> Add startFromTimestamp in KafkaTableSource
> ------------------------------------------
>
> Key: FLINK-15220
> URL: https://issues.apache.org/jira/browse/FLINK-15220
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.10.0
> Reporter: Paul Lin
> Assignee: Paul Lin
> Priority: Major
> Labels: pull-request-available
>
> KafkaTableSource supports all startup modes of the DataStream API except
> `startFromTimestamp`, but `startFromTimestamp` is a common and valid use case
> in the Table/SQL API as well.
>
> The proposed changes are as follows:
> h3. Table Descriptor
> A new method should be added to Kafka table descriptor:
> ```
> new Kafka().startFromTimestamp(long millisFromEpoch)
> ```
> The parameter would be in milliseconds from epoch, to stay aligned with
> FlinkKafkaConsumerBase#setStartFromTimestamp(long startupOffsetsTimestamp).
> Since Kafka 0.8/0.9, which do not support timestamps, will likely be
> deprecated, we can assume by default that users run a Kafka version that
> supports timestamps, and throw an exception during the property validation
> phase if users try to use timestamp startup mode with those deprecated versions.
> h3. YAML & DDL
> YAML and DDL use string-based properties to describe tables, and the proposed
> keys are as follows:
> ```
> 'connector.startup-mode' = 'timestamp',
> 'connector.startup-timestamp-millis' = '1576145410000',
> 'connector.startup-timestamp' = '2019-12-12 10:11:23.123'
> ```
> The timestamp would need to be given either as milliseconds from epoch
> ('connector.startup-timestamp-millis') or in the form
> "yyyy-MM-dd HH:mm:ss[.SSS]" ('connector.startup-timestamp'). If both keys are
> provided, a validation exception would be thrown.
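
The validation rule for the two timestamp keys could be sketched roughly as follows. This is only an illustration of the proposed behavior, not code from the pull request: the class `StartupTimestampSketch` and the method `resolveStartupTimestamp` are hypothetical names, `IllegalArgumentException` stands in for whatever validation exception the connector would throw, and the formatted timestamp is interpreted as UTC, which is an assumption because the issue does not specify a time zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Hypothetical helper, not part of Flink: resolves the startup timestamp
// from string-based table properties as described in the issue.
final class StartupTimestampSketch {

    static final String MILLIS_KEY = "connector.startup-timestamp-millis";
    static final String TEXT_KEY = "connector.startup-timestamp";

    // "[.SSS]" marks the millisecond part as optional.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSS]");

    /** Returns the startup timestamp in milliseconds from epoch. */
    static long resolveStartupTimestamp(Map<String, String> properties) {
        String millis = properties.get(MILLIS_KEY);
        String text = properties.get(TEXT_KEY);

        // Providing both keys is ambiguous, so it is rejected.
        if (millis != null && text != null) {
            throw new IllegalArgumentException(
                    "Only one of '" + MILLIS_KEY + "' and '" + TEXT_KEY + "' may be set.");
        }
        if (millis != null) {
            return Long.parseLong(millis);
        }
        if (text != null) {
            // Assumption: the formatted timestamp is in UTC.
            return LocalDateTime.parse(text, FORMAT)
                    .toInstant(ZoneOffset.UTC)
                    .toEpochMilli();
        }
        throw new IllegalArgumentException(
                "Startup mode 'timestamp' requires '" + MILLIS_KEY + "' or '" + TEXT_KEY + "'.");
    }
}
```

With only `'connector.startup-timestamp-millis' = '1576145410000'` set, the sketch returns that value unchanged; with both keys set, as in the combined listing above, it rejects the configuration.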