This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 4baee8c63 [FLINK-37000][pipeline-connector][mysql] Fix MySQL CDC could 
not handle datetime prior to 1970 properly (#3834)
4baee8c63 is described below

commit 4baee8c6369d3cfb58ff06ae3eb2042a61e8809b
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Tue Apr 8 16:19:58 2025 +0800

    [FLINK-37000][pipeline-connector][mysql] Fix MySQL CDC could not handle 
datetime prior to 1970 properly (#3834)
---
 .../source/MySqlAncientDateAndTimeITCase.java      | 412 ++++++++++++++++
 .../test/resources/ddl/ancient_date_and_time.sql   | 124 +++++
 .../docker/server-allow-ancient-date-time/my.cnf   |  58 +++
 .../event/DebeziumEventDeserializationSchema.java  |   6 +-
 .../table/RowDataDebeziumDeserializeSchema.java    |   9 +-
 .../source/MySqlAncientDateAndTimeITCase.java      | 521 +++++++++++++++++++++
 .../mysql/table/MySqlAncientDateAndTimeITCase.java | 363 ++++++++++++++
 .../test/resources/ddl/ancient_date_and_time.sql   | 124 +++++
 .../docker/server-allow-ancient-date-time/my.cnf   |  58 +++
 9 files changed, 1671 insertions(+), 4 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java
new file mode 100644
index 000000000..b630b5c8b
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+/** Integration tests for MySQL Table source to handle ancient date and time 
records. */
+public class MySqlAncientDateAndTimeITCase extends MySqlSourceTestBase {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlAncientDateAndTimeITCase.class);
+
+    private static final String TEST_USER = "mysqluser";
+    private static final String TEST_PASSWORD = "mysqlpw";
+
+    // We need an extra "no_zero_in_date = false" config to insert malformed 
date and time records.
+    private static final MySqlContainer MYSQL_CONTAINER =
+            createMySqlContainer(MySqlVersion.V8_0, 
"docker/server-allow-ancient-date-time/my.cnf");
+
+    private final UniqueDatabase ancientDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, "ancient_date_and_time", 
TEST_USER, TEST_PASSWORD);
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    /** Starts the shared MySQL container once, before any test of this class runs. */
+    @BeforeClass
+    public static void beforeClass() {
+        LOG.info("Starting MySql container...");
+        // join() blocks until the container start-up future has completed.
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Container MySql is started.");
+    }
+
+    /** Stops the shared MySQL container once all tests of this class have run. */
+    @AfterClass
+    public static void afterClass() {
+        LOG.info("Stopping MySql containers...");
+        MYSQL_CONTAINER.stop();
+        LOG.info("Container MySql is stopped.");
+    }
+
+    /** Configures the execution environment and initializes the test database before each test. */
+    @Before
+    public void before() {
+        env.setParallelism(DEFAULT_PARALLELISM);
+        // Checkpoint interval of 200 (milliseconds, per Flink's enableCheckpointing(long)).
+        env.enableCheckpointing(200);
+        // No restarts: a failing job should fail the test instead of being retried.
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        ancientDatabase.createAndInitialize();
+    }
+
+    /** Drops the test database after each test. */
+    @After
+    public void after() {
+        ancientDatabase.dropDatabase();
+    }
+
+    /**
+     * With the TimeAdjuster in Debezium enabled, date / time records whose year is between 0001
+     * and 0099 are shifted into the range 1970 ~ 2069 (e.g. 0017 becomes 2017), while years with
+     * more significant digits such as 0444 are expected to stay untouched.
+     */
+    @Test
+    public void testAncientDateAndTimeWithTimeAdjuster() throws Exception {
+        // LocalDate.ofEpochDay reference:
+        // 
+---------------------------------------------------------------------------------+
+        // | 17390     | 11323    | 11720    | 23072    | -557266  | -1        
 | 18261      |
+        // | 2017/8/12 | 2001/1/1 | 2002/2/2 | 2033/3/3 | 0444/4/4 | 
1969/12/31 | 2019/12/31 |
+        // 
+---------------------------------------------------------------------------------+
+        runGenericAncientDateAndTimeTest(
+                ancientDatabase,
+                true,
+                // Rows expected from the snapshot phase (ids 1 ~ 8).
+                Arrays.asList(
+                        "[1, 17390, 2016-07-13T17:17:17, 
2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 
2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 
2010-01-19T17:17:17.123456]",
+                        "[2, null, null, null, null, null, null, null, null]",
+                        "[3, 11323, 2001-01-01T16:16:16, 
2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 
2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 
2001-01-01T16:16:16.123456]",
+                        "[4, 11720, 2002-02-02T15:15:15, 
2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 
2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 
2002-02-02T15:15:15.123456]",
+                        "[5, 23072, 2033-03-03T14:14:14, 
2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 
2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 
2033-03-03T14:14:14.123456]",
+                        "[6, -557266, 0444-04-04T13:13:13, 
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 
0444-04-04T13:13:13.123456]",
+                        "[7, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 
1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 
1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]",
+                        "[8, 18261, 2019-12-31T23:11:11, 
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 
2019-12-31T23:11:11.123456]"),
+                // Rows expected from the binlog phase (ids 9 ~ 16, same values as above).
+                Arrays.asList(
+                        "[9, 17390, 2016-07-13T17:17:17, 
2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 
2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 
2010-01-19T17:17:17.123456]",
+                        "[10, null, null, null, null, null, null, null, null]",
+                        "[11, 11323, 2001-01-01T16:16:16, 
2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 
2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 
2001-01-01T16:16:16.123456]",
+                        "[12, 11720, 2002-02-02T15:15:15, 
2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 
2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 
2002-02-02T15:15:15.123456]",
+                        "[13, 23072, 2033-03-03T14:14:14, 
2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 
2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 
2033-03-03T14:14:14.123456]",
+                        "[14, -557266, 0444-04-04T13:13:13, 
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 
0444-04-04T13:13:13.123456]",
+                        "[15, -1, 1969-12-31T12:12:12, 
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 
1969-12-31T12:12:12.123456]",
+                        "[16, 18261, 2019-12-31T23:11:11, 
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 
2019-12-31T23:11:11.123456]"));
+    }
+
+    /**
+     * With the TimeAdjuster in Debezium disabled, date / time records are expected to keep the
+     * year written to MySQL, so years such as 0001 or 0017 are emitted as-is.
+     */
+    @Test
+    public void testAncientDateAndTimeWithoutTimeAdjuster() throws Exception {
+        // LocalDate.ofEpochDay reference:
+        // 
+---------------------------------------------------------------------------------+
+        // | -713095   | -719162  | -718765  | -707413  | -557266  | -1        
 | 18261      |
+        // | 0017/8/12 | 0001/1/1 | 0002/2/2 | 0033/3/3 | 0444/4/4 | 
1969/12/31 | 2019/12/31 |
+        // 
+---------------------------------------------------------------------------------+
+        runGenericAncientDateAndTimeTest(
+                ancientDatabase,
+                false,
+                // Rows expected from the snapshot phase (ids 1 ~ 8).
+                Arrays.asList(
+                        "[1, -713095, 0016-07-13T17:17:17, 
0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 
0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 
0010-01-19T17:17:17.123456]",
+                        "[2, null, null, null, null, null, null, null, null]",
+                        "[3, -719162, 0001-01-01T16:16:16, 
0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 
0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 
0001-01-01T16:16:16.123456]",
+                        "[4, -718765, 0002-02-02T15:15:15, 
0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 
0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 
0002-02-02T15:15:15.123456]",
+                        "[5, -707413, 0033-03-03T14:14:14, 
0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 
0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 
0033-03-03T14:14:14.123456]",
+                        "[6, -557266, 0444-04-04T13:13:13, 
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 
0444-04-04T13:13:13.123456]",
+                        "[7, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 
1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 
1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]",
+                        "[8, 18261, 2019-12-31T23:11:11, 
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 
2019-12-31T23:11:11.123456]"),
+                // Rows expected from the binlog phase (ids 9 ~ 16, same values as above).
+                Arrays.asList(
+                        "[9, -713095, 0016-07-13T17:17:17, 
0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 
0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 
0010-01-19T17:17:17.123456]",
+                        "[10, null, null, null, null, null, null, null, null]",
+                        "[11, -719162, 0001-01-01T16:16:16, 
0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 
0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 
0001-01-01T16:16:16.123456]",
+                        "[12, -718765, 0002-02-02T15:15:15, 
0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 
0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 
0002-02-02T15:15:15.123456]",
+                        "[13, -707413, 0033-03-03T14:14:14, 
0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 
0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 
0033-03-03T14:14:14.123456]",
+                        "[14, -557266, 0444-04-04T13:13:13, 
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 
0444-04-04T13:13:13.123456]",
+                        "[15, -1, 1969-12-31T12:12:12, 
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 
1969-12-31T12:12:12.123456]",
+                        "[16, 18261, 2019-12-31T23:11:11, 
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 
2019-12-31T23:11:11.123456]"));
+    }
+
+    /**
+     * Reads the {@code ancient_times} table of {@code database} through the snapshot phase and
+     * then the binlog phase, verifying the rows emitted in each phase.
+     *
+     * @param database database that holds the {@code ancient_times} table
+     * @param enableTimeAdjuster value forwarded to Debezium's {@code enable.time.adjuster}
+     * @param expectedSnapshotResults expected string form of the rows read during the snapshot
+     * @param expectedStreamingResults expected string form of the rows read from the binlog
+     * @throws Exception if running the job or writing the binlog rows fails
+     */
+    private void runGenericAncientDateAndTimeTest(
+            UniqueDatabase database,
+            boolean enableTimeAdjuster,
+            List<String> expectedSnapshotResults,
+            List<String> expectedStreamingResults)
+            throws Exception {
+        Schema ancientSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("date_col", DataTypes.DATE(), null, "0017-08-12")
+                        .physicalColumn(
+                                "datetime_0_col",
+                                DataTypes.TIMESTAMP(0),
+                                null,
+                                "0016-07-13 17:17:17")
+                        .physicalColumn(
+                                "datetime_1_col",
+                                DataTypes.TIMESTAMP(1),
+                                null,
+                                "0015-06-14 17:17:17.1")
+                        .physicalColumn(
+                                "datetime_2_col",
+                                DataTypes.TIMESTAMP(2),
+                                null,
+                                "0014-05-15 17:17:17.12")
+                        .physicalColumn(
+                                "datetime_3_col",
+                                DataTypes.TIMESTAMP(3),
+                                null,
+                                "0013-04-16 17:17:17.123")
+                        .physicalColumn(
+                                "datetime_4_col",
+                                DataTypes.TIMESTAMP(4),
+                                null,
+                                "0012-03-17 17:17:17.1234")
+                        .physicalColumn(
+                                "datetime_5_col",
+                                DataTypes.TIMESTAMP(5),
+                                null,
+                                "0011-02-18 17:17:17.12345")
+                        .physicalColumn(
+                                "datetime_6_col",
+                                DataTypes.TIMESTAMP(6),
+                                null,
+                                "0010-01-19 17:17:17.123456")
+                        .primaryKey("id")
+                        .build();
+        List<RecordData.FieldGetter> ancientSchemaFieldGetters =
+                SchemaUtils.createFieldGetters(ancientSchema);
+        // Use the database handed in by the caller rather than the test fixture field, so the
+        // expected table id always matches the database the source actually reads from.
+        CreateTableEvent ancientCreateTableEvent =
+                new CreateTableEvent(
+                        TableId.tableId(database.getDatabaseName(), "ancient_times"),
+                        ancientSchema);
+        try (CloseableIterator<Event> iterator =
+                env.fromSource(
+                                getFlinkSourceProvider(
+                                                new String[] {"ancient_times"},
+                                                database,
+                                                enableTimeAdjuster)
+                                        .getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                "Event-Source")
+                        .executeAndCollect()) {
+
+            // Snapshot phase: rows that already exist in the table.
+            assertFetchedRows(
+                    iterator,
+                    ancientCreateTableEvent,
+                    ancientSchemaFieldGetters,
+                    expectedSnapshotResults);
+
+            createBinlogEvents(database);
+
+            // Binlog phase: wait for as many rows as the streaming expectation lists (the
+            // snapshot expectation's size must not be used here).
+            assertFetchedRows(
+                    iterator,
+                    ancientCreateTableEvent,
+                    ancientSchemaFieldGetters,
+                    expectedStreamingResults);
+        }
+    }
+
+    /**
+     * Fetches {@code expectedRows.size()} data change events from {@code iterator} and asserts
+     * that their restored "after" rows match {@code expectedRows} in any order, and that every
+     * create table event seen meanwhile equals {@code expectedCreateTableEvent}.
+     */
+    private static void assertFetchedRows(
+            Iterator<Event> iterator,
+            CreateTableEvent expectedCreateTableEvent,
+            List<RecordData.FieldGetter> fieldGetters,
+            List<String> expectedRows) {
+        Tuple2<List<Event>, List<CreateTableEvent>> fetched =
+                fetchResultsAndCreateTableEvent(iterator, expectedRows.size());
+        Assertions.assertThat(fetched.f1).isSubsetOf(expectedCreateTableEvent);
+        Assertions.assertThat(fetched.f0)
+                .map(evt -> (DataChangeEvent) evt)
+                .map(
+                        evt ->
+                                SchemaUtils.restoreOriginalData(evt.after(), fieldGetters)
+                                        .toString())
+                .containsExactlyInAnyOrderElementsOf(expectedRows);
+    }
+
+    /**
+     * Builds a {@link FlinkSourceProvider} that captures the given tables of {@code database},
+     * starting from the initial snapshot.
+     *
+     * @param captureTables unqualified names of the tables to capture
+     * @param database database hosting the tables; also supplies host, port and credentials
+     * @param enableTimeAdjuster value written to the Debezium {@code enable.time.adjuster} property
+     * @return the event source provider of a {@code MySqlDataSource} built from that config
+     */
+    private FlinkSourceProvider getFlinkSourceProvider(
+            String[] captureTables, UniqueDatabase database, boolean enableTimeAdjuster) {
+        // Qualify every table name as "<database>.<table>".
+        String[] qualifiedTables = new String[captureTables.length];
+        for (int i = 0; i < captureTables.length; i++) {
+            qualifiedTables[i] = database.getDatabaseName() + "." + captureTables[i];
+        }
+
+        Properties debeziumProps = new Properties();
+        debeziumProps.put("enable.time.adjuster", String.valueOf(enableTimeAdjuster));
+
+        MySqlSourceConfigFactory sourceConfig =
+                new MySqlSourceConfigFactory()
+                        .startupOptions(StartupOptions.initial())
+                        .databaseList(database.getDatabaseName())
+                        .tableList(qualifiedTables)
+                        .includeSchemaChanges(false)
+                        .hostname(database.getHost())
+                        .port(database.getDatabasePort())
+                        .splitSize(10)
+                        .fetchSize(2)
+                        .username(database.getUsername())
+                        .password(database.getPassword())
+                        .serverTimeZone(ZoneId.of("UTC").toString())
+                        .serverId(MySqSourceTestUtils.getServerId(env.getParallelism()))
+                        .debeziumProperties(debeziumProps);
+
+        MySqlDataSource dataSource = new MySqlDataSource(sourceConfig);
+        return (FlinkSourceProvider) dataSource.getEventSourceProvider();
+    }
+
+    /**
+     * Inserts eight rows into {@code ancient_times} over a JDBC connection to {@code database},
+     * one statement per row and in a fixed order, so that they show up as binlog events.
+     *
+     * @param database database to write to
+     * @throws SQLException if the connection cannot be obtained or a statement fails
+     */
+    private static void createBinlogEvents(UniqueDatabase database) throws SQLException {
+        // Test reading identical data in binlog stage again
+        String[] insertStatements = {
+            // All columns take their defaults.
+            "INSERT INTO ancient_times VALUES (\n"
+                    + "    DEFAULT,\n"
+                    + "    DEFAULT,\n"
+                    + "    DEFAULT,\n"
+                    + "    DEFAULT,\n"
+                    + "    DEFAULT,\n"
+                    + "    DEFAULT,\n"
+                    + "    DEFAULT,\n"
+                    + "    DEFAULT,\n"
+                    + "    DEFAULT\n"
+                    + ");",
+            // Zero dates.
+            "INSERT INTO ancient_times VALUES (\n"
+                    + "    DEFAULT,\n"
+                    + "    '0000-00-00',\n"
+                    + "    '0000-00-00 00:00:00',\n"
+                    + "    '0000-00-00 00:00:00.0',\n"
+                    + "    '0000-00-00 00:00:00.00',\n"
+                    + "    '0000-00-00 00:00:00.000',\n"
+                    + "    '0000-00-00 00:00:00.0000',\n"
+                    + "    '0000-00-00 00:00:00.00000',\n"
+                    + "    '0000-00-00 00:00:00.000000'\n"
+                    + ");",
+            "INSERT INTO ancient_times VALUES (\n"
+                    + "    DEFAULT,\n"
+                    + "    '0001-01-01',\n"
+                    + "    '0001-01-01 16:16:16',\n"
+                    + "    '0001-01-01 16:16:16.1',\n"
+                    + "    '0001-01-01 16:16:16.12',\n"
+                    + "    '0001-01-01 16:16:16.123',\n"
+                    + "    '0001-01-01 16:16:16.1234',\n"
+                    + "    '0001-01-01 16:16:16.12345',\n"
+                    + "    '0001-01-01 16:16:16.123456'\n"
+                    + ");",
+            "INSERT INTO ancient_times VALUES (\n"
+                    + "    DEFAULT,\n"
+                    + "    '0002-02-02',\n"
+                    + "    '0002-02-02 15:15:15',\n"
+                    + "    '0002-02-02 15:15:15.1',\n"
+                    + "    '0002-02-02 15:15:15.12',\n"
+                    + "    '0002-02-02 15:15:15.123',\n"
+                    + "    '0002-02-02 15:15:15.1234',\n"
+                    + "    '0002-02-02 15:15:15.12345',\n"
+                    + "    '0002-02-02 15:15:15.123456'\n"
+                    + ");",
+            "INSERT INTO ancient_times VALUES (\n"
+                    + "    DEFAULT,\n"
+                    + "    '0033-03-03',\n"
+                    + "    '0033-03-03 14:14:14',\n"
+                    + "    '0033-03-03 14:14:14.1',\n"
+                    + "    '0033-03-03 14:14:14.12',\n"
+                    + "    '0033-03-03 14:14:14.123',\n"
+                    + "    '0033-03-03 14:14:14.1234',\n"
+                    + "    '0033-03-03 14:14:14.12345',\n"
+                    + "    '0033-03-03 14:14:14.123456'\n"
+                    + ");",
+            "INSERT INTO ancient_times VALUES (\n"
+                    + "    DEFAULT,\n"
+                    + "    '0444-04-04',\n"
+                    + "    '0444-04-04 13:13:13',\n"
+                    + "    '0444-04-04 13:13:13.1',\n"
+                    + "    '0444-04-04 13:13:13.12',\n"
+                    + "    '0444-04-04 13:13:13.123',\n"
+                    + "    '0444-04-04 13:13:13.1234',\n"
+                    + "    '0444-04-04 13:13:13.12345',\n"
+                    + "    '0444-04-04 13:13:13.123456'\n"
+                    + ");",
+            "INSERT INTO ancient_times VALUES (\n"
+                    + "    DEFAULT,\n"
+                    + "    '1969-12-31',\n"
+                    + "    '1969-12-31 12:12:12',\n"
+                    + "    '1969-12-31 12:12:12.1',\n"
+                    + "    '1969-12-31 12:12:12.12',\n"
+                    + "    '1969-12-31 12:12:12.123',\n"
+                    + "    '1969-12-31 12:12:12.1234',\n"
+                    + "    '1969-12-31 12:12:12.12345',\n"
+                    + "    '1969-12-31 12:12:12.123456'\n"
+                    + ");",
+            "INSERT INTO ancient_times VALUES (\n"
+                    + "    DEFAULT,\n"
+                    + "    '2019-12-31',\n"
+                    + "    '2019-12-31 23:11:11',\n"
+                    + "    '2019-12-31 23:11:11.1',\n"
+                    + "    '2019-12-31 23:11:11.12',\n"
+                    + "    '2019-12-31 23:11:11.123',\n"
+                    + "    '2019-12-31 23:11:11.1234',\n"
+                    + "    '2019-12-31 23:11:11.12345',\n"
+                    + "    '2019-12-31 23:11:11.123456'\n"
+                    + ");"
+        };
+
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            for (String insertSql : insertStatements) {
+                statement.execute(insertSql);
+            }
+        }
+    }
+
+    /**
+     * Pulls elements from {@code iter} until {@code size} elements that are not
+     * {@link CreateTableEvent}s have been gathered, or the iterator has no more elements.
+     * {@link CreateTableEvent}s met along the way are collected separately and do not count
+     * towards {@code size}.
+     *
+     * @param iter iterator to pull elements from
+     * @param size number of non-{@link CreateTableEvent} elements to gather
+     * @return a tuple of (gathered elements, create table events seen while gathering)
+     */
+    public static <T> Tuple2<List<T>, List<CreateTableEvent>> fetchResultsAndCreateTableEvent(
+            Iterator<T> iter, int size) {
+        List<T> gathered = new ArrayList<>(size);
+        List<CreateTableEvent> schemaEvents = new ArrayList<>();
+        int remaining = size;
+        while (remaining > 0 && iter.hasNext()) {
+            T next = iter.next();
+            if (!(next instanceof CreateTableEvent)) {
+                gathered.add(next);
+                remaining--;
+                continue;
+            }
+            schemaEvents.add((CreateTableEvent) next);
+        }
+        return Tuple2.of(gathered, schemaEvents);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/ancient_date_and_time.sql
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/ancient_date_and_time.sql
new file mode 100644
index 000000000..93832b9b7
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/ancient_date_and_time.sql
@@ -0,0 +1,124 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- Test table with one DATE column and one DATETIME column for each fractional-second
+-- precision from 0 to 6. Every column default uses a two-digit year (0010 ~ 0017).
+CREATE TABLE ancient_times
+(
+    id             INT NOT NULL AUTO_INCREMENT,
+    date_col       DATE         DEFAULT '0017-08-12',
+    datetime_0_col DATETIME(0)  DEFAULT '0016-07-13 17:17:17',
+    datetime_1_col DATETIME(1)  DEFAULT '0015-06-14 17:17:17.1',
+    datetime_2_col DATETIME(2)  DEFAULT '0014-05-15 17:17:17.12',
+    datetime_3_col DATETIME(3)  DEFAULT '0013-04-16 17:17:17.123',
+    datetime_4_col DATETIME(4)  DEFAULT '0012-03-17 17:17:17.1234',
+    datetime_5_col DATETIME(5)  DEFAULT '0011-02-18 17:17:17.12345',
+    datetime_6_col DATETIME(6)  DEFAULT '0010-01-19 17:17:17.123456',
+    PRIMARY KEY (id)
+);
+
+-- Seed row: every column takes its default value.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    DEFAULT,
+    DEFAULT,
+    DEFAULT,
+    DEFAULT,
+    DEFAULT,
+    DEFAULT,
+    DEFAULT,
+    DEFAULT
+);
+
+-- Seed row: zero dates ('0000-00-00') in every date / time column.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '0000-00-00',
+    '0000-00-00 00:00:00',
+    '0000-00-00 00:00:00.0',
+    '0000-00-00 00:00:00.00',
+    '0000-00-00 00:00:00.000',
+    '0000-00-00 00:00:00.0000',
+    '0000-00-00 00:00:00.00000',
+    '0000-00-00 00:00:00.000000'
+);
+
+-- Seed row: year 0001.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '0001-01-01',
+    '0001-01-01 16:16:16',
+    '0001-01-01 16:16:16.1',
+    '0001-01-01 16:16:16.12',
+    '0001-01-01 16:16:16.123',
+    '0001-01-01 16:16:16.1234',
+    '0001-01-01 16:16:16.12345',
+    '0001-01-01 16:16:16.123456'
+);
+
+-- Seed row: year 0002.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '0002-02-02',
+    '0002-02-02 15:15:15',
+    '0002-02-02 15:15:15.1',
+    '0002-02-02 15:15:15.12',
+    '0002-02-02 15:15:15.123',
+    '0002-02-02 15:15:15.1234',
+    '0002-02-02 15:15:15.12345',
+    '0002-02-02 15:15:15.123456'
+);
+
+-- Seed row: year 0033.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '0033-03-03',
+    '0033-03-03 14:14:14',
+    '0033-03-03 14:14:14.1',
+    '0033-03-03 14:14:14.12',
+    '0033-03-03 14:14:14.123',
+    '0033-03-03 14:14:14.1234',
+    '0033-03-03 14:14:14.12345',
+    '0033-03-03 14:14:14.123456'
+);
+
+-- Seed row: year 0444 (three significant year digits).
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '0444-04-04',
+    '0444-04-04 13:13:13',
+    '0444-04-04 13:13:13.1',
+    '0444-04-04 13:13:13.12',
+    '0444-04-04 13:13:13.123',
+    '0444-04-04 13:13:13.1234',
+    '0444-04-04 13:13:13.12345',
+    '0444-04-04 13:13:13.123456'
+);
+
+-- Seed row: 1969-12-31, the last day before the Unix epoch.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '1969-12-31',
+    '1969-12-31 12:12:12',
+    '1969-12-31 12:12:12.1',
+    '1969-12-31 12:12:12.12',
+    '1969-12-31 12:12:12.123',
+    '1969-12-31 12:12:12.1234',
+    '1969-12-31 12:12:12.12345',
+    '1969-12-31 12:12:12.123456'
+);
+
+-- Seed row: a contemporary date (2019-12-31).
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '2019-12-31',
+    '2019-12-31 23:11:11',
+    '2019-12-31 23:11:11.1',
+    '2019-12-31 23:11:11.12',
+    '2019-12-31 23:11:11.123',
+    '2019-12-31 23:11:11.1234',
+    '2019-12-31 23:11:11.12345',
+    '2019-12-31 23:11:11.123456'
+);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-allow-ancient-date-time/my.cnf
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-allow-ancient-date-time/my.cnf
new file mode 100644
index 000000000..ca0483780
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-allow-ancient-date-time/my.cnf
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id         = 223344
+log_bin           = mysql-bin
+expire_logs_days  = 1
+binlog_format     = row
+sql_mode          = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION"
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
index f77d1aa18..ad7ec458e 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
@@ -309,10 +309,12 @@ public abstract class DebeziumEventDeserializationSchema 
extends SourceRecordEve
                     return TimestampData.fromMillis((Long) dbzObj);
                 case MicroTimestamp.SCHEMA_NAME:
                     long micro = (long) dbzObj;
