This is an automated email from the ASF dual-hosted git repository. ruanhang1993 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 4baee8c63 [FLINK-37000][pipeline-connector][mysql] Fix MySQL CDC could not handle datetime prior to 1970 properly (#3834) 4baee8c63 is described below commit 4baee8c6369d3cfb58ff06ae3eb2042a61e8809b Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com> AuthorDate: Tue Apr 8 16:19:58 2025 +0800 [FLINK-37000][pipeline-connector][mysql] Fix MySQL CDC could not handle datetime prior to 1970 properly (#3834) --- .../source/MySqlAncientDateAndTimeITCase.java | 412 ++++++++++++++++ .../test/resources/ddl/ancient_date_and_time.sql | 124 +++++ .../docker/server-allow-ancient-date-time/my.cnf | 58 +++ .../event/DebeziumEventDeserializationSchema.java | 6 +- .../table/RowDataDebeziumDeserializeSchema.java | 9 +- .../source/MySqlAncientDateAndTimeITCase.java | 521 +++++++++++++++++++++ .../mysql/table/MySqlAncientDateAndTimeITCase.java | 363 ++++++++++++++ .../test/resources/ddl/ancient_date_and_time.sql | 124 +++++ .../docker/server-allow-ancient-date-time/my.cnf | 58 +++ 9 files changed, 1671 insertions(+), 4 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java new file mode 100644 index 000000000..b630b5c8b --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.source.FlinkSourceProvider; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.CloseableIterator; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import 
org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.lifecycle.Startables; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.stream.Stream; + +/** Integration tests for MySQL pipeline source to handle ancient date and time records. */ +public class MySqlAncientDateAndTimeITCase extends MySqlSourceTestBase { + private static final Logger LOG = LoggerFactory.getLogger(MySqlAncientDateAndTimeITCase.class); + + private static final String TEST_USER = "mysqluser"; + private static final String TEST_PASSWORD = "mysqlpw"; + + // my.cnf sets a sql_mode without NO_ZERO_DATE / NO_ZERO_IN_DATE to allow zero dates. + private static final MySqlContainer MYSQL_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-allow-ancient-date-time/my.cnf"); + + private final UniqueDatabase ancientDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "ancient_date_and_time", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + @BeforeClass + public static void beforeClass() { + LOG.info("Starting MySql container..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + LOG.info("Container MySql is started."); + } + + @AfterClass + public static void afterClass() { + LOG.info("Stopping MySql containers..."); + MYSQL_CONTAINER.stop(); + LOG.info("Container MySql is stopped."); + } + + @Before + public void before() { + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200); + env.setRestartStrategy(RestartStrategies.noRestart()); + 
ancientDatabase.createAndInitialize(); + } + + @After + public void after() { +
ancientDatabase.dropDatabase(); + } + + /** + * With the TimeAdjuster in Debezium, all date / time records between year 0001 and 0099 will be + * shifted to 1970 ~ 2069 (years 0001-0069 become 2001-2069, 0070-0099 become 1970-1999). + */ + @Test + public void testAncientDateAndTimeWithTimeAdjuster() throws Exception { + // LocalDate.ofEpochDay reference: + // +---------------------------------------------------------------------------------+ + // | 17390 | 11323 | 11720 | 23072 | -557266 | -1 | 18261 | + // | 2017/8/12 | 2001/1/1 | 2002/2/2 | 2033/3/3 | 0444/4/4 | 1969/12/31 | 2019/12/31 | + // +---------------------------------------------------------------------------------+ + runGenericAncientDateAndTimeTest( + ancientDatabase, + true, + Arrays.asList( + "[1, 17390, 2016-07-13T17:17:17, 2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 2010-01-19T17:17:17.123456]", + "[2, null, null, null, null, null, null, null, null]", + "[3, 11323, 2001-01-01T16:16:16, 2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 2001-01-01T16:16:16.123456]", + "[4, 11720, 2002-02-02T15:15:15, 2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 2002-02-02T15:15:15.123456]", + "[5, 23072, 2033-03-03T14:14:14, 2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 2033-03-03T14:14:14.123456]", + "[6, -557266, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "[7, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 
1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "[8, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100,
2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]"), + Arrays.asList( + "[9, 17390, 2016-07-13T17:17:17, 2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 2010-01-19T17:17:17.123456]", + "[10, null, null, null, null, null, null, null, null]", + "[11, 11323, 2001-01-01T16:16:16, 2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 2001-01-01T16:16:16.123456]", + "[12, 11720, 2002-02-02T15:15:15, 2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 2002-02-02T15:15:15.123456]", + "[13, 23072, 2033-03-03T14:14:14, 2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 2033-03-03T14:14:14.123456]", + "[14, -557266, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "[15, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "[16, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); + } + + @Test + public void testAncientDateAndTimeWithoutTimeAdjuster() throws Exception { + // LocalDate.ofEpochDay reference: + // +---------------------------------------------------------------------------------+ + // | -713095 | -719162 | -718765 | -707413 | -557266 | -1 | 18261 | + // | 0017/8/12 | 0001/1/1 | 0002/2/2 | 0033/3/3 | 0444/4/4 | 1969/12/31 | 2019/12/31 | + // 
+---------------------------------------------------------------------------------+ + runGenericAncientDateAndTimeTest( + ancientDatabase, + false, + Arrays.asList( + "[1, -713095, 0016-07-13T17:17:17, 0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 0010-01-19T17:17:17.123456]", + "[2, null, null, null, null, null, null, null, null]", + "[3, -719162, 0001-01-01T16:16:16, 0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 0001-01-01T16:16:16.123456]", + "[4, -718765, 0002-02-02T15:15:15, 0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 0002-02-02T15:15:15.123456]", + "[5, -707413, 0033-03-03T14:14:14, 0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 0033-03-03T14:14:14.123456]", + "[6, -557266, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "[7, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "[8, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]"), + Arrays.asList( + "[9, -713095, 0016-07-13T17:17:17, 0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 0010-01-19T17:17:17.123456]", + "[10, null, null, null, null, null, null, null, null]", + "[11, -719162, 0001-01-01T16:16:16, 0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 
0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 0001-01-01T16:16:16.123456]", + "[12, -718765, 0002-02-02T15:15:15, 0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 0002-02-02T15:15:15.123456]", + "[13, -707413, 0033-03-03T14:14:14, 0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 0033-03-03T14:14:14.123456]", + "[14, -557266, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "[15, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "[16, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); + } + + private void runGenericAncientDateAndTimeTest( + UniqueDatabase database, + boolean enableTimeAdjuster, + List<String> expectedSnapshotResults, + List<String> expectedStreamingResults) + throws Exception { + Schema ancientSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("date_col", DataTypes.DATE(), null, "0017-08-12") + .physicalColumn( + "datetime_0_col", + DataTypes.TIMESTAMP(0), + null, + "0016-07-13 17:17:17") + .physicalColumn( + "datetime_1_col", + DataTypes.TIMESTAMP(1), + null, + "0015-06-14 17:17:17.1") + .physicalColumn( + "datetime_2_col", + DataTypes.TIMESTAMP(2), + null, + "0014-05-15 17:17:17.12") + .physicalColumn( + "datetime_3_col", + DataTypes.TIMESTAMP(3), + null, + "0013-04-16 17:17:17.123") + .physicalColumn( + "datetime_4_col", + DataTypes.TIMESTAMP(4), + null, + "0012-03-17 17:17:17.1234") + .physicalColumn( + "datetime_5_col", 
+ DataTypes.TIMESTAMP(5), + null, + "0011-02-18 17:17:17.12345") + .physicalColumn( + "datetime_6_col", + DataTypes.TIMESTAMP(6), + null, + "0010-01-19 17:17:17.123456") + .primaryKey("id") + .build(); + List<RecordData.FieldGetter> ancientSchemaFieldGetters = + SchemaUtils.createFieldGetters(ancientSchema); + CreateTableEvent ancientCreateTableEvent = + new CreateTableEvent( + TableId.tableId(ancientDatabase.getDatabaseName(), "ancient_times"), + ancientSchema); + try (CloseableIterator<Event> iterator = + env.fromSource( + getFlinkSourceProvider( + new String[] {"ancient_times"}, + database, + enableTimeAdjuster) + .getSource(), + WatermarkStrategy.noWatermarks(), + "Event-Source") + .executeAndCollect()) { + + { + Tuple2<List<Event>, List<CreateTableEvent>> snapshotResults = + fetchResultsAndCreateTableEvent(iterator, expectedSnapshotResults.size()); + Assertions.assertThat(snapshotResults.f1).isSubsetOf(ancientCreateTableEvent); + Assertions.assertThat(snapshotResults.f0) + .map(evt -> (DataChangeEvent) evt) + .map( + evt -> + SchemaUtils.restoreOriginalData( + evt.after(), ancientSchemaFieldGetters) + .toString()) + .containsExactlyInAnyOrderElementsOf(expectedSnapshotResults); + } + + createBinlogEvents(ancientDatabase); + + { + Tuple2<List<Event>, List<CreateTableEvent>> streamingResults = + fetchResultsAndCreateTableEvent(iterator, expectedSnapshotResults.size()); + Assertions.assertThat(streamingResults.f1).isSubsetOf(ancientCreateTableEvent); + Assertions.assertThat(streamingResults.f0) + .map(evt -> (DataChangeEvent) evt) + .map( + evt -> + SchemaUtils.restoreOriginalData( + evt.after(), ancientSchemaFieldGetters) + .toString()) + .containsExactlyInAnyOrderElementsOf(expectedStreamingResults); + } + } + } + + private FlinkSourceProvider getFlinkSourceProvider( + String[] captureTables, UniqueDatabase database, boolean enableTimeAdjuster) { + String[] captureTableIds = + Arrays.stream(captureTables) + .map(tableName -> database.getDatabaseName() + "." 
+ tableName) + .toArray(String[]::new); + + Properties dbzProperties = new Properties(); + dbzProperties.put("enable.time.adjuster", String.valueOf(enableTimeAdjuster)); + + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.initial()) + .databaseList(database.getDatabaseName()) + .tableList(captureTableIds) + .includeSchemaChanges(false) + .hostname(database.getHost()) + .port(database.getDatabasePort()) + .splitSize(10) + .fetchSize(2) + .username(database.getUsername()) + .password(database.getPassword()) + .serverTimeZone(ZoneId.of("UTC").toString()) + .serverId(MySqSourceTestUtils.getServerId(env.getParallelism())) + .debeziumProperties(dbzProperties); + return (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + } + + private static void createBinlogEvents(UniqueDatabase database) throws SQLException { + // Test reading identical data in binlog stage again + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT,\n" + + " DEFAULT\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0000-00-00',\n" + + " '0000-00-00 00:00:00',\n" + + " '0000-00-00 00:00:00.0',\n" + + " '0000-00-00 00:00:00.00',\n" + + " '0000-00-00 00:00:00.000',\n" + + " '0000-00-00 00:00:00.0000',\n" + + " '0000-00-00 00:00:00.00000',\n" + + " '0000-00-00 00:00:00.000000'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0001-01-01',\n" + + " '0001-01-01 16:16:16',\n" + + " '0001-01-01 16:16:16.1',\n" + + " '0001-01-01 16:16:16.12',\n" + + " '0001-01-01 16:16:16.123',\n" + + " '0001-01-01 16:16:16.1234',\n" + + " '0001-01-01 16:16:16.12345',\n" + + " 
'0001-01-01 16:16:16.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0002-02-02',\n" + + " '0002-02-02 15:15:15',\n" + + " '0002-02-02 15:15:15.1',\n" + + " '0002-02-02 15:15:15.12',\n" + + " '0002-02-02 15:15:15.123',\n" + + " '0002-02-02 15:15:15.1234',\n" + + " '0002-02-02 15:15:15.12345',\n" + + " '0002-02-02 15:15:15.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0033-03-03',\n" + + " '0033-03-03 14:14:14',\n" + + " '0033-03-03 14:14:14.1',\n" + + " '0033-03-03 14:14:14.12',\n" + + " '0033-03-03 14:14:14.123',\n" + + " '0033-03-03 14:14:14.1234',\n" + + " '0033-03-03 14:14:14.12345',\n" + + " '0033-03-03 14:14:14.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0444-04-04',\n" + + " '0444-04-04 13:13:13',\n" + + " '0444-04-04 13:13:13.1',\n" + + " '0444-04-04 13:13:13.12',\n" + + " '0444-04-04 13:13:13.123',\n" + + " '0444-04-04 13:13:13.1234',\n" + + " '0444-04-04 13:13:13.12345',\n" + + " '0444-04-04 13:13:13.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '1969-12-31',\n" + + " '1969-12-31 12:12:12',\n" + + " '1969-12-31 12:12:12.1',\n" + + " '1969-12-31 12:12:12.12',\n" + + " '1969-12-31 12:12:12.123',\n" + + " '1969-12-31 12:12:12.1234',\n" + + " '1969-12-31 12:12:12.12345',\n" + + " '1969-12-31 12:12:12.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '2019-12-31',\n" + + " '2019-12-31 23:11:11',\n" + + " '2019-12-31 23:11:11.1',\n" + + " '2019-12-31 23:11:11.12',\n" + + " '2019-12-31 23:11:11.123',\n" + + " '2019-12-31 23:11:11.1234',\n" + + " '2019-12-31 23:11:11.12345',\n" + + " '2019-12-31 23:11:11.123456'\n" + + ");"); + } + } + + public static <T> Tuple2<List<T>, List<CreateTableEvent>> fetchResultsAndCreateTableEvent( + Iterator<T> iter, int size) { + 
List<T> result = new ArrayList<>(size); + List<CreateTableEvent> createTableEvents = new ArrayList<>(); + while (size > 0 && iter.hasNext()) { + T event = iter.next(); + if (event instanceof CreateTableEvent) { + createTableEvents.add((CreateTableEvent) event); + } else { + result.add(event); + size--; + } + } + return Tuple2.of(result, createTableEvents); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/ancient_date_and_time.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/ancient_date_and_time.sql new file mode 100644 index 000000000..93832b9b7 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/ancient_date_and_time.sql @@ -0,0 +1,124 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +CREATE TABLE ancient_times +( + id INT NOT NULL AUTO_INCREMENT, + date_col DATE DEFAULT '0017-08-12', + datetime_0_col DATETIME(0) DEFAULT '0016-07-13 17:17:17', + datetime_1_col DATETIME(1) DEFAULT '0015-06-14 17:17:17.1', + datetime_2_col DATETIME(2) DEFAULT '0014-05-15 17:17:17.12', + datetime_3_col DATETIME(3) DEFAULT '0013-04-16 17:17:17.123', + datetime_4_col DATETIME(4) DEFAULT '0012-03-17 17:17:17.1234', + datetime_5_col DATETIME(5) DEFAULT '0011-02-18 17:17:17.12345', + datetime_6_col DATETIME(6) DEFAULT '0010-01-19 17:17:17.123456', + PRIMARY KEY (id) +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT, + DEFAULT +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0000-00-00', + '0000-00-00 00:00:00', + '0000-00-00 00:00:00.0', + '0000-00-00 00:00:00.00', + '0000-00-00 00:00:00.000', + '0000-00-00 00:00:00.0000', + '0000-00-00 00:00:00.00000', + '0000-00-00 00:00:00.000000' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0001-01-01', + '0001-01-01 16:16:16', + '0001-01-01 16:16:16.1', + '0001-01-01 16:16:16.12', + '0001-01-01 16:16:16.123', + '0001-01-01 16:16:16.1234', + '0001-01-01 16:16:16.12345', + '0001-01-01 16:16:16.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0002-02-02', + '0002-02-02 15:15:15', + '0002-02-02 15:15:15.1', + '0002-02-02 15:15:15.12', + '0002-02-02 15:15:15.123', + '0002-02-02 15:15:15.1234', + '0002-02-02 15:15:15.12345', + '0002-02-02 15:15:15.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0033-03-03', + '0033-03-03 14:14:14', + '0033-03-03 14:14:14.1', + '0033-03-03 14:14:14.12', + '0033-03-03 14:14:14.123', + '0033-03-03 14:14:14.1234', + '0033-03-03 14:14:14.12345', + '0033-03-03 14:14:14.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '0444-04-04', + '0444-04-04 13:13:13', + '0444-04-04 13:13:13.1', + '0444-04-04 13:13:13.12', + '0444-04-04 13:13:13.123', + '0444-04-04 13:13:13.1234', 
+ '0444-04-04 13:13:13.12345', + '0444-04-04 13:13:13.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '1969-12-31', + '1969-12-31 12:12:12', + '1969-12-31 12:12:12.1', + '1969-12-31 12:12:12.12', + '1969-12-31 12:12:12.123', + '1969-12-31 12:12:12.1234', + '1969-12-31 12:12:12.12345', + '1969-12-31 12:12:12.123456' +); + +INSERT INTO ancient_times VALUES ( + DEFAULT, + '2019-12-31', + '2019-12-31 23:11:11', + '2019-12-31 23:11:11.1', + '2019-12-31 23:11:11.12', + '2019-12-31 23:11:11.123', + '2019-12-31 23:11:11.1234', + '2019-12-31 23:11:11.12345', + '2019-12-31 23:11:11.123456' +); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-allow-ancient-date-time/my.cnf b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-allow-ancient-date-time/my.cnf new file mode 100644 index 000000000..ca0483780 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-allow-ancient-date-time/my.cnf @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values. +# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. 
+# Server ID is required, but this will vary on production systems +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 1 +binlog_format = row +sql_mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java index f77d1aa18..ad7ec458e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java @@ -309,10 +309,12 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve return TimestampData.fromMillis((Long) dbzObj); case MicroTimestamp.SCHEMA_NAME: long micro = (long) dbzObj; - return TimestampData.fromMillis(micro / 1000, (int) (micro % 1000 * 1000)); + return TimestampData.fromMillis( + Math.floorDiv(micro, 1000), (int) (Math.floorMod(micro, 1000) * 1000)); case NanoTimestamp.SCHEMA_NAME: long nano = (long) dbzObj; - return TimestampData.fromMillis(nano / 1000_000, (int) (nano % 1000_000)); + return TimestampData.fromMillis( + Math.floorDiv(nano, 1000_000), (int) (Math.floorMod(nano, 1000_000))); } } throw new IllegalArgumentException( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java index 6a9642075..b68dcd171 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java @@ -481,12 +481,17 @@ public final class RowDataDebeziumDeserializeSchema return TimestampData.fromEpochMillis((Long) dbzObj); case MicroTimestamp.SCHEMA_NAME: long micro = (long) dbzObj; + // Use Math#floorDiv and Math#floorMod instead of `/` and `%`, because + // timestamp number could be negative if we're handling timestamps prior + // to 1970. return TimestampData.fromEpochMillis( - micro / 1000, (int) (micro % 1000 * 1000)); + Math.floorDiv(micro, 1000), + (int) (Math.floorMod(micro, 1000) * 1000)); case NanoTimestamp.SCHEMA_NAME: long nano = (long) dbzObj; return TimestampData.fromEpochMillis( - nano / 1000_000, (int) (nano % 1000_000)); + Math.floorDiv(nano, 1000_000), + (int) (Math.floorMod(nano, 1000_000))); } } LocalDateTime localDateTime = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java new file mode 100644 index 000000000..ca1a91b4e --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.RowUtils; +import org.apache.flink.util.CloseableIterator; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.assertj.core.api.Assertions; +import org.junit.After; 
+import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.lifecycle.Startables; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** Integration tests for MySQL Table source to handle ancient date and time records. */ +public class MySqlAncientDateAndTimeITCase extends MySqlSourceTestBase { + private static final Logger LOG = LoggerFactory.getLogger(MySqlAncientDateAndTimeITCase.class); + + private static final String TEST_USER = "mysqluser"; + private static final String TEST_PASSWORD = "mysqlpw"; + + // We need an extra "no_zero_in_date = false" config to insert malformed date and time records. 
+ private static final MySqlContainer MYSQL_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-allow-ancient-date-time/my.cnf"); + + private final UniqueDatabase ancientDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "ancient_date_and_time", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + @BeforeClass + public static void beforeClass() { + LOG.info("Starting MySql container..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + LOG.info("Container MySql is started."); + } + + @AfterClass + public static void afterClass() { + LOG.info("Stopping MySql containers..."); + MYSQL_CONTAINER.stop(); + LOG.info("Container MySql is stopped."); + } + + @Before + public void before() { + TestValuesTableFactory.clearAllData(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200); + ancientDatabase.createAndInitialize(); + } + + @After + public void after() { + ancientDatabase.dropDatabase(); + } + + /** + * With the TimeAdjuster in Debezium, all date / time records between year 0001 and 0099 will be + * shifted to 1971 ~ 2069. 
+ */ + @Test + public void testAncientDateAndTimeWithTimeAdjusterWithRowDataDeserializer() throws Exception { + // LocalDate.ofEpochDay reference: + // +---------------------------------------------------------------------------------+ + // | 17390 | 11323 | 11720 | 23072 | -557266 | -1 | 18261 | + // | 2017/8/12 | 2001/1/1 | 2002/2/2 | 2033/3/3 | 0444/4/4 | 1969/12/31 | 2019/12/31 | + // +---------------------------------------------------------------------------------+ + runGenericAncientDateAndTimeTest( + MYSQL_CONTAINER, + ancientDatabase, + true, + DeserializerType.ROW_DATA, + Arrays.asList( + "+I[1, 17390, 2016-07-13T17:17:17, 2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 2010-01-19T17:17:17.123456]", + "+I[2, null, null, null, null, null, null, null, null]", + "+I[3, 11323, 2001-01-01T16:16:16, 2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 2001-01-01T16:16:16.123456]", + "+I[4, 11720, 2002-02-02T15:15:15, 2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 2002-02-02T15:15:15.123456]", + "+I[5, 23072, 2033-03-03T14:14:14, 2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 2033-03-03T14:14:14.123456]", + "+I[6, -557266, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "+I[7, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "+I[8, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 
2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); + } + + @Test + public void testAncientDateAndTimeWithoutTimeAdjusterWithRowDataDeserializer() + throws Exception { + // LocalDate.ofEpochDay reference: + // +---------------------------------------------------------------------------------+ + // | -713095 | -719162 | -718765 | -707413 | -557266 | -1 | 18261 | + // | 0017/8/12 | 0001/1/1 | 0002/2/2 | 0033/3/3 | 0444/4/4 | 1969/12/31 | 2019/12/31 | + // +---------------------------------------------------------------------------------+ + runGenericAncientDateAndTimeTest( + MYSQL_CONTAINER, + ancientDatabase, + false, + DeserializerType.ROW_DATA, + Arrays.asList( + "+I[1, -713095, 0016-07-13T17:17:17, 0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 0010-01-19T17:17:17.123456]", + "+I[2, null, null, null, null, null, null, null, null]", + "+I[3, -719162, 0001-01-01T16:16:16, 0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 0001-01-01T16:16:16.123456]", + "+I[4, -718765, 0002-02-02T15:15:15, 0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 0002-02-02T15:15:15.123456]", + "+I[5, -707413, 0033-03-03T14:14:14, 0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 0033-03-03T14:14:14.123456]", + "+I[6, -557266, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "+I[7, -1, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "+I[8, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 
2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); + } + + @Test + public void testAncientDateAndTimeWithTimeAdjusterWithJsonDeserializer() throws Exception { + // LocalDate.ofEpochDay reference: + // + // +---------------------------------------------------------------------------------+ + // | 17390 | 11323 | 11720 | 23072 | -557266 | -1 | 18261 | + // | 2017/8/12 | 2001/1/1 | 2002/2/2 | 2033/3/3 | 0444/4/4 | 1969/12/31 | 2019/12/31 | + // +---------------------------------------------------------------------------------+ + // + // LocalDateTime.ofEpochSecond reference: + // + // Row 1: + // 1468430237000 -> 2016-07-13T17:17:17 + // 1434302237100 -> 2015-06-14T17:17:17.100 + // 1400174237120 -> 2014-05-15T17:17:17.120 + // 1366132637123 -> 2013-04-16T17:17:17.123 + // 1332004637123400 -> 2012-03-17T17:17:17.123400 + // 1298049437123450 -> 2011-02-18T17:17:17.123450 + // 1263921437123456 -> 2010-01-19T17:17:17.123456 + // + // Row 2: + // (null) + // + // Row 3: + // 978365776000 -> 2001-01-01T16:16:16 + // 978365776100 -> 2001-01-01T16:16:16.100 + // 978365776120 -> 2001-01-01T16:16:16.120 + // 978365776123 -> 2001-01-01T16:16:16.123 + // 978365776123400 -> 2001-01-01T16:16:16.123400 + // 978365776123450 -> 2001-01-01T16:16:16.123450 + // 978365776123456 -> 2001-01-01T16:16:16.123456 + // + // Row 4: + // 1012662915000 -> 2002-02-02T15:15:15 + // 1012662915100 -> 2002-02-02T15:15:15.100 + // 1012662915120 -> 2002-02-02T15:15:15.120 + // 1012662915123 -> 2002-02-02T15:15:15.123 + // 1012662915123400 -> 2002-02-02T15:15:15.123400 + // 1012662915123450 -> 2002-02-02T15:15:15.123450 + // 1012662915123456 -> 2002-02-02T15:15:15.123456 + // + // Row 5: + // 1993472054000 -> 2033-03-03T14:14:14 + // 1993472054100 -> 2033-03-03T14:14:14.100 + // 1993472054120 -> 2033-03-03T14:14:14.120 + // 1993472054123 -> 2033-03-03T14:14:14.123 + // 1993472054123400 -> 2033-03-03T14:14:14.123400 + 
// 1993472054123450 -> 2033-03-03T14:14:14.123450 + // 1993472054123456 -> 2033-03-03T14:14:14.123456 + // + // Row 6: + // -48147734807000 -> 0444-04-04T13:13:13 + // -48147734806900 -> 0444-04-04T13:13:13.100 + // -48147734806880 -> 0444-04-04T13:13:13.120 + // -48147734806877 -> 0444-04-04T13:13:13.123 + // -48147734806876600 -> 0444-04-04T13:13:13.000123400 + // -48147734806876550 -> 0444-04-04T13:13:13.000123450 + // -48147734806876544 -> 0444-04-04T13:13:13.000123456 + // + // Row 7: + // -42468000 -> 1969-12-31T12:12:12 + // -42467900 -> 1969-12-31T12:12:12.100 + // -42467880 -> 1969-12-31T12:12:12.120 + // -42467877 -> 1969-12-31T12:12:12.123 + // -42467876600 -> 1969-12-31T12:12:12.123400 + // -42467876550 -> 1969-12-31T12:12:12.123450 + // -42467876544 -> 1969-12-31T12:12:12.123456 + // + // Row 8: + // 1577833871000 -> 2019-12-31T23:11:11 + // 1577833871100 -> 2019-12-31T23:11:11.100 + // 1577833871120 -> 2019-12-31T23:11:11.120 + // 1577833871123 -> 2019-12-31T23:11:11.123 + // 1577833871123400 -> 2019-12-31T23:11:11.123400 + // 1577833871123450 -> 2019-12-31T23:11:11.123450 + // 1577833871123456 -> 2019-12-31T23:11:11.123456 + + runGenericAncientDateAndTimeTest( + MYSQL_CONTAINER, + ancientDatabase, + true, + DeserializerType.JSON, + Arrays.asList( + "{\"id\":\"AQ==\",\"date_col\":17390,\"datetime_0_col\":1468430237000,\"datetime_1_col\":1434302237100,\"datetime_2_col\":1400174237120,\"datetime_3_col\":1366132637123,\"datetime_4_col\":1332004637123400,\"datetime_5_col\":1298049437123450,\"datetime_6_col\":1263921437123456}", + "{\"id\":\"Ag==\",\"date_col\":null,\"datetime_0_col\":null,\"datetime_1_col\":null,\"datetime_2_col\":null,\"datetime_3_col\":null,\"datetime_4_col\":null,\"datetime_5_col\":null,\"datetime_6_col\":null}", + 
"{\"id\":\"Aw==\",\"date_col\":11323,\"datetime_0_col\":978365776000,\"datetime_1_col\":978365776100,\"datetime_2_col\":978365776120,\"datetime_3_col\":978365776123,\"datetime_4_col\":978365776123400,\"datetime_5_col\":978365776123450,\"datetime_6_col\":978365776123456}", + "{\"id\":\"BA==\",\"date_col\":11720,\"datetime_0_col\":1012662915000,\"datetime_1_col\":1012662915100,\"datetime_2_col\":1012662915120,\"datetime_3_col\":1012662915123,\"datetime_4_col\":1012662915123400,\"datetime_5_col\":1012662915123450,\"datetime_6_col\":1012662915123456}", + "{\"id\":\"BQ==\",\"date_col\":23072,\"datetime_0_col\":1993472054000,\"datetime_1_col\":1993472054100,\"datetime_2_col\":1993472054120,\"datetime_3_col\":1993472054123,\"datetime_4_col\":1993472054123400,\"datetime_5_col\":1993472054123450,\"datetime_6_col\":1993472054123456}", + "{\"id\":\"Bg==\",\"date_col\":-557266,\"datetime_0_col\":-48147734807000,\"datetime_1_col\":-48147734806900,\"datetime_2_col\":-48147734806880,\"datetime_3_col\":-48147734806877,\"datetime_4_col\":-48147734806876600,\"datetime_5_col\":-48147734806876550,\"datetime_6_col\":-48147734806876544}", + "{\"id\":\"Bw==\",\"date_col\":-1,\"datetime_0_col\":-42468000,\"datetime_1_col\":-42467900,\"datetime_2_col\":-42467880,\"datetime_3_col\":-42467877,\"datetime_4_col\":-42467876600,\"datetime_5_col\":-42467876550,\"datetime_6_col\":-42467876544}", + "{\"id\":\"CA==\",\"date_col\":18261,\"datetime_0_col\":1577833871000,\"datetime_1_col\":1577833871100,\"datetime_2_col\":1577833871120,\"datetime_3_col\":1577833871123,\"datetime_4_col\":1577833871123400,\"datetime_5_col\":1577833871123450,\"datetime_6_col\":1577833871123456}")); + } + + @Test + public void testAncientDateAndTimeWithoutTimeAdjusterWithJsonDeserializer() throws Exception { + // LocalDate.ofEpochDay reference: + // + // +---------------------------------------------------------------------------------+ + // | -713095 | -719162 | -718765 | -707413 | -557266 | -1 | 18261 | + // | 0017/8/12 
| 0001/1/1 | 0002/2/2 | 0033/3/3 | 0444/4/4 | 1969/12/31 | 2019/12/31 | + // +---------------------------------------------------------------------------------+ + // + // LocalDateTime.ofEpochSecond reference: + // + // Row 1: + // -61645473763000 -> 0016-07-13T17:17:17 + // -61679601762900 -> 0015-06-14T17:17:17.100 + // -61713729762880 -> 0014-05-15T17:17:17.120 + // -61747771362877 -> 0013-04-16T17:17:17.123 + // -61781899362876600 -> 0012-03-17T17:17:17.123400 + // -61815854562876550 -> 0011-02-18T17:17:17.123450 + // -61849982562876544 -> 0010-01-19T17:17:17.123456 + // + // Row 2: + // (null) + // + // Row 3: + // -62135538224000 -> 0001-01-01T16:16:16 + // -62135538223900 -> 0001-01-01T16:16:16.100 + // -62135538223880 -> 0001-01-01T16:16:16.120 + // -62135538223877 -> 0001-01-01T16:16:16.123 + // -62135538223876600 -> 0001-01-01T16:16:16.123400 + // -62135538223876550 -> 0001-01-01T16:16:16.123450 + // -62135538223876544 -> 0001-01-01T16:16:16.123456 + // + // Row 4: + // -62101241085000 -> 0002-02-02T15:15:15 + // -62101241084900 -> 0002-02-02T15:15:15.100 + // -62101241084880 -> 0002-02-02T15:15:15.120 + // -62101241084877 -> 0002-02-02T15:15:15.123 + // -62101241084876600 -> 0002-02-02T15:15:15.123400 + // -62101241084876550 -> 0002-02-02T15:15:15.123450 + // -62101241084876544 -> 0002-02-02T15:15:15.123456 + // + // Row 5: + // -61120431946000 -> 0033-03-03T14:14:14 + // -61120431945900 -> 0033-03-03T14:14:14.100 + // -61120431945880 -> 0033-03-03T14:14:14.120 + // -61120431945877 -> 0033-03-03T14:14:14.123 + // -61120431945876600 -> 0033-03-03T14:14:14.123400 + // -61120431945876550 -> 0033-03-03T14:14:14.123450 + // -61120431945876544 -> 0033-03-03T14:14:14.123456 + // + // + // Row 6: + // -48147734807000 -> 0444-04-04T13:13:13 + // -48147734806900 -> 0444-04-04T13:13:13.100 + // -48147734806880 -> 0444-04-04T13:13:13.120 + // -48147734806877 -> 0444-04-04T13:13:13.123 + // -48147734806876600 -> 0444-04-04T13:13:13.000123400 + // -48147734806876550 
-> 0444-04-04T13:13:13.000123450 + // -48147734806876544 -> 0444-04-04T13:13:13.000123456 + // + // Row 7: + // -42468000 -> 1969-12-31T12:12:12 + // -42467900 -> 1969-12-31T12:12:12.100 + // -42467880 -> 1969-12-31T12:12:12.120 + // -42467877 -> 1969-12-31T12:12:12.123 + // -42467876600 -> 1969-12-31T12:12:12.123400 + // -42467876550 -> 1969-12-31T12:12:12.123450 + // -42467876544 -> 1969-12-31T12:12:12.123456 + // + // Row 8: + // 1577833871000 -> 2019-12-31T23:11:11 + // 1577833871100 -> 2019-12-31T23:11:11.100 + // 1577833871120 -> 2019-12-31T23:11:11.120 + // 1577833871123 -> 2019-12-31T23:11:11.123 + // 1577833871123400 -> 2019-12-31T23:11:11.123400 + // 1577833871123450 -> 2019-12-31T23:11:11.123450 + // 1577833871123456 -> 2019-12-31T23:11:11.123456 + + runGenericAncientDateAndTimeTest( + MYSQL_CONTAINER, + ancientDatabase, + false, + DeserializerType.JSON, + Arrays.asList( + "{\"id\":\"AQ==\",\"date_col\":-713095,\"datetime_0_col\":-61645473763000,\"datetime_1_col\":-61679601762900,\"datetime_2_col\":-61713729762880,\"datetime_3_col\":-61747771362877,\"datetime_4_col\":-61781899362876600,\"datetime_5_col\":-61815854562876550,\"datetime_6_col\":-61849982562876544}", + "{\"id\":\"Ag==\",\"date_col\":null,\"datetime_0_col\":null,\"datetime_1_col\":null,\"datetime_2_col\":null,\"datetime_3_col\":null,\"datetime_4_col\":null,\"datetime_5_col\":null,\"datetime_6_col\":null}", + "{\"id\":\"Aw==\",\"date_col\":-719162,\"datetime_0_col\":-62135538224000,\"datetime_1_col\":-62135538223900,\"datetime_2_col\":-62135538223880,\"datetime_3_col\":-62135538223877,\"datetime_4_col\":-62135538223876600,\"datetime_5_col\":-62135538223876550,\"datetime_6_col\":-62135538223876544}", + "{\"id\":\"BA==\",\"date_col\":-718765,\"datetime_0_col\":-62101241085000,\"datetime_1_col\":-62101241084900,\"datetime_2_col\":-62101241084880,\"datetime_3_col\":-62101241084877,\"datetime_4_col\":-62101241084876600,\"datetime_5_col\":-62101241084876550,\"datetime_6_col\":-62101241084876544}", + 
"{\"id\":\"BQ==\",\"date_col\":-707413,\"datetime_0_col\":-61120431946000,\"datetime_1_col\":-61120431945900,\"datetime_2_col\":-61120431945880,\"datetime_3_col\":-61120431945877,\"datetime_4_col\":-61120431945876600,\"datetime_5_col\":-61120431945876550,\"datetime_6_col\":-61120431945876544}", + "{\"id\":\"Bg==\",\"date_col\":-557266,\"datetime_0_col\":-48147734807000,\"datetime_1_col\":-48147734806900,\"datetime_2_col\":-48147734806880,\"datetime_3_col\":-48147734806877,\"datetime_4_col\":-48147734806876600,\"datetime_5_col\":-48147734806876550,\"datetime_6_col\":-48147734806876544}", + "{\"id\":\"Bw==\",\"date_col\":-1,\"datetime_0_col\":-42468000,\"datetime_1_col\":-42467900,\"datetime_2_col\":-42467880,\"datetime_3_col\":-42467877,\"datetime_4_col\":-42467876600,\"datetime_5_col\":-42467876550,\"datetime_6_col\":-42467876544}", + "{\"id\":\"CA==\",\"date_col\":18261,\"datetime_0_col\":1577833871000,\"datetime_1_col\":1577833871100,\"datetime_2_col\":1577833871120,\"datetime_3_col\":1577833871123,\"datetime_4_col\":1577833871123400,\"datetime_5_col\":1577833871123450,\"datetime_6_col\":1577833871123456}")); + } + + private void runGenericAncientDateAndTimeTest( + MySqlContainer container, + UniqueDatabase database, + boolean enableTimeAdjuster, + DeserializerType deserializerType, + List<String> expectedResults) + throws Exception { + + switch (deserializerType) { + case ROW_DATA: + { + // Build deserializer + DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("date_col", DataTypes.DATE()), + DataTypes.FIELD("datetime_0_col", DataTypes.TIMESTAMP(0)), + DataTypes.FIELD("datetime_1_col", DataTypes.TIMESTAMP(1)), + DataTypes.FIELD("datetime_2_col", DataTypes.TIMESTAMP(2)), + DataTypes.FIELD("datetime_3_col", DataTypes.TIMESTAMP(3)), + DataTypes.FIELD("datetime_4_col", DataTypes.TIMESTAMP(4)), + DataTypes.FIELD("datetime_5_col", DataTypes.TIMESTAMP(5)), + DataTypes.FIELD("datetime_6_col", DataTypes.TIMESTAMP(6))); + 
LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType); + InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType); + RowDataDebeziumDeserializeSchema deserializer = + RowDataDebeziumDeserializeSchema.newBuilder() + .setPhysicalRowType((RowType) dataType.getLogicalType()) + .setResultTypeInfo(typeInfo) + .build(); + + Properties dbzProperties = new Properties(); + dbzProperties.put("enable.time.adjuster", String.valueOf(enableTimeAdjuster)); + // Build source + MySqlSource<RowData> mySqlSource = + MySqlSource.<RowData>builder() + .hostname(container.getHost()) + .port(container.getDatabasePort()) + .databaseList(database.getDatabaseName()) + .serverTimeZone("UTC") + .tableList(database.getDatabaseName() + ".ancient_times") + .username(database.getUsername()) + .password(database.getPassword()) + .serverId(getServerId()) + .deserializer(deserializer) + .startupOptions(StartupOptions.initial()) + .debeziumProperties(dbzProperties) + .build(); + + try (CloseableIterator<RowData> iterator = + env.fromSource( + mySqlSource, + WatermarkStrategy.noWatermarks(), + "Fetch results") + .executeAndCollect()) { + List<RowData> results = fetchRows(iterator, expectedResults.size()); + Assertions.assertThat(convertRowDataToRowString(results)) + .containsExactlyInAnyOrderElementsOf(expectedResults); + } + } + break; + case JSON: + { + JsonDebeziumDeserializationSchema deserializer = + new JsonDebeziumDeserializationSchema(); + + Properties dbzProperties = new Properties(); + dbzProperties.put("enable.time.adjuster", String.valueOf(enableTimeAdjuster)); + // Build source + MySqlSource<String> mySqlSource = + MySqlSource.<String>builder() + .hostname(container.getHost()) + .port(container.getDatabasePort()) + .databaseList(database.getDatabaseName()) + .serverTimeZone("UTC") + .tableList(database.getDatabaseName() + ".ancient_times") + .username(database.getUsername()) + .password(database.getPassword()) + .serverId(getServerId()) + 
.deserializer(deserializer) + .startupOptions(StartupOptions.initial()) + .debeziumProperties(dbzProperties) + .build(); + + try (CloseableIterator<String> iterator = + env.fromSource( + mySqlSource, + WatermarkStrategy.noWatermarks(), + "Fetch results") + .executeAndCollect()) { + List<String> results = fetchRows(iterator, expectedResults.size()); + Assertions.assertThat(convertJsonToRowString(results)) + .containsExactlyInAnyOrderElementsOf(expectedResults); + } + } + break; + default: + throw new IllegalArgumentException( + "Unknown deserializer type: " + deserializerType); + } + } + + private static <T> List<T> fetchRows(Iterator<T> iter, int size) { + List<T> rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + T row = iter.next(); + rows.add(row); + size--; + } + return rows; + } + + private static List<String> convertRowDataToRowString(List<RowData> rows) { + LinkedHashMap<String, Integer> map = new LinkedHashMap<>(); + map.put("id_col", 0); + map.put("date_col", 1); + map.put("datetime_0_col", 2); + map.put("datetime_1_col", 3); + map.put("datetime_2_col", 4); + map.put("datetime_3_col", 5); + map.put("datetime_4_col", 6); + map.put("datetime_5_col", 7); + map.put("datetime_6_col", 8); + return rows.stream() + .map( + row -> + RowUtils.createRowWithNamedPositions( + row.getRowKind(), + new Object[] { + wrap(row, 0, RowData::getInt), + wrap(row, 1, RowData::getInt), + wrap(row, 2, (r, i) -> r.getTimestamp(i, 0)), + wrap(row, 3, (r, i) -> r.getTimestamp(i, 1)), + wrap(row, 4, (r, i) -> r.getTimestamp(i, 2)), + wrap(row, 5, (r, i) -> r.getTimestamp(i, 3)), + wrap(row, 6, (r, i) -> r.getTimestamp(i, 4)), + wrap(row, 7, (r, i) -> r.getTimestamp(i, 5)), + wrap(row, 8, (r, i) -> r.getTimestamp(i, 6)) + }, + map) + .toString()) + .collect(Collectors.toList()); + } + + private static List<String> convertJsonToRowString(List<String> rows) { + ObjectMapper mapper = new ObjectMapper(); + return rows.stream() + .map( + row -> { + try { + JsonNode node 
= mapper.readTree(row); + return node.get("after").toString(); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + } + + private static <T> Object wrap(RowData row, int index, BiFunction<RowData, Integer, T> getter) { + if (row.isNullAt(index)) { + return null; + } + return getter.apply(row, index); + } + + private String getServerId() { + final Random random = new Random(); + int serverId = random.nextInt(100) + 5400; + return serverId + "-" + (serverId + env.getParallelism()); + } + + enum DeserializerType { + JSON, + ROW_DATA + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java new file mode 100644 index 000000000..b9ceaa783 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.table; + +import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.lifecycle.Startables; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.stream.Stream; + +import static org.apache.flink.api.common.JobStatus.RUNNING; + +/** Integration tests for MySQL Table source to handle ancient date and time records. 
*/ +@RunWith(Parameterized.class) +public class MySqlAncientDateAndTimeITCase extends MySqlSourceTestBase { + private static final Logger LOG = LoggerFactory.getLogger(MySqlAncientDateAndTimeITCase.class); + + private static final String TEST_USER = "mysqluser"; + private static final String TEST_PASSWORD = "mysqlpw"; + + // We need an extra "no_zero_in_date = false" config to insert malformed date and time records. + private static final MySqlContainer MYSQL_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-allow-ancient-date-time/my.cnf"); + + private final UniqueDatabase ancientDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "ancient_date_and_time", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + private final StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, EnvironmentSettings.newInstance().inStreamingMode().build()); + + private final boolean incrementalSnapshot; + + @Parameterized.Parameters(name = "incrementalSnapshot: {0}") + public static Object[] parameters() { + return new Object[][] {new Object[] {false}, new Object[] {true}}; + } + + public MySqlAncientDateAndTimeITCase(boolean incrementalSnapshot) { + this.incrementalSnapshot = incrementalSnapshot; + } + + @BeforeClass + public static void beforeClass() { + LOG.info("Starting MySql container..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + LOG.info("Container MySql is started."); + } + + @AfterClass + public static void afterClass() { + LOG.info("Stopping MySql containers..."); + MYSQL_CONTAINER.stop(); + LOG.info("Container MySql is stopped."); + } + + @Before + public void before() { + TestValuesTableFactory.clearAllData(); + if (incrementalSnapshot) { + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200); + } else { + env.setParallelism(1); + } + ancientDatabase.createAndInitialize(); + } + + @After + public void after() { + 
ancientDatabase.dropDatabase(); + } + + /** + * With the TimeAdjuster in Debezium, all date / time records between year 0001 and 0099 will be + * shifted to 1971 ~ 2069. + */ + @Test + public void testAncientDateAndTimeWithTimeAdjuster() throws Exception { + runGenericAncientDateAndTimeTest( + MYSQL_CONTAINER, + ancientDatabase, + incrementalSnapshot, + true, + Arrays.asList( + "+I[1, 2017-08-12, 2016-07-13T17:17:17, 2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 2010-01-19T17:17:17.123456]", + "+I[2, null, null, null, null, null, null, null, null]", + "+I[3, 2001-01-01, 2001-01-01T16:16:16, 2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 2001-01-01T16:16:16.123456]", + "+I[4, 2002-02-02, 2002-02-02T15:15:15, 2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 2002-02-02T15:15:15.123456]", + "+I[5, 2033-03-03, 2033-03-03T14:14:14, 2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 2033-03-03T14:14:14.123456]", + "+I[6, 0444-04-04, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "+I[7, 1969-12-31, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "+I[8, 2019-12-31, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]"), + Arrays.asList( + "+I[9, 2017-08-12, 2016-07-13T17:17:17, 2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 
2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 2010-01-19T17:17:17.123456]", + "+I[10, null, null, null, null, null, null, null, null]", + "+I[11, 2001-01-01, 2001-01-01T16:16:16, 2001-01-01T16:16:16.100, 2001-01-01T16:16:16.120, 2001-01-01T16:16:16.123, 2001-01-01T16:16:16.123400, 2001-01-01T16:16:16.123450, 2001-01-01T16:16:16.123456]", + "+I[12, 2002-02-02, 2002-02-02T15:15:15, 2002-02-02T15:15:15.100, 2002-02-02T15:15:15.120, 2002-02-02T15:15:15.123, 2002-02-02T15:15:15.123400, 2002-02-02T15:15:15.123450, 2002-02-02T15:15:15.123456]", + "+I[13, 2033-03-03, 2033-03-03T14:14:14, 2033-03-03T14:14:14.100, 2033-03-03T14:14:14.120, 2033-03-03T14:14:14.123, 2033-03-03T14:14:14.123400, 2033-03-03T14:14:14.123450, 2033-03-03T14:14:14.123456]", + "+I[14, 0444-04-04, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "+I[15, 1969-12-31, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "+I[16, 2019-12-31, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); + } + + @Test + public void testAncientDateAndTimeWithoutTimeAdjuster() throws Exception { + runGenericAncientDateAndTimeTest( + MYSQL_CONTAINER, + ancientDatabase, + incrementalSnapshot, + false, + Arrays.asList( + "+I[1, 0017-08-12, 0016-07-13T17:17:17, 0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 0010-01-19T17:17:17.123456]", + "+I[2, null, null, null, null, null, null, null, null]", + "+I[3, 0001-01-01, 0001-01-01T16:16:16, 0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 0001-01-01T16:16:16.123400, 
0001-01-01T16:16:16.123450, 0001-01-01T16:16:16.123456]", + "+I[4, 0002-02-02, 0002-02-02T15:15:15, 0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 0002-02-02T15:15:15.123456]", + "+I[5, 0033-03-03, 0033-03-03T14:14:14, 0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 0033-03-03T14:14:14.123456]", + "+I[6, 0444-04-04, 0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "+I[7, 1969-12-31, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "+I[8, 2019-12-31, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]"), + Arrays.asList( + "+I[9, 0017-08-12, 0016-07-13T17:17:17, 0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 0010-01-19T17:17:17.123456]", + "+I[10, null, null, null, null, null, null, null, null]", + "+I[11, 0001-01-01, 0001-01-01T16:16:16, 0001-01-01T16:16:16.100, 0001-01-01T16:16:16.120, 0001-01-01T16:16:16.123, 0001-01-01T16:16:16.123400, 0001-01-01T16:16:16.123450, 0001-01-01T16:16:16.123456]", + "+I[12, 0002-02-02, 0002-02-02T15:15:15, 0002-02-02T15:15:15.100, 0002-02-02T15:15:15.120, 0002-02-02T15:15:15.123, 0002-02-02T15:15:15.123400, 0002-02-02T15:15:15.123450, 0002-02-02T15:15:15.123456]", + "+I[13, 0033-03-03, 0033-03-03T14:14:14, 0033-03-03T14:14:14.100, 0033-03-03T14:14:14.120, 0033-03-03T14:14:14.123, 0033-03-03T14:14:14.123400, 0033-03-03T14:14:14.123450, 0033-03-03T14:14:14.123456]", + "+I[14, 0444-04-04, 
0444-04-04T13:13:13, 0444-04-04T13:13:13.100, 0444-04-04T13:13:13.120, 0444-04-04T13:13:13.123, 0444-04-04T13:13:13.123400, 0444-04-04T13:13:13.123450, 0444-04-04T13:13:13.123456]", + "+I[15, 1969-12-31, 1969-12-31T12:12:12, 1969-12-31T12:12:12.100, 1969-12-31T12:12:12.120, 1969-12-31T12:12:12.123, 1969-12-31T12:12:12.123400, 1969-12-31T12:12:12.123450, 1969-12-31T12:12:12.123456]", + "+I[16, 2019-12-31, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); + } + + private void runGenericAncientDateAndTimeTest( + MySqlContainer container, + UniqueDatabase database, + boolean incrementalSnapshot, + boolean enableTimeAdjuster, + List<String> expectedSnapshotResults, + List<String> expectedStreamingResults) + throws Exception { + String sourceDDL = + String.format( + "CREATE TABLE ancient_db (" + + " `id` INT NOT NULL," + + " date_col DATE," + + " datetime_0_col TIMESTAMP(0)," + + " datetime_1_col TIMESTAMP(1)," + + " datetime_2_col TIMESTAMP(2)," + + " datetime_3_col TIMESTAMP(3)," + + " datetime_4_col TIMESTAMP(4)," + + " datetime_5_col TIMESTAMP(5)," + + " datetime_6_col TIMESTAMP(6)," + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-time-zone' = 'UTC'," + + " 'server-id' = '%s'," + + " 'debezium.enable.time.adjuster' = '%s'" + + ")", + container.getHost(), + container.getDatabasePort(), + TEST_USER, + TEST_PASSWORD, + database.getDatabaseName(), + "ancient_times", + incrementalSnapshot, + getServerId(), + enableTimeAdjuster); + + tEnv.executeSql(sourceDDL); + + TableResult result = tEnv.executeSql("SELECT * FROM ancient_db"); + do { + Thread.sleep(5000L); + } while 
(result.getJobClient().get().getJobStatus().get() != RUNNING); + + CloseableIterator<Row> iterator = result.collect(); + + List<String> expectedRows = new ArrayList<>(expectedSnapshotResults); + + Assertions.assertThat(fetchRows(iterator, expectedRows.size())) + .containsExactlyInAnyOrderElementsOf(expectedRows); + + createBinlogEvents(database); + + Assertions.assertThat(fetchRows(iterator, expectedStreamingResults.size())) + .containsExactlyInAnyOrderElementsOf(expectedStreamingResults); + result.getJobClient().get().cancel().get(); + } + + private static void createBinlogEvents(UniqueDatabase database) throws SQLException { + // Test reading identical data in binlog stage again + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0017-08-12',\n" + + " '0016-07-13 17:17:17',\n" + + " '0015-06-14 17:17:17.1',\n" + + " '0014-05-15 17:17:17.12',\n" + + " '0013-04-16 17:17:17.123',\n" + + " '0012-03-17 17:17:17.1234',\n" + + " '0011-02-18 17:17:17.12345',\n" + + " '0010-01-19 17:17:17.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0000-00-00',\n" + + " '0000-00-00 00:00:00',\n" + + " '0000-00-00 00:00:00.0',\n" + + " '0000-00-00 00:00:00.00',\n" + + " '0000-00-00 00:00:00.000',\n" + + " '0000-00-00 00:00:00.0000',\n" + + " '0000-00-00 00:00:00.00000',\n" + + " '0000-00-00 00:00:00.000000'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0001-01-01',\n" + + " '0001-01-01 16:16:16',\n" + + " '0001-01-01 16:16:16.1',\n" + + " '0001-01-01 16:16:16.12',\n" + + " '0001-01-01 16:16:16.123',\n" + + " '0001-01-01 16:16:16.1234',\n" + + " '0001-01-01 16:16:16.12345',\n" + + " '0001-01-01 16:16:16.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " 
'0002-02-02',\n" + + " '0002-02-02 15:15:15',\n" + + " '0002-02-02 15:15:15.1',\n" + + " '0002-02-02 15:15:15.12',\n" + + " '0002-02-02 15:15:15.123',\n" + + " '0002-02-02 15:15:15.1234',\n" + + " '0002-02-02 15:15:15.12345',\n" + + " '0002-02-02 15:15:15.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0033-03-03',\n" + + " '0033-03-03 14:14:14',\n" + + " '0033-03-03 14:14:14.1',\n" + + " '0033-03-03 14:14:14.12',\n" + + " '0033-03-03 14:14:14.123',\n" + + " '0033-03-03 14:14:14.1234',\n" + + " '0033-03-03 14:14:14.12345',\n" + + " '0033-03-03 14:14:14.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '0444-04-04',\n" + + " '0444-04-04 13:13:13',\n" + + " '0444-04-04 13:13:13.1',\n" + + " '0444-04-04 13:13:13.12',\n" + + " '0444-04-04 13:13:13.123',\n" + + " '0444-04-04 13:13:13.1234',\n" + + " '0444-04-04 13:13:13.12345',\n" + + " '0444-04-04 13:13:13.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '1969-12-31',\n" + + " '1969-12-31 12:12:12',\n" + + " '1969-12-31 12:12:12.1',\n" + + " '1969-12-31 12:12:12.12',\n" + + " '1969-12-31 12:12:12.123',\n" + + " '1969-12-31 12:12:12.1234',\n" + + " '1969-12-31 12:12:12.12345',\n" + + " '1969-12-31 12:12:12.123456'\n" + + ");"); + statement.execute( + "INSERT INTO ancient_times VALUES (\n" + + " DEFAULT,\n" + + " '2019-12-31',\n" + + " '2019-12-31 23:11:11',\n" + + " '2019-12-31 23:11:11.1',\n" + + " '2019-12-31 23:11:11.12',\n" + + " '2019-12-31 23:11:11.123',\n" + + " '2019-12-31 23:11:11.1234',\n" + + " '2019-12-31 23:11:11.12345',\n" + + " '2019-12-31 23:11:11.123456'\n" + + ");"); + } + } + + private static List<String> fetchRows(Iterator<Row> iter, int size) { + List<String> rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + Row row = iter.next(); + rows.add(row.toString()); + size--; + } + return rows; + } + + private 
String getServerId() { + final Random random = new Random(); + int serverId = random.nextInt(100) + 5400; + if (incrementalSnapshot) { + return serverId + "-" + (serverId + env.getParallelism()); + } + return String.valueOf(serverId); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql new file mode 100644 index 000000000..e4ff4e595 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql @@ -0,0 +1,124 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +CREATE TABLE ancient_times -- fixture: DATE plus DATETIME at every fractional-second precision 0..6 +( + id SERIAL, -- SERIAL = BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE; the DEFAULTs below should yield ids 1..8 in insert order + date_col DATE, + datetime_0_col DATETIME(0), + datetime_1_col DATETIME(1), + datetime_2_col DATETIME(2), + datetime_3_col DATETIME(3), + datetime_4_col DATETIME(4), + datetime_5_col DATETIME(5), + datetime_6_col DATETIME(6), + PRIMARY KEY (id) +); + +INSERT INTO ancient_times VALUES ( -- row 1: years 0010..0017, a different ancient value per column + DEFAULT, + '0017-08-12', + '0016-07-13 17:17:17', + '0015-06-14 17:17:17.1', + '0014-05-15 17:17:17.12', + '0013-04-16 17:17:17.123', + '0012-03-17 17:17:17.1234', + '0011-02-18 17:17:17.12345', + '0010-01-19 17:17:17.123456' +); + +INSERT INTO ancient_times VALUES ( -- row 2: zero date/datetime; presumably needs a sql_mode without NO_ZERO_DATE (the ITCase expects NULLs for this row) + DEFAULT, + '0000-00-00', + '0000-00-00 00:00:00', + '0000-00-00 00:00:00.0', + '0000-00-00 00:00:00.00', + '0000-00-00 00:00:00.000', + '0000-00-00 00:00:00.0000', + '0000-00-00 00:00:00.00000', + '0000-00-00 00:00:00.000000' +); + +INSERT INTO ancient_times VALUES ( -- row 3: 0001-01-01, the earliest non-zero date + DEFAULT, + '0001-01-01', + '0001-01-01 16:16:16', + '0001-01-01 16:16:16.1', + '0001-01-01 16:16:16.12', + '0001-01-01 16:16:16.123', + '0001-01-01 16:16:16.1234', + '0001-01-01 16:16:16.12345', + '0001-01-01 16:16:16.123456' +); + +INSERT INTO ancient_times VALUES ( -- row 4: year 0002 + DEFAULT, + '0002-02-02', + '0002-02-02 15:15:15', + '0002-02-02 15:15:15.1', + '0002-02-02 15:15:15.12', + '0002-02-02 15:15:15.123', + '0002-02-02 15:15:15.1234', + '0002-02-02 15:15:15.12345', + '0002-02-02 15:15:15.123456' +); + +INSERT INTO ancient_times VALUES ( -- row 5: year 0033 + DEFAULT, + '0033-03-03', + '0033-03-03 14:14:14', + '0033-03-03 14:14:14.1', + '0033-03-03 14:14:14.12', + '0033-03-03 14:14:14.123', + '0033-03-03 14:14:14.1234', + '0033-03-03 14:14:14.12345', + '0033-03-03 14:14:14.123456' +); + +INSERT INTO ancient_times VALUES ( -- row 6: year 0444 + DEFAULT, + '0444-04-04', + '0444-04-04 13:13:13', + '0444-04-04 13:13:13.1', + '0444-04-04 13:13:13.12', + '0444-04-04 13:13:13.123', + '0444-04-04 13:13:13.1234', + '0444-04-04 13:13:13.12345', + '0444-04-04 13:13:13.123456' +); + +INSERT INTO ancient_times VALUES ( -- row 7: 1969-12-31, the day before the Unix epoch + DEFAULT, + '1969-12-31', + '1969-12-31 
12:12:12', + '1969-12-31 12:12:12.1', + '1969-12-31 12:12:12.12', + '1969-12-31 12:12:12.123', + '1969-12-31 12:12:12.1234', + '1969-12-31 12:12:12.12345', + '1969-12-31 12:12:12.123456' +); + +INSERT INTO ancient_times VALUES ( -- row 8: 2019-12-31, post-epoch control row + DEFAULT, + '2019-12-31', + '2019-12-31 23:11:11', + '2019-12-31 23:11:11.1', + '2019-12-31 23:11:11.12', + '2019-12-31 23:11:11.123', + '2019-12-31 23:11:11.1234', + '2019-12-31 23:11:11.12345', + '2019-12-31 23:11:11.123456' +); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/docker/server-allow-ancient-date-time/my.cnf b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/docker/server-allow-ancient-date-time/my.cnf new file mode 100644 index 000000000..ca0483780 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/docker/server-allow-ancient-date-time/my.cnf @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL.
Start at 70% of total RAM for a dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed; experiment to find the optimal values. +# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable the binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary; expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# A server ID is required, but its value will vary on production systems. +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 1 +binlog_format = row +sql_mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION"