Flink CDC Issue Import created FLINK-34784:
----------------------------------------------

             Summary: [Bug] Datetime data inconsistency (off by 8 hours) in 
snapshot phase
                 Key: FLINK-34784
                 URL: https://issues.apache.org/jira/browse/FLINK-34784
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the 
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
nothing similar.


### Flink version

1.13.6/1.14

### Flink CDC version

2.2/2.3

### Database and its version

MySQL 5.7+ & 8.0+

### Minimal reproduce step

([#1739|https://github.com/apache/flink-cdc/issues/1739])
The issue still persists in the snapshot phase when the MySQL server is in the CST time zone. The root cause is that Debezium's `Timestamp::toEpochMillis` does not respect the time zone:
[image|https://user-images.githubusercontent.com/20527912/230588397-e3d07c1b-1a5d-4659-b6b0-610ba36ac442.png]
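
For illustration, a minimal Java sketch of the mismatch (the class name is hypothetical; the epoch values are for the sample row `2016-10-15 02:15:15` used below):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class ToEpochMillisDemo {
  public static void main(String[] args) {
    LocalDateTime dt = LocalDateTime.of(2016, 10, 15, 2, 15, 15);

    // What Debezium effectively does: interpret the wall-clock value as UTC.
    long asUtc = dt.toInstant(ZoneOffset.UTC).toEpochMilli();
    // What the server time zone (UTC+8) implies.
    long asUtcPlus8 = dt.toInstant(ZoneOffset.of("+8")).toEpochMilli();

    System.out.println(asUtc);              // 1476497715000
    System.out.println(asUtcPlus8);         // 1476468915000
    System.out.println(asUtc - asUtcPlus8); // 28800000 ms = 8 hours
  }
}
```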

PRECONDITION: the server/local time zone is Asia/Shanghai (UTC+8), and the MySQL time zone is CST.
To reproduce the issue:
1. Create a table with a primary key and a datetime column, and insert some random data:

```sql
CREATE TABLE `batchdatetime` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `dt` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
);

# insert data here

mysql> select * from batchdatetime;
+----+---------------------+
| id | dt                  |
+----+---------------------+
|  1 | 2016-10-15 02:15:15 |
|  2 | 2016-10-15 02:15:15 |
+----+---------------------+

```

2. Create a CDC job using the DataStream API:
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MySqlSnapshotDemo {

  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment();
    env.setParallelism(1);

    MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
        .hostname("localhost")
        .databaseList("aps_test")
        .tableList("aps_test.batchdatetime")
        .username("root")
        .password("")
        .serverTimeZone("Asia/Shanghai")
        .includeSchemaChanges(true)
        .deserializer(new JsonDebeziumDeserializationSchema())
        .scanNewlyAddedTableEnabled(true)
        .build();

    DataStreamSource<String> source = env.fromSource(
        mysqlSource, WatermarkStrategy.noWatermarks(), "mysql-source");
    source.print();
    env.execute();
  }
}
```

3. Start the job and observe how Debezium introduces the inconsistency into the datetime columns:
[image|https://user-images.githubusercontent.com/20527912/230589178-54adf54d-a797-45f3-b156-ffd95cc76907.png]

`dateTime.toInstant(ZoneOffset.UTC).toEpochMilli()` returns 1476497715000,

BUT `2016-10-15T02:15:15` corresponds to 1476468915000 milliseconds in BOTH the server and the local time zone, which can be verified with the following SQL:

```sql
mysql> select UNIX_TIMESTAMP(dt) * 1000 as aha from batchdatetime;
+---------------+
| aha           |
+---------------+
| 1476468915000 |
| 1476468915000 |
+---------------+
```

4. The correct way to convert the datetime to a timestamp is
`dateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli()`,

where the `+8` offset should come from the user-specified Debezium properties OR from the system's default time zone.
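
A hedged sketch of what such a zone-aware conversion could look like (the class and helper names are hypothetical; `serverTimeZone` stands in for whatever the connector's `server-time-zone` option resolves to; `ZoneId` is used rather than a fixed offset so DST-aware zones also work):

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

public class ZoneAwareConversion {

  // Hypothetical helper: convert a MySQL DATETIME (a wall-clock value with
  // no zone attached) to epoch millis using the configured server time zone,
  // falling back to the JVM default when none is configured.
  static long toEpochMillis(LocalDateTime dateTime, ZoneId serverTimeZone) {
    ZoneId zone = serverTimeZone != null ? serverTimeZone : ZoneId.systemDefault();
    return dateTime.atZone(zone).toInstant().toEpochMilli();
  }

  public static void main(String[] args) {
    LocalDateTime dt = LocalDateTime.of(2016, 10, 15, 2, 15, 15);
    // Asia/Shanghai is UTC+8, matching the expected value from the report.
    System.out.println(toEpochMillis(dt, ZoneId.of("Asia/Shanghai"))); // 1476468915000
  }
}
```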


### What did you expect to see?

```json
{"before":null,"after":{"id":1,"dt":1476468915000},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"aps_test","sequence":null,"table":"batchdatetime","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680861868595,"transaction":null}
```


### What did you see instead?

```json
{"before":null,"after":{"id":1,"dt":1476497715000},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"aps_test","sequence":null,"table":"batchdatetime","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680861868595,"transaction":null}
```


### Anything else?

Critical data inconsistency issue.

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2074
Created by: [qidian99|https://github.com/qidian99]
Labels: bug
Created at: Fri Apr 07 18:05:12 CST 2023
State: open




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