-                    return TimestampData.fromMillis(micro / 1000, (int) (micro 
% 1000 * 1000));
+                    return TimestampData.fromMillis(
+                            Math.floorDiv(micro, 1000), (int) 
(Math.floorMod(micro, 1000) * 1000));
                 case NanoTimestamp.SCHEMA_NAME:
                     long nano = (long) dbzObj;
-                    return TimestampData.fromMillis(nano / 1000_000, (int) 
(nano % 1000_000));
+                    return TimestampData.fromMillis(
+                            Math.floorDiv(nano, 1000_000), (int) 
(Math.floorMod(nano, 1000_000)));
             }
         }
         throw new IllegalArgumentException(
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index 6a9642075..b68dcd171 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -481,12 +481,17 @@ public final class RowDataDebeziumDeserializeSchema
                             return TimestampData.fromEpochMillis((Long) 
dbzObj);
                         case MicroTimestamp.SCHEMA_NAME:
                             long micro = (long) dbzObj;
+                            // Use Math#floorDiv and Math#floorMod instead of 
`/` and `%`, because
+                            // timestamp number could be negative if we're 
handling timestamps prior
+                            // to 1970.
                             return TimestampData.fromEpochMillis(
-                                    micro / 1000, (int) (micro % 1000 * 1000));
+                                    Math.floorDiv(micro, 1000),
+                                    (int) (Math.floorMod(micro, 1000) * 1000));
                         case NanoTimestamp.SCHEMA_NAME:
                             long nano = (long) dbzObj;
                             return TimestampData.fromEpochMillis(
-                                    nano / 1000_000, (int) (nano % 1000_000));
+                                    Math.floorDiv(nano, 1000_000),
+                                    (int) (Math.floorMod(nano, 1000_000)));
                     }
                 }
                 LocalDateTime localDateTime =
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java
new file mode 100644
index 000000000..ca1a91b4e
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.RowUtils;
+import org.apache.flink.util.CloseableIterator;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.lifecycle.Startables;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Integration tests for {@link MySqlSource} to handle ancient date and time records. */
+public class MySqlAncientDateAndTimeITCase extends MySqlSourceTestBase {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlAncientDateAndTimeITCase.class);
+
+    private static final String TEST_USER = "mysqluser";
+    private static final String TEST_PASSWORD = "mysqlpw";
+
+    // We need a custom my.cnf that disables the NO_ZERO_IN_DATE SQL mode in order to insert
+    // malformed date and time records.
+    private static final MySqlContainer MYSQL_CONTAINER =
+            createMySqlContainer(MySqlVersion.V8_0, 
"docker/server-allow-ancient-date-time/my.cnf");
+
+    private final UniqueDatabase ancientDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, "ancient_date_and_time", 
TEST_USER, TEST_PASSWORD);
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    @BeforeClass
+    public static void beforeClass() {
+        LOG.info("Starting MySql container...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Container MySql is started.");
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        LOG.info("Stopping MySql containers...");
+        MYSQL_CONTAINER.stop();
+        LOG.info("Container MySql is stopped.");
+    }
+
+    @Before
+    public void before() {
+        TestValuesTableFactory.clearAllData();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.enableCheckpointing(200);
+        ancientDatabase.createAndInitialize();
+    }
+
+    @After
+    public void after() {
+        ancientDatabase.dropDatabase();
+    }
+
+    /**
+     * With the TimeAdjuster in Debezium, all date / time records between year 0001 and 0099 will be
+     * shifted to 1970 ~ 2069 (0001 ~ 0069 become 2001 ~ 2069, 0070 ~ 0099 become 1970 ~ 1999).
+     */
+    @Test
+    public void 
testAncientDateAndTimeWithTimeAdjusterWithRowDataDeserializer() throws 
Exception {
+        // LocalDate.ofEpochDay reference:
+        // 
+---------------------------------------------------------------------------------+
+        // | 17390     | 11323    | 11720    | 23072    | -557266  | -1        
 | 18261      |
+        // | 2017/8/12 | 2001/1/1 | 2002/2/2 | 2033/3/3 | 0444/4/4 | 
1969/12/31 | 2019/12/31 |
+        // 
+---------------------------------------------------------------------------------+
+        runGenericAncientDateAndTimeTest(
+                MYSQL_CONTAINER,
+                ancientDatabase,
+                true,
+                DeserializerType.ROW_DATA,
+                Arrays.asList(
+                        "+I[1, 17390, 2016-07-13T17:17:17, 
2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 
2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 
2010-01-19T17:17:17.123456]",
+                        "+I[2, null, null, null, null, null, null, null, 
null]",
+                        "+I[3, 11323, 2001-01-01T16:16:16, 
2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 
2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 
2001-01-01T16:16:16.123456]",
+                        "+I[4, 11720, 2002-02-02T15:15:15, 
2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 
2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 
2002-02-02T15:15:15.123456]",
+                        "+I[5, 23072, 2033-03-03T14:14:14, 
2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 
2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 
2033-03-03T14:14:14.123456]",
+                        "+I[6, -557266, 0444-04-04T13:13:13, 
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 
0444-04-04T13:13:13.123456]",
+                        "+I[7, -1, 1969-12-31T12:12:12, 
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 
1969-12-31T12:12:12.123456]",
+                        "+I[8, 18261, 2019-12-31T23:11:11, 
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 
2019-12-31T23:11:11.123456]"));
+    }
+
+    @Test
+    public void 
testAncientDateAndTimeWithoutTimeAdjusterWithRowDataDeserializer()
+            throws Exception {
+        // LocalDate.ofEpochDay reference:
+        // 
+---------------------------------------------------------------------------------+
+        // | -713095   | -719162  | -718765  | -707413  | -557266  | -1        
 | 18261      |
+        // | 0017/8/12 | 0001/1/1 | 0002/2/2 | 0033/3/3 | 0444/4/4 | 
1969/12/31 | 2019/12/31 |
+        // 
+---------------------------------------------------------------------------------+
+        runGenericAncientDateAndTimeTest(
+                MYSQL_CONTAINER,
+                ancientDatabase,
+                false,
+                DeserializerType.ROW_DATA,
+                Arrays.asList(
+                        "+I[1, -713095, 0016-07-13T17:17:17, 
0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 
0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 
0010-01-19T17:17:17.123456]",
+                        "+I[2, null, null, null, null, null, null, null, 
null]",
+                        "+I[3, -719162, 0001-01-01T16:16:16, 
0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 
0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 
0001-01-01T16:16:16.123456]",
+                        "+I[4, -718765, 0002-02-02T15:15:15, 
0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 
0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 
0002-02-02T15:15:15.123456]",
+                        "+I[5, -707413, 0033-03-03T14:14:14, 
0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 
0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 
0033-03-03T14:14:14.123456]",
+                        "+I[6, -557266, 0444-04-04T13:13:13, 
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 
0444-04-04T13:13:13.123456]",
+                        "+I[7, -1, 1969-12-31T12:12:12, 
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 
1969-12-31T12:12:12.123456]",
+                        "+I[8, 18261, 2019-12-31T23:11:11, 
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 
2019-12-31T23:11:11.123456]"));
+    }
+
+    @Test
+    public void testAncientDateAndTimeWithTimeAdjusterWithJsonDeserializer() 
throws Exception {
+        // LocalDate.ofEpochDay reference:
+        //
+        // 
+---------------------------------------------------------------------------------+
+        // | 17390     | 11323    | 11720    | 23072    | -557266  | -1        
 | 18261      |
+        // | 2017/8/12 | 2001/1/1 | 2002/2/2 | 2033/3/3 | 0444/4/4 | 
1969/12/31 | 2019/12/31 |
+        // 
+---------------------------------------------------------------------------------+
+        //
+        // Epoch reference (milliseconds for datetime_0~3_col, microseconds for datetime_4~6_col):
+        //
+        // Row 1:
+        //    1468430237000 -> 2016-07-13T17:17:17
+        //    1434302237100 -> 2015-06-14T17:17:17.100
+        //    1400174237120 -> 2014-05-15T17:17:17.120
+        //    1366132637123 -> 2013-04-16T17:17:17.123
+        // 1332004637123400 -> 2012-03-17T17:17:17.123400
+        // 1298049437123450 -> 2011-02-18T17:17:17.123450
+        // 1263921437123456 -> 2010-01-19T17:17:17.123456
+        //
+        // Row 2:
+        //   (null)
+        //
+        // Row 3:
+        //    978365776000 -> 2001-01-01T16:16:16
+        //    978365776100 -> 2001-01-01T16:16:16.100
+        //    978365776120 -> 2001-01-01T16:16:16.120
+        //    978365776123 -> 2001-01-01T16:16:16.123
+        // 978365776123400 -> 2001-01-01T16:16:16.123400
+        // 978365776123450 -> 2001-01-01T16:16:16.123450
+        // 978365776123456 -> 2001-01-01T16:16:16.123456
+        //
+        // Row 4:
+        //    1012662915000 -> 2002-02-02T15:15:15
+        //    1012662915100 -> 2002-02-02T15:15:15.100
+        //    1012662915120 -> 2002-02-02T15:15:15.120
+        //    1012662915123 -> 2002-02-02T15:15:15.123
+        // 1012662915123400 -> 2002-02-02T15:15:15.123400
+        // 1012662915123450 -> 2002-02-02T15:15:15.123450
+        // 1012662915123456 -> 2002-02-02T15:15:15.123456
+        //
+        // Row 5:
+        //    1993472054000 -> 2033-03-03T14:14:14
+        //    1993472054100 -> 2033-03-03T14:14:14.100
+        //    1993472054120 -> 2033-03-03T14:14:14.120
+        //    1993472054123 -> 2033-03-03T14:14:14.123
+        // 1993472054123400 -> 2033-03-03T14:14:14.123400
+        // 1993472054123450 -> 2033-03-03T14:14:14.123450
+        // 1993472054123456 -> 2033-03-03T14:14:14.123456
+        //
+        // Row 6:
+        //    -48147734807000 -> 0444-04-04T13:13:13
+        //    -48147734806900 -> 0444-04-04T13:13:13.100
+        //    -48147734806880 -> 0444-04-04T13:13:13.120
+        //    -48147734806877 -> 0444-04-04T13:13:13.123
+        // -48147734806876600 -> 0444-04-04T13:13:13.123400
+        // -48147734806876550 -> 0444-04-04T13:13:13.123450
+        // -48147734806876544 -> 0444-04-04T13:13:13.123456
+        //
+        // Row 7:
+        //    -42468000 -> 1969-12-31T12:12:12
+        //    -42467900 -> 1969-12-31T12:12:12.100
+        //    -42467880 -> 1969-12-31T12:12:12.120
+        //    -42467877 -> 1969-12-31T12:12:12.123
+        // -42467876600 -> 1969-12-31T12:12:12.123400
+        // -42467876550 -> 1969-12-31T12:12:12.123450
+        // -42467876544 -> 1969-12-31T12:12:12.123456
+        //
+        // Row 8:
+        //    1577833871000 -> 2019-12-31T23:11:11
+        //    1577833871100 -> 2019-12-31T23:11:11.100
+        //    1577833871120 -> 2019-12-31T23:11:11.120
+        //    1577833871123 -> 2019-12-31T23:11:11.123
+        // 1577833871123400 -> 2019-12-31T23:11:11.123400
+        // 1577833871123450 -> 2019-12-31T23:11:11.123450
+        // 1577833871123456 -> 2019-12-31T23:11:11.123456
+
+        runGenericAncientDateAndTimeTest(
+                MYSQL_CONTAINER,
+                ancientDatabase,
+                true,
+                DeserializerType.JSON,
+                Arrays.asList(
+                        
"{\"id\":\"AQ==\",\"date_col\":17390,\"datetime_0_col\":1468430237000,\"datetime_1_col\":1434302237100,\"datetime_2_col\":1400174237120,\"datetime_3_col\":1366132637123,\"datetime_4_col\":1332004637123400,\"datetime_5_col\":1298049437123450,\"datetime_6_col\":1263921437123456}",
+                        
"{\"id\":\"Ag==\",\"date_col\":null,\"datetime_0_col\":null,\"datetime_1_col\":null,\"datetime_2_col\":null,\"datetime_3_col\":null,\"datetime_4_col\":null,\"datetime_5_col\":null,\"datetime_6_col\":null}",
+                        
"{\"id\":\"Aw==\",\"date_col\":11323,\"datetime_0_col\":978365776000,\"datetime_1_col\":978365776100,\"datetime_2_col\":978365776120,\"datetime_3_col\":978365776123,\"datetime_4_col\":978365776123400,\"datetime_5_col\":978365776123450,\"datetime_6_col\":978365776123456}",
+                        
"{\"id\":\"BA==\",\"date_col\":11720,\"datetime_0_col\":1012662915000,\"datetime_1_col\":1012662915100,\"datetime_2_col\":1012662915120,\"datetime_3_col\":1012662915123,\"datetime_4_col\":1012662915123400,\"datetime_5_col\":1012662915123450,\"datetime_6_col\":1012662915123456}",
+                        
"{\"id\":\"BQ==\",\"date_col\":23072,\"datetime_0_col\":1993472054000,\"datetime_1_col\":1993472054100,\"datetime_2_col\":1993472054120,\"datetime_3_col\":1993472054123,\"datetime_4_col\":1993472054123400,\"datetime_5_col\":1993472054123450,\"datetime_6_col\":1993472054123456}",
+                        
"{\"id\":\"Bg==\",\"date_col\":-557266,\"datetime_0_col\":-48147734807000,\"datetime_1_col\":-48147734806900,\"datetime_2_col\":-48147734806880,\"datetime_3_col\":-48147734806877,\"datetime_4_col\":-48147734806876600,\"datetime_5_col\":-48147734806876550,\"datetime_6_col\":-48147734806876544}",
+                        
"{\"id\":\"Bw==\",\"date_col\":-1,\"datetime_0_col\":-42468000,\"datetime_1_col\":-42467900,\"datetime_2_col\":-42467880,\"datetime_3_col\":-42467877,\"datetime_4_col\":-42467876600,\"datetime_5_col\":-42467876550,\"datetime_6_col\":-42467876544}",
+                        
"{\"id\":\"CA==\",\"date_col\":18261,\"datetime_0_col\":1577833871000,\"datetime_1_col\":1577833871100,\"datetime_2_col\":1577833871120,\"datetime_3_col\":1577833871123,\"datetime_4_col\":1577833871123400,\"datetime_5_col\":1577833871123450,\"datetime_6_col\":1577833871123456}"));
+    }
+
+    @Test
+    public void 
testAncientDateAndTimeWithoutTimeAdjusterWithJsonDeserializer() throws 
Exception {
+        // LocalDate.ofEpochDay reference:
+        //
+        // 
+---------------------------------------------------------------------------------+
+        // | -713095   | -719162  | -718765  | -707413  | -557266  | -1        
 | 18261      |
+        // | 0017/8/12 | 0001/1/1 | 0002/2/2 | 0033/3/3 | 0444/4/4 | 
1969/12/31 | 2019/12/31 |
+        // 
+---------------------------------------------------------------------------------+
+        //
+        // Epoch reference (milliseconds for datetime_0~3_col, microseconds for datetime_4~6_col):
+        //
+        // Row 1:
+        //    -61645473763000 -> 0016-07-13T17:17:17
+        //    -61679601762900 -> 0015-06-14T17:17:17.100
+        //    -61713729762880 -> 0014-05-15T17:17:17.120
+        //    -61747771362877 -> 0013-04-16T17:17:17.123
+        // -61781899362876600 -> 0012-03-17T17:17:17.123400
+        // -61815854562876550 -> 0011-02-18T17:17:17.123450
+        // -61849982562876544 -> 0010-01-19T17:17:17.123456
+        //
+        // Row 2:
+        //   (null)
+        //
+        // Row 3:
+        //    -62135538224000 -> 0001-01-01T16:16:16
+        //    -62135538223900 -> 0001-01-01T16:16:16.100
+        //    -62135538223880 -> 0001-01-01T16:16:16.120
+        //    -62135538223877 -> 0001-01-01T16:16:16.123
+        // -62135538223876600 -> 0001-01-01T16:16:16.123400
+        // -62135538223876550 -> 0001-01-01T16:16:16.123450
+        // -62135538223876544 -> 0001-01-01T16:16:16.123456
+        //
+        // Row 4:
+        //    -62101241085000 -> 0002-02-02T15:15:15
+        //    -62101241084900 -> 0002-02-02T15:15:15.100
+        //    -62101241084880 -> 0002-02-02T15:15:15.120
+        //    -62101241084877 -> 0002-02-02T15:15:15.123
+        // -62101241084876600 -> 0002-02-02T15:15:15.123400
+        // -62101241084876550 -> 0002-02-02T15:15:15.123450
+        // -62101241084876544 -> 0002-02-02T15:15:15.123456
+        //
+        // Row 5:
+        //    -61120431946000 -> 0033-03-03T14:14:14
+        //    -61120431945900 -> 0033-03-03T14:14:14.100
+        //    -61120431945880 -> 0033-03-03T14:14:14.120
+        //    -61120431945877 -> 0033-03-03T14:14:14.123
+        // -61120431945876600 -> 0033-03-03T14:14:14.123400
+        // -61120431945876550 -> 0033-03-03T14:14:14.123450
+        // -61120431945876544 -> 0033-03-03T14:14:14.123456
+        //
+        //
+        // Row 6:
+        //    -48147734807000 -> 0444-04-04T13:13:13
+        //    -48147734806900 -> 0444-04-04T13:13:13.100
+        //    -48147734806880 -> 0444-04-04T13:13:13.120
+        //    -48147734806877 -> 0444-04-04T13:13:13.123
+        // -48147734806876600 -> 0444-04-04T13:13:13.123400
+        // -48147734806876550 -> 0444-04-04T13:13:13.123450
+        // -48147734806876544 -> 0444-04-04T13:13:13.123456
+        //
+        // Row 7:
+        //    -42468000 -> 1969-12-31T12:12:12
+        //    -42467900 -> 1969-12-31T12:12:12.100
+        //    -42467880 -> 1969-12-31T12:12:12.120
+        //    -42467877 -> 1969-12-31T12:12:12.123
+        // -42467876600 -> 1969-12-31T12:12:12.123400
+        // -42467876550 -> 1969-12-31T12:12:12.123450
+        // -42467876544 -> 1969-12-31T12:12:12.123456
+        //
+        // Row 8:
+        //    1577833871000 -> 2019-12-31T23:11:11
+        //    1577833871100 -> 2019-12-31T23:11:11.100
+        //    1577833871120 -> 2019-12-31T23:11:11.120
+        //    1577833871123 -> 2019-12-31T23:11:11.123
+        // 1577833871123400 -> 2019-12-31T23:11:11.123400
+        // 1577833871123450 -> 2019-12-31T23:11:11.123450
+        // 1577833871123456 -> 2019-12-31T23:11:11.123456
+
+        runGenericAncientDateAndTimeTest(
+                MYSQL_CONTAINER,
+                ancientDatabase,
+                false,
+                DeserializerType.JSON,
+                Arrays.asList(
+                        
"{\"id\":\"AQ==\",\"date_col\":-713095,\"datetime_0_col\":-61645473763000,\"datetime_1_col\":-61679601762900,\"datetime_2_col\":-61713729762880,\"datetime_3_col\":-61747771362877,\"datetime_4_col\":-61781899362876600,\"datetime_5_col\":-61815854562876550,\"datetime_6_col\":-61849982562876544}",
+                        
"{\"id\":\"Ag==\",\"date_col\":null,\"datetime_0_col\":null,\"datetime_1_col\":null,\"datetime_2_col\":null,\"datetime_3_col\":null,\"datetime_4_col\":null,\"datetime_5_col\":null,\"datetime_6_col\":null}",
+                        
"{\"id\":\"Aw==\",\"date_col\":-719162,\"datetime_0_col\":-62135538224000,\"datetime_1_col\":-62135538223900,\"datetime_2_col\":-62135538223880,\"datetime_3_col\":-62135538223877,\"datetime_4_col\":-62135538223876600,\"datetime_5_col\":-62135538223876550,\"datetime_6_col\":-62135538223876544}",
+                        
"{\"id\":\"BA==\",\"date_col\":-718765,\"datetime_0_col\":-62101241085000,\"datetime_1_col\":-62101241084900,\"datetime_2_col\":-62101241084880,\"datetime_3_col\":-62101241084877,\"datetime_4_col\":-62101241084876600,\"datetime_5_col\":-62101241084876550,\"datetime_6_col\":-62101241084876544}",
+                        
"{\"id\":\"BQ==\",\"date_col\":-707413,\"datetime_0_col\":-61120431946000,\"datetime_1_col\":-61120431945900,\"datetime_2_col\":-61120431945880,\"datetime_3_col\":-61120431945877,\"datetime_4_col\":-61120431945876600,\"datetime_5_col\":-61120431945876550,\"datetime_6_col\":-61120431945876544}",
+                        
"{\"id\":\"Bg==\",\"date_col\":-557266,\"datetime_0_col\":-48147734807000,\"datetime_1_col\":-48147734806900,\"datetime_2_col\":-48147734806880,\"datetime_3_col\":-48147734806877,\"datetime_4_col\":-48147734806876600,\"datetime_5_col\":-48147734806876550,\"datetime_6_col\":-48147734806876544}",
+                        
"{\"id\":\"Bw==\",\"date_col\":-1,\"datetime_0_col\":-42468000,\"datetime_1_col\":-42467900,\"datetime_2_col\":-42467880,\"datetime_3_col\":-42467877,\"datetime_4_col\":-42467876600,\"datetime_5_col\":-42467876550,\"datetime_6_col\":-42467876544}",
+                        
"{\"id\":\"CA==\",\"date_col\":18261,\"datetime_0_col\":1577833871000,\"datetime_1_col\":1577833871100,\"datetime_2_col\":1577833871120,\"datetime_3_col\":1577833871123,\"datetime_4_col\":1577833871123400,\"datetime_5_col\":1577833871123450,\"datetime_6_col\":1577833871123456}"));
+    }
+
+    private void runGenericAncientDateAndTimeTest(
+            MySqlContainer container,
+            UniqueDatabase database,
+            boolean enableTimeAdjuster,
+            DeserializerType deserializerType,
+            List<String> expectedResults)
+            throws Exception {
+
+        switch (deserializerType) {
+            case ROW_DATA:
+                {
+                    // Build deserializer
+                    DataType dataType =
+                            DataTypes.ROW(
+                                    DataTypes.FIELD("id", DataTypes.INT()),
+                                    DataTypes.FIELD("date_col", 
DataTypes.DATE()),
+                                    DataTypes.FIELD("datetime_0_col", 
DataTypes.TIMESTAMP(0)),
+                                    DataTypes.FIELD("datetime_1_col", 
DataTypes.TIMESTAMP(1)),
+                                    DataTypes.FIELD("datetime_2_col", 
DataTypes.TIMESTAMP(2)),
+                                    DataTypes.FIELD("datetime_3_col", 
DataTypes.TIMESTAMP(3)),
+                                    DataTypes.FIELD("datetime_4_col", 
DataTypes.TIMESTAMP(4)),
+                                    DataTypes.FIELD("datetime_5_col", 
DataTypes.TIMESTAMP(5)),
+                                    DataTypes.FIELD("datetime_6_col", 
DataTypes.TIMESTAMP(6)));
+                    LogicalType logicalType = 
TypeConversions.fromDataToLogicalType(dataType);
+                    InternalTypeInfo<RowData> typeInfo = 
InternalTypeInfo.of(logicalType);
+                    RowDataDebeziumDeserializeSchema deserializer =
+                            RowDataDebeziumDeserializeSchema.newBuilder()
+                                    .setPhysicalRowType((RowType) 
dataType.getLogicalType())
+                                    .setResultTypeInfo(typeInfo)
+                                    .build();
+
+                    Properties dbzProperties = new Properties();
+                    dbzProperties.put("enable.time.adjuster", 
String.valueOf(enableTimeAdjuster));
+                    // Build source
+                    MySqlSource<RowData> mySqlSource =
+                            MySqlSource.<RowData>builder()
+                                    .hostname(container.getHost())
+                                    .port(container.getDatabasePort())
+                                    .databaseList(database.getDatabaseName())
+                                    .serverTimeZone("UTC")
+                                    .tableList(database.getDatabaseName() + 
".ancient_times")
+                                    .username(database.getUsername())
+                                    .password(database.getPassword())
+                                    .serverId(getServerId())
+                                    .deserializer(deserializer)
+                                    .startupOptions(StartupOptions.initial())
+                                    .debeziumProperties(dbzProperties)
+                                    .build();
+
+                    try (CloseableIterator<RowData> iterator =
+                            env.fromSource(
+                                            mySqlSource,
+                                            WatermarkStrategy.noWatermarks(),
+                                            "Fetch results")
+                                    .executeAndCollect()) {
+                        List<RowData> results = fetchRows(iterator, 
expectedResults.size());
+                        
Assertions.assertThat(convertRowDataToRowString(results))
+                                
.containsExactlyInAnyOrderElementsOf(expectedResults);
+                    }
+                }
+                break;
+            case JSON:
+                {
+                    JsonDebeziumDeserializationSchema deserializer =
+                            new JsonDebeziumDeserializationSchema();
+
+                    Properties dbzProperties = new Properties();
+                    dbzProperties.put("enable.time.adjuster", 
String.valueOf(enableTimeAdjuster));
+                    // Build source
+                    MySqlSource<String> mySqlSource =
+                            MySqlSource.<String>builder()
+                                    .hostname(container.getHost())
+                                    .port(container.getDatabasePort())
+                                    .databaseList(database.getDatabaseName())
+                                    .serverTimeZone("UTC")
+                                    .tableList(database.getDatabaseName() + 
".ancient_times")
+                                    .username(database.getUsername())
+                                    .password(database.getPassword())
+                                    .serverId(getServerId())
+                                    .deserializer(deserializer)
+                                    .startupOptions(StartupOptions.initial())
+                                    .debeziumProperties(dbzProperties)
+                                    .build();
+
+                    try (CloseableIterator<String> iterator =
+                            env.fromSource(
+                                            mySqlSource,
+                                            WatermarkStrategy.noWatermarks(),
+                                            "Fetch results")
+                                    .executeAndCollect()) {
+                        List<String> results = fetchRows(iterator, 
expectedResults.size());
+                        Assertions.assertThat(convertJsonToRowString(results))
+                                
.containsExactlyInAnyOrderElementsOf(expectedResults);
+                    }
+                }
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        "Unknown deserializer type: " + deserializerType);
+        }
+    }
+
+    /**
+     * Drains at most {@code size} elements from {@code iter}, keeping iteration order.
+     *
+     * <p>The iterator is only probed while more elements are still wanted, so it is not touched
+     * again once {@code size} elements have been collected.
+     *
+     * @param iter source of elements
+     * @param size maximum number of elements to take
+     * @return the collected elements; shorter than {@code size} if {@code iter} ran out first
+     */
+    private static <T> List<T> fetchRows(Iterator<T> iter, int size) {
+        List<T> collected = new ArrayList<>(size);
+        for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
+            collected.add(iter.next());
+        }
+        return collected;
+    }
+
+    private static List<String> convertRowDataToRowString(List<RowData> rows) {
+        LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
+        map.put("id_col", 0);
+        map.put("date_col", 1);
+        map.put("datetime_0_col", 2);
+        map.put("datetime_1_col", 3);
+        map.put("datetime_2_col", 4);
+        map.put("datetime_3_col", 5);
+        map.put("datetime_4_col", 6);
+        map.put("datetime_5_col", 7);
+        map.put("datetime_6_col", 8);
+        return rows.stream()
+                .map(
+                        row ->
+                                RowUtils.createRowWithNamedPositions(
+                                                row.getRowKind(),
+                                                new Object[] {
+                                                    wrap(row, 0, 
RowData::getInt),
+                                                    wrap(row, 1, 
RowData::getInt),
+                                                    wrap(row, 2, (r, i) -> 
r.getTimestamp(i, 0)),
+                                                    wrap(row, 3, (r, i) -> 
r.getTimestamp(i, 1)),
+                                                    wrap(row, 4, (r, i) -> 
r.getTimestamp(i, 2)),
+                                                    wrap(row, 5, (r, i) -> 
r.getTimestamp(i, 3)),
+                                                    wrap(row, 6, (r, i) -> 
r.getTimestamp(i, 4)),
+                                                    wrap(row, 7, (r, i) -> 
r.getTimestamp(i, 5)),
+                                                    wrap(row, 8, (r, i) -> 
r.getTimestamp(i, 6))
+                                                },
+                                                map)
+                                        .toString())
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Parses each JSON change record and returns the serialized form of its {@code "after"} node.
+     *
+     * @param rows JSON documents, one per change record
+     * @return the {@code "after"} node of every record as a string, in input order
+     * @throws RuntimeException wrapping the {@link JsonProcessingException} if a record cannot be
+     *     parsed
+     */
+    private static List<String> convertJsonToRowString(List<String> rows) {
+        ObjectMapper jsonMapper = new ObjectMapper();
+        List<String> afterImages = new ArrayList<>(rows.size());
+        for (String json : rows) {
+            try {
+                JsonNode record = jsonMapper.readTree(json);
+                afterImages.add(record.get("after").toString());
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return afterImages;
+    }
+
+    /**
+     * Null-safe field accessor: returns {@code null} when the field at {@code index} is null,
+     * otherwise whatever {@code getter} reads from that position.
+     *
+     * @param row row to read from
+     * @param index field position
+     * @param getter typed accessor applied to {@code (row, index)} for non-null fields
+     * @return the field value, or {@code null}
+     */
+    private static <T> Object wrap(RowData row, int index, BiFunction<RowData, Integer, T> getter) {
+        return row.isNullAt(index) ? null : getter.apply(row, index);
+    }
+
+    /**
+     * Builds a server-id range of the form {@code "start-end"}: {@code start} is picked at random
+     * from 5400 ~ 5499 and {@code end} is {@code start} plus the environment's parallelism.
+     */
+    private String getServerId() {
+        final int start = new Random().nextInt(100) + 5400;
+        final int end = start + env.getParallelism();
+        return start + "-" + end;
+    }
+
+    /** Deserializer flavours the generic test routine can switch between. */
+    enum DeserializerType {
+        // Handled by the switch branch that builds a JsonDebeziumDeserializationSchema.
+        JSON,
+        // NOTE(review): presumably selects the RowDataDebeziumDeserializeSchema branch - confirm.
+        ROW_DATA
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java
new file mode 100644
index 000000000..b9ceaa783
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.table;
+
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+
+/** Integration tests for MySQL Table source to handle ancient date and time 
records. */
+@RunWith(Parameterized.class)
+public class MySqlAncientDateAndTimeITCase extends MySqlSourceTestBase {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlAncientDateAndTimeITCase.class);
+
+    private static final String TEST_USER = "mysqluser";
+    private static final String TEST_PASSWORD = "mysqlpw";
+
+    // We need an extra "no_zero_in_date = false" config to insert malformed 
date and time records.
+    private static final MySqlContainer MYSQL_CONTAINER =
+            createMySqlContainer(MySqlVersion.V8_0, 
"docker/server-allow-ancient-date-time/my.cnf");
+
+    private final UniqueDatabase ancientDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, "ancient_date_and_time", 
TEST_USER, TEST_PASSWORD);
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+    private final StreamTableEnvironment tEnv =
+            StreamTableEnvironment.create(
+                    env, 
EnvironmentSettings.newInstance().inStreamingMode().build());
+
+    private final boolean incrementalSnapshot;
+
+    /** Runs every test twice: with the incremental snapshot disabled, then enabled. */
+    @Parameterized.Parameters(name = "incrementalSnapshot: {0}")
+    public static Object[] parameters() {
+        return new Object[][] {{false}, {true}};
+    }
+
+    /**
+     * Invoked by the {@code Parameterized} runner once per entry of {@link #parameters()}.
+     *
+     * @param incrementalSnapshot value later passed to the connector option {@code
+     *     scan.incremental.snapshot.enabled}
+     */
+    public MySqlAncientDateAndTimeITCase(boolean incrementalSnapshot) {
+        this.incrementalSnapshot = incrementalSnapshot;
+    }
+
+    /** Starts the shared MySQL container once and blocks until startup has completed. */
+    @BeforeClass
+    public static void beforeClass() {
+        LOG.info("Starting MySql container...");
+        // join() waits for the asynchronous deep start to finish before any test runs.
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Container MySql is started.");
+    }
+
+    /** Stops the shared MySQL container after all tests of this class have run. */
+    @AfterClass
+    public static void afterClass() {
+        LOG.info("Stopping MySql containers...");
+        MYSQL_CONTAINER.stop();
+        LOG.info("Container MySql is stopped.");
+    }
+
+    /**
+     * Per-test setup: clears the test-values table data, configures the environment for the
+     * current snapshot mode and (re)creates the test database.
+     *
+     * <p>Incremental snapshot runs use {@code DEFAULT_PARALLELISM} with a 200 ms checkpoint
+     * interval; otherwise the job runs with parallelism 1 and checkpointing is left untouched.
+     */
+    @Before
+    public void before() {
+        TestValuesTableFactory.clearAllData();
+        env.setParallelism(incrementalSnapshot ? DEFAULT_PARALLELISM : 1);
+        if (incrementalSnapshot) {
+            env.enableCheckpointing(200);
+        }
+        ancientDatabase.createAndInitialize();
+    }
+
+    /** Drops the per-test database created in {@link #before()}. */
+    @After
+    public void after() {
+        ancientDatabase.dropDatabase();
+    }
+
+    /**
+     * With the TimeAdjuster in Debezium enabled, date / time values whose year falls in 0000 ~ 0099
+     * are treated as two-digit years and shifted into 1970 ~ 2069 (00 ~ 69 becomes 2000 ~ 2069,
+     * 70 ~ 99 becomes 1970 ~ 1999). Years outside that range (0444 and 1969 here) are expected to
+     * come through unchanged, and the zero-date row is expected as all {@code null}.
+     */
+    @Test
+    public void testAncientDateAndTimeWithTimeAdjuster() throws Exception {
+        runGenericAncientDateAndTimeTest(
+                MYSQL_CONTAINER,
+                ancientDatabase,
+                incrementalSnapshot,
+                // enableTimeAdjuster
+                true,
+                // Rows expected from the snapshot phase (ids 1 ~ 8).
+                Arrays.asList(
+                        "+I[1, 2017-08-12, 2016-07-13T17:17:17, 
2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 
2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 
2010-01-19T17:17:17.123456]",
+                        "+I[2, null, null, null, null, null, null, null, 
null]",
+                        "+I[3, 2001-01-01, 2001-01-01T16:16:16, 
2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 
2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 
2001-01-01T16:16:16.123456]",
+                        "+I[4, 2002-02-02, 2002-02-02T15:15:15, 
2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 
2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 
2002-02-02T15:15:15.123456]",
+                        "+I[5, 2033-03-03, 2033-03-03T14:14:14, 
2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 
2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 
2033-03-03T14:14:14.123456]",
+                        "+I[6, 0444-04-04, 0444-04-04T13:13:13, 
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 
0444-04-04T13:13:13.123456]",
+                        "+I[7, 1969-12-31, 1969-12-31T12:12:12, 
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 
1969-12-31T12:12:12.123456]",
+                        "+I[8, 2019-12-31, 2019-12-31T23:11:11, 
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 
2019-12-31T23:11:11.123456]"),
+                // Rows expected from the binlog phase, after the same data is inserted again
+                // (ids 9 ~ 16).
+                Arrays.asList(
+                        "+I[9, 2017-08-12, 2016-07-13T17:17:17, 
2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 
2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 
2010-01-19T17:17:17.123456]",
+                        "+I[10, null, null, null, null, null, null, null, 
null]",
+                        "+I[11, 2001-01-01, 2001-01-01T16:16:16, 
2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 
2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 
2001-01-01T16:16:16.123456]",
+                        "+I[12, 2002-02-02, 2002-02-02T15:15:15, 
2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 
2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 
2002-02-02T15:15:15.123456]",
+                        "+I[13, 2033-03-03, 2033-03-03T14:14:14, 
2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 
2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 
2033-03-03T14:14:14.123456]",
+                        "+I[14, 0444-04-04, 0444-04-04T13:13:13, 
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 
0444-04-04T13:13:13.123456]",
+                        "+I[15, 1969-12-31, 1969-12-31T12:12:12, 
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 
1969-12-31T12:12:12.123456]",
+                        "+I[16, 2019-12-31, 2019-12-31T23:11:11, 
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 
2019-12-31T23:11:11.123456]"));
+    }
+
+    /**
+     * With the TimeAdjuster in Debezium disabled, every valid date / time value is expected exactly
+     * as inserted (years 0001 ~ 0033 stay as they are), while the zero-date row is still expected
+     * as all {@code null}.
+     */
+    @Test
+    public void testAncientDateAndTimeWithoutTimeAdjuster() throws Exception {
+        runGenericAncientDateAndTimeTest(
+                MYSQL_CONTAINER,
+                ancientDatabase,
+                incrementalSnapshot,
+                // enableTimeAdjuster
+                false,
+                // Rows expected from the snapshot phase (ids 1 ~ 8).
+                Arrays.asList(
+                        "+I[1, 0017-08-12, 0016-07-13T17:17:17, 
0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 
0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 
0010-01-19T17:17:17.123456]",
+                        "+I[2, null, null, null, null, null, null, null, 
null]",
+                        "+I[3, 0001-01-01, 0001-01-01T16:16:16, 
0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 
0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 
0001-01-01T16:16:16.123456]",
+                        "+I[4, 0002-02-02, 0002-02-02T15:15:15, 
0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 
0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 
0002-02-02T15:15:15.123456]",
+                        "+I[5, 0033-03-03, 0033-03-03T14:14:14, 
0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 
0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 
0033-03-03T14:14:14.123456]",
+                        "+I[6, 0444-04-04, 0444-04-04T13:13:13, 
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 
0444-04-04T13:13:13.123456]",
+                        "+I[7, 1969-12-31, 1969-12-31T12:12:12, 
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 
1969-12-31T12:12:12.123456]",
+                        "+I[8, 2019-12-31, 2019-12-31T23:11:11, 
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 
2019-12-31T23:11:11.123456]"),
+                // Rows expected from the binlog phase, after the same data is inserted again
+                // (ids 9 ~ 16).
+                Arrays.asList(
+                        "+I[9, 0017-08-12, 0016-07-13T17:17:17, 
0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 
0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 
0010-01-19T17:17:17.123456]",
+                        "+I[10, null, null, null, null, null, null, null, 
null]",
+                        "+I[11, 0001-01-01, 0001-01-01T16:16:16, 
0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 
0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 
0001-01-01T16:16:16.123456]",
+                        "+I[12, 0002-02-02, 0002-02-02T15:15:15, 
0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 
0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 
0002-02-02T15:15:15.123456]",
+                        "+I[13, 0033-03-03, 0033-03-03T14:14:14, 
0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 
0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 
0033-03-03T14:14:14.123456]",
+                        "+I[14, 0444-04-04, 0444-04-04T13:13:13, 
0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 
0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 
0444-04-04T13:13:13.123456]",
+                        "+I[15, 1969-12-31, 1969-12-31T12:12:12, 
1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 
1969-12-31T12:12:12.123456]",
+                        "+I[16, 2019-12-31, 2019-12-31T23:11:11, 
2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 
2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 
2019-12-31T23:11:11.123456]"));
+    }
+
+    /**
+     * Runs one end-to-end scenario against the {@code ancient_times} table: registers a {@code
+     * mysql-cdc} source table, checks the rows read during the snapshot phase, inserts the same
+     * data again and checks the rows read from the binlog.
+     *
+     * @param container MySQL container providing host and port
+     * @param database database that holds the {@code ancient_times} table
+     * @param incrementalSnapshot value for {@code scan.incremental.snapshot.enabled}
+     * @param enableTimeAdjuster value for {@code debezium.enable.time.adjuster}
+     * @param expectedSnapshotResults rows expected before any binlog event is created, in any
+     *     order
+     * @param expectedStreamingResults rows expected after {@link #createBinlogEvents}, in any
+     *     order
+     * @throws IllegalStateException if the job reaches a globally terminal state before it is
+     *     ever observed as RUNNING
+     */
+    private void runGenericAncientDateAndTimeTest(
+            MySqlContainer container,
+            UniqueDatabase database,
+            boolean incrementalSnapshot,
+            boolean enableTimeAdjuster,
+            List<String> expectedSnapshotResults,
+            List<String> expectedStreamingResults)
+            throws Exception {
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE ancient_db ("
+                                + " `id` INT NOT NULL,"
+                                + " date_col DATE,"
+                                + " datetime_0_col TIMESTAMP(0),"
+                                + " datetime_1_col TIMESTAMP(1),"
+                                + " datetime_2_col TIMESTAMP(2),"
+                                + " datetime_3_col TIMESTAMP(3),"
+                                + " datetime_4_col TIMESTAMP(4),"
+                                + " datetime_5_col TIMESTAMP(5),"
+                                + " datetime_6_col TIMESTAMP(6),"
+                                + " primary key (`id`) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mysql-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = '%s',"
+                                + " 'server-time-zone' = 'UTC',"
+                                + " 'server-id' = '%s',"
+                                + " 'debezium.enable.time.adjuster' = '%s'"
+                                + ")",
+                        container.getHost(),
+                        container.getDatabasePort(),
+                        TEST_USER,
+                        TEST_PASSWORD,
+                        database.getDatabaseName(),
+                        "ancient_times",
+                        incrementalSnapshot,
+                        getServerId(),
+                        enableTimeAdjuster);
+
+        tEnv.executeSql(sourceDDL);
+
+        TableResult result = tEnv.executeSql("SELECT * FROM ancient_db");
+
+        // Wait until the job is RUNNING, but give up as soon as it has terminated: a job that
+        // failed during start-up would otherwise keep this loop spinning forever.
+        org.apache.flink.api.common.JobStatus status;
+        do {
+            Thread.sleep(5000L);
+            status = result.getJobClient().get().getJobStatus().get();
+            if (status.isGloballyTerminalState()) {
+                throw new IllegalStateException(
+                        "Job terminated with status " + status + " before reaching RUNNING.");
+            }
+        } while (status != RUNNING);
+
+        // Close the result iterator even when an assertion fails, so the collect resources are
+        // not leaked into the following parameterized runs.
+        try (CloseableIterator<Row> iterator = result.collect()) {
+            Assertions.assertThat(fetchRows(iterator, expectedSnapshotResults.size()))
+                    .containsExactlyInAnyOrderElementsOf(expectedSnapshotResults);
+
+            createBinlogEvents(database);
+
+            Assertions.assertThat(fetchRows(iterator, expectedStreamingResults.size()))
+                    .containsExactlyInAnyOrderElementsOf(expectedStreamingResults);
+            result.getJobClient().get().cancel().get();
+        }
+    }
+
+    private static void createBinlogEvents(UniqueDatabase database) throws 
SQLException {
+        // Test reading identical data in binlog stage again
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "INSERT INTO ancient_times VALUES (\n"
+                            + "    DEFAULT,\n"
+                            + "    '0017-08-12',\n"
+                            + "    '0016-07-13 17:17:17',\n"
+                            + "    '0015-06-14 17:17:17.1',\n"
+                            + "    '0014-05-15 17:17:17.12',\n"
+                            + "    '0013-04-16 17:17:17.123',\n"
+                            + "    '0012-03-17 17:17:17.1234',\n"
+                            + "    '0011-02-18 17:17:17.12345',\n"
+                            + "    '0010-01-19 17:17:17.123456'\n"
+                            + ");");
+            statement.execute(
+                    "INSERT INTO ancient_times VALUES (\n"
+                            + "    DEFAULT,\n"
+                            + "    '0000-00-00',\n"
+                            + "    '0000-00-00 00:00:00',\n"
+                            + "    '0000-00-00 00:00:00.0',\n"
+                            + "    '0000-00-00 00:00:00.00',\n"
+                            + "    '0000-00-00 00:00:00.000',\n"
+                            + "    '0000-00-00 00:00:00.0000',\n"
+                            + "    '0000-00-00 00:00:00.00000',\n"
+                            + "    '0000-00-00 00:00:00.000000'\n"
+                            + ");");
+            statement.execute(
+                    "INSERT INTO ancient_times VALUES (\n"
+                            + "    DEFAULT,\n"
+                            + "    '0001-01-01',\n"
+                            + "    '0001-01-01 16:16:16',\n"
+                            + "    '0001-01-01 16:16:16.1',\n"
+                            + "    '0001-01-01 16:16:16.12',\n"
+                            + "    '0001-01-01 16:16:16.123',\n"
+                            + "    '0001-01-01 16:16:16.1234',\n"
+                            + "    '0001-01-01 16:16:16.12345',\n"
+                            + "    '0001-01-01 16:16:16.123456'\n"
+                            + ");");
+            statement.execute(
+                    "INSERT INTO ancient_times VALUES (\n"
+                            + "    DEFAULT,\n"
+                            + "    '0002-02-02',\n"
+                            + "    '0002-02-02 15:15:15',\n"
+                            + "    '0002-02-02 15:15:15.1',\n"
+                            + "    '0002-02-02 15:15:15.12',\n"
+                            + "    '0002-02-02 15:15:15.123',\n"
+                            + "    '0002-02-02 15:15:15.1234',\n"
+                            + "    '0002-02-02 15:15:15.12345',\n"
+                            + "    '0002-02-02 15:15:15.123456'\n"
+                            + ");");
+            statement.execute(
+                    "INSERT INTO ancient_times VALUES (\n"
+                            + "    DEFAULT,\n"
+                            + "    '0033-03-03',\n"
+                            + "    '0033-03-03 14:14:14',\n"
+                            + "    '0033-03-03 14:14:14.1',\n"
+                            + "    '0033-03-03 14:14:14.12',\n"
+                            + "    '0033-03-03 14:14:14.123',\n"
+                            + "    '0033-03-03 14:14:14.1234',\n"
+                            + "    '0033-03-03 14:14:14.12345',\n"
+                            + "    '0033-03-03 14:14:14.123456'\n"
+                            + ");");
+            statement.execute(
+                    "INSERT INTO ancient_times VALUES (\n"
+                            + "    DEFAULT,\n"
+                            + "    '0444-04-04',\n"
+                            + "    '0444-04-04 13:13:13',\n"
+                            + "    '0444-04-04 13:13:13.1',\n"
+                            + "    '0444-04-04 13:13:13.12',\n"
+                            + "    '0444-04-04 13:13:13.123',\n"
+                            + "    '0444-04-04 13:13:13.1234',\n"
+                            + "    '0444-04-04 13:13:13.12345',\n"
+                            + "    '0444-04-04 13:13:13.123456'\n"
+                            + ");");
+            statement.execute(
+                    "INSERT INTO ancient_times VALUES (\n"
+                            + "    DEFAULT,\n"
+                            + "    '1969-12-31',\n"
+                            + "    '1969-12-31 12:12:12',\n"
+                            + "    '1969-12-31 12:12:12.1',\n"
+                            + "    '1969-12-31 12:12:12.12',\n"
+                            + "    '1969-12-31 12:12:12.123',\n"
+                            + "    '1969-12-31 12:12:12.1234',\n"
+                            + "    '1969-12-31 12:12:12.12345',\n"
+                            + "    '1969-12-31 12:12:12.123456'\n"
+                            + ");");
+            statement.execute(
+                    "INSERT INTO ancient_times VALUES (\n"
+                            + "    DEFAULT,\n"
+                            + "    '2019-12-31',\n"
+                            + "    '2019-12-31 23:11:11',\n"
+                            + "    '2019-12-31 23:11:11.1',\n"
+                            + "    '2019-12-31 23:11:11.12',\n"
+                            + "    '2019-12-31 23:11:11.123',\n"
+                            + "    '2019-12-31 23:11:11.1234',\n"
+                            + "    '2019-12-31 23:11:11.12345',\n"
+                            + "    '2019-12-31 23:11:11.123456'\n"
+                            + ");");
+        }
+    }
+
+    /**
+     * Takes at most {@code size} rows from {@code iter} and returns their {@code toString()} forms
+     * in iteration order.
+     *
+     * <p>The iterator is only probed while more rows are still wanted, so it is not touched again
+     * once {@code size} rows have been collected.
+     *
+     * @param iter source of rows
+     * @param size maximum number of rows to take
+     * @return the rendered rows; shorter than {@code size} if {@code iter} ran out first
+     */
+    private static List<String> fetchRows(Iterator<Row> iter, int size) {
+        List<String> rendered = new ArrayList<>(size);
+        for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
+            rendered.add(iter.next().toString());
+        }
+        return rendered;
+    }
+
+    /**
+     * Picks a random server id in 5400 ~ 5499. With the incremental snapshot enabled the result is
+     * a range {@code "start-end"} where {@code end} is {@code start} plus the environment's
+     * parallelism; otherwise it is the single id.
+     */
+    private String getServerId() {
+        final int start = new Random().nextInt(100) + 5400;
+        if (!incrementalSnapshot) {
+            return String.valueOf(start);
+        }
+        final int end = start + env.getParallelism();
+        return start + "-" + end;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql
new file mode 100644
index 000000000..e4ff4e595
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql
@@ -0,0 +1,124 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- Fixture table: one DATE column plus one DATETIME column for each fractional-second
+-- precision from 0 to 6.
+CREATE TABLE ancient_times
+(
+    id             SERIAL,
+    date_col       DATE,
+    datetime_0_col DATETIME(0),
+    datetime_1_col DATETIME(1),
+    datetime_2_col DATETIME(2),
+    datetime_3_col DATETIME(3),
+    datetime_4_col DATETIME(4),
+    datetime_5_col DATETIME(5),
+    datetime_6_col DATETIME(6),
+    PRIMARY KEY (id)
+);
+
+-- Years 0010 ~ 0017, a different year in every column.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '0017-08-12',
+    '0016-07-13 17:17:17',
+    '0015-06-14 17:17:17.1',
+    '0014-05-15 17:17:17.12',
+    '0013-04-16 17:17:17.123',
+    '0012-03-17 17:17:17.1234',
+    '0011-02-18 17:17:17.12345',
+    '0010-01-19 17:17:17.123456'
+);
+
+-- Zero dates: the ITCases expect every date / time column of this row to be read as NULL.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '0000-00-00',
+    '0000-00-00 00:00:00',
+    '0000-00-00 00:00:00.0',
+    '0000-00-00 00:00:00.00',
+    '0000-00-00 00:00:00.000',
+    '0000-00-00 00:00:00.0000',
+    '0000-00-00 00:00:00.00000',
+    '0000-00-00 00:00:00.000000'
+);
+
+-- Year 0001.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '0001-01-01',
+    '0001-01-01 16:16:16',
+    '0001-01-01 16:16:16.1',
+    '0001-01-01 16:16:16.12',
+    '0001-01-01 16:16:16.123',
+    '0001-01-01 16:16:16.1234',
+    '0001-01-01 16:16:16.12345',
+    '0001-01-01 16:16:16.123456'
+);
+
+-- Year 0002.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '0002-02-02',
+    '0002-02-02 15:15:15',
+    '0002-02-02 15:15:15.1',
+    '0002-02-02 15:15:15.12',
+    '0002-02-02 15:15:15.123',
+    '0002-02-02 15:15:15.1234',
+    '0002-02-02 15:15:15.12345',
+    '0002-02-02 15:15:15.123456'
+);
+
+-- Year 0033.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '0033-03-03',
+    '0033-03-03 14:14:14',
+    '0033-03-03 14:14:14.1',
+    '0033-03-03 14:14:14.12',
+    '0033-03-03 14:14:14.123',
+    '0033-03-03 14:14:14.1234',
+    '0033-03-03 14:14:14.12345',
+    '0033-03-03 14:14:14.123456'
+);
+
+-- Year 0444: the ITCases expect it unchanged whether or not the time adjuster is enabled.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '0444-04-04',
+    '0444-04-04 13:13:13',
+    '0444-04-04 13:13:13.1',
+    '0444-04-04 13:13:13.12',
+    '0444-04-04 13:13:13.123',
+    '0444-04-04 13:13:13.1234',
+    '0444-04-04 13:13:13.12345',
+    '0444-04-04 13:13:13.123456'
+);
+
+-- 1969-12-31: the last day before the Unix epoch.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '1969-12-31',
+    '1969-12-31 12:12:12',
+    '1969-12-31 12:12:12.1',
+    '1969-12-31 12:12:12.12',
+    '1969-12-31 12:12:12.123',
+    '1969-12-31 12:12:12.1234',
+    '1969-12-31 12:12:12.12345',
+    '1969-12-31 12:12:12.123456'
+);
+
+-- 2019-12-31: a modern value.
+INSERT INTO ancient_times VALUES (
+    DEFAULT,
+    '2019-12-31',
+    '2019-12-31 23:11:11',
+    '2019-12-31 23:11:11.1',
+    '2019-12-31 23:11:11.12',
+    '2019-12-31 23:11:11.123',
+    '2019-12-31 23:11:11.1234',
+    '2019-12-31 23:11:11.12345',
+    '2019-12-31 23:11:11.123456'
+);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/docker/server-allow-ancient-date-time/my.cnf
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/docker/server-allow-ancient-date-time/my.cnf
new file mode 100644
index 000000000..ca0483780
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/docker/server-allow-ancient-date-time/my.cnf
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id         = 223344
+log_bin           = mysql-bin
+expire_logs_days  = 1
+binlog_format     = row
+sql_mode          = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION"

Reply via email to