[
https://issues.apache.org/jira/browse/FLINK-35173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Qingsheng Ren reassigned FLINK-35173:
-------------------------------------
Assignee: ZhengYu Chen
> Debezium for Mysql connector Custom Time Serializer
> ----------------------------------------------------
>
> Key: FLINK-35173
> URL: https://issues.apache.org/jira/browse/FLINK-35173
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: 3.1.0
> Reporter: ZhengYu Chen
> Assignee: ZhengYu Chen
> Priority: Major
> Labels: CDC, pull-request-available
> Fix For: 3.1.0
>
>
> Currently, Flink CDC Time encounters time type errors (including DateTime,
> Time, Date, TimeStamp) when using MySQL Connector
> (JsonDebeziumDeserializationSchema) as deserialization, and the converted
> time is wrong. The essential reason is that the timestamp returned by the
> bottom layer of debezium is UTC (such as io.debezium.time.Timestamp). The
> community has already had some
> [PR|https://github.com/apache/flink-cdc/pull/1366/files#diff-e129e9fae3eea0bb32f0019debb4932413c91088d6dae656e2ecb63913badae4],
> but they are not work.
> Now a way is provided to provide a solution based on Debezium's custom
> Convert interface
> (https://debezium.io/documentation/reference/1.9/development/converters.html),
> Users can choose to convert the above four time types into STRING according
> to the specified time format to ensure that users can correctly convert JSON
> when using the Flink DataStream API.
> When the user enables this converter, we need to configure it according to
> the parameters, That's some datastream use case:
> {code:java}
> Properties debeziumProperties = new Properties();
> debeziumProperties.setProperty("converters", "datetime");
> debeziumProperties.setProperty("datetime.database.type",
> DataBaseType.MYSQL.getType());
> debeziumProperties.setProperty("datetime.type",
> "cn.xxx.sources.cdc.MysqlDebeziumConverter");
> debeziumProperties.setProperty("datetime.format.date", "yyyy-MM-dd");
> debeziumProperties.setProperty("datetime.format.time", "HH:mm:ss");
> debeziumProperties.setProperty("datetime.format.datetime", "yyyy-MM-dd
> HH:mm:ss");
> debeziumProperties.setProperty("datetime.format.timestamp", "yyyy-MM-dd
> HH:mm:ss");
> debeziumProperties.setProperty("datetime.format.timestamp.zone", "UTC+8");
> MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
> .hostname(url[0])
> .port(Integer.parseInt(url[1]))
> .databaseList(table.getDatabase())
> .tableList(getTablePattern(table))
> .username(table.getUserName())
> .password(table.getPassword())
> .debeziumProperties(debeziumProperties); {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)