Flink CDC Issue Import created FLINK-34784:
----------------------------------------------
Summary: [Bug] Datetime data inconsistency (off by 8 hours) in snapshot phase
Key: FLINK-34784
URL: https://issues.apache.org/jira/browse/FLINK-34784
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Flink CDC Issue Import

### Search before asking

- [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.

### Flink version

1.13.6/1.14

### Flink CDC version

2.2/2.3

### Database and its version

MySQL 5.7+ & 8.0+

### Minimal reproduce step

([#1739|https://github.com/apache/flink-cdc/issues/1739] | [FLINK-1739|https://issues.apache.org/jira/browse/FLINK-1739])

The issue still persists in the snapshot phase when MySQL is in the CST time zone. The reason is that Debezium's `Timestamp::toEpochMillis` does not respect the time zone:

![image|https://user-images.githubusercontent.com/20527912/230588397-e3d07c1b-1a5d-4659-b6b0-610ba36ac442.png]

PRECONDITION: the server/local time zone is Asia/Shanghai (UTC+8), and the MySQL time zone is CST.

To reproduce the issue:

1. Create a table with a primary key and a datetime field, and insert some random data:

```sql
CREATE TABLE `batchdatetime` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `dt` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
);

# insert data here

mysql> select * from batchdatetime;
+----+---------------------+
| id | dt                  |
+----+---------------------+
|  1 | 2016-10-15 02:15:15 |
|  2 | 2016-10-15 02:15:15 |
+----+---------------------+
```

2.
Create a CDC job using the DataStream API:

```java
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.setParallelism(1);

    MySqlSource<String> mysqlSource = new MySqlSourceBuilder<String>()
            .databaseList("aps_test")
            .tableList("aps_test.batchdatetime")
            .hostname("localhost")
            .username("root")
            .password("")
            .serverTimeZone("Asia/Shanghai")
            .includeSchemaChanges(true)
            .deserializer(new JsonDebeziumDeserializationSchema())
            .scanNewlyAddedTableEnabled(true)
            .build();

    DataStreamSource<String> source = env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "mysql-source");
    source.print();
    env.execute();
}
```

3. Start the job and witness how Debezium brings inconsistency to the datetime columns:

![image|https://user-images.githubusercontent.com/20527912/230589178-54adf54d-a797-45f3-b156-ffd95cc76907.png]

`dateTime.toInstant(ZoneOffset.UTC).toEpochMilli()` = 1476497715000, BUT `2016-10-15T02:15:15` corresponds to 1476468915000 milliseconds in BOTH the server and the local time zone, which we can verify with the following SQL:

```sql
mysql> select UNIX_TIMESTAMP(dt) * 1000 as aha from batchdatetime;
+---------------+
| aha           |
+---------------+
| 1476468915000 |
| 1476468915000 |
+---------------+
```

4. The correct way to convert the datetime to a timestamp is `dateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli()`, where the `+8` should come from the user-specified Debezium properties OR the system's default time zone.

### What did you expect to see?

```json
{"before":null,"after":{"id":1,"dt":1476468915000},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"aps_test","sequence":null,"table":"batchdatetime","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680861868595,"transaction":null}
```

### What did you see instead?
```json
{"before":null,"after":{"id":1,"dt":1476497715000},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"aps_test","sequence":null,"table":"batchdatetime","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680861868595,"transaction":null}
```

### Anything else?

Critical data inconsistency issue.

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

----------------
Imported from GitHub
----------------

Url: https://github.com/apache/flink-cdc/issues/2074
Created by: [qidian99|https://github.com/qidian99]
Labels: bug,
Created at: Fri Apr 07 18:05:12 CST 2023
State: open

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
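Addendum: the wrong-vs-correct conversion described in step 4 of the reproduce steps can be checked in isolation with plain `java.time`, with no Flink or Debezium on the classpath. This is a minimal sketch (the class name `EpochMillisDemo` is made up for the example) contrasting the UTC-based conversion Debezium effectively performs with the server-time-zone-aware one:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class EpochMillisDemo {
    public static void main(String[] args) {
        // The snapshot value read from MySQL, as a zone-less wall-clock datetime.
        LocalDateTime dateTime = LocalDateTime.of(2016, 10, 15, 2, 15, 15);

        // What Debezium's Timestamp::toEpochMillis effectively does:
        // interpret the wall-clock time as if it were UTC.
        long utcMillis = dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();

        // What it should do: apply the server time zone (Asia/Shanghai, UTC+8).
        long serverZoneMillis = dateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();

        System.out.println(utcMillis);        // 1476497715000 (the value seen in the bug)
        System.out.println(serverZoneMillis); // 1476468915000 (matches UNIX_TIMESTAMP(dt) * 1000)
    }
}
```

The two results differ by exactly 28,800,000 ms, i.e. the 8-hour offset of Asia/Shanghai, which is where "off by 8 hours" in the summary comes from.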